# -*- coding: utf-8 -*-
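"""Regenerate COMMENT ON TABLE/COLUMN statements in schema_ODS_doc.sql.

For every CREATE TABLE in the billiards_ods schema this script:
  1. parses the column list from the DDL,
  2. loads the matching <table>.json export and <table>-Analysis.md notes from DOC_DIR,
  3. derives a Chinese purpose text per column (analysis text first, name-based heuristics
     as fallback) plus an example value sampled from the JSON records,
  4. strips existing "--" comment lines and COMMENT ON statements, appends a fresh comment
     block after each CREATE TABLE, and writes a .rewrite2.bak backup of the original file.

Run it directly (paths are hard-coded below): python rewrite_schema_ods_doc_comments.py
"""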
import json
import re
from pathlib import Path
from collections import defaultdict
SQL_PATH = Path(r"C:\dev\LLTQ\ETL\feiqiu-ETL\etl_billiards\database\schema_ODS_doc.sql")
DOC_DIR = Path(r"C:\dev\LLTQ\export\test-json-doc")
TABLE_CN = {
"member_profiles": "会员档案/会员账户信息",
"member_balance_changes": "会员余额变更流水",
"member_stored_value_cards": "会员储值/卡券账户列表",
"recharge_settlements": "充值结算记录",
"settlement_records": "结账/结算记录",
"assistant_cancellation_records": "助教作废/取消记录",
"assistant_accounts_master": "助教档案主数据",
"assistant_service_records": "助教服务流水",
"site_tables_master": "门店桌台主数据",
"table_fee_discount_records": "台费折扣记录",
"table_fee_transactions": "台费流水",
"goods_stock_movements": "商品库存变动流水",
"stock_goods_category_tree": "商品分类树",
"goods_stock_summary": "商品库存汇总",
"payment_transactions": "支付流水",
"refund_transactions": "退款流水",
"platform_coupon_redemption_records": "平台券核销/使用记录",
"tenant_goods_master": "租户商品主数据",
"group_buy_packages": "团购套餐主数据",
"group_buy_redemption_records": "团购核销记录",
"settlement_ticket_details": "结算小票明细",
"store_goods_master": "门店商品主数据",
"store_goods_sales_records": "门店商品销售流水",
}
COMMON_FIELD_PURPOSE = {
"tenant_id": "租户/品牌 ID用于商户维度过滤与关联。",
"site_id": "门店 ID用于门店维度过滤与关联。",
"register_site_id": "会员注册门店 ID用于归属门店维度关联。",
"site_name": "门店名称快照,用于直接展示。",
"id": "本表主键 ID用于唯一标识一条记录。",
"system_member_id": "系统级会员 ID跨门店/跨卡种统一到‘人’的维度)。",
"order_trade_no": "订单交易号,用于串联同一订单下的各类消费明细。",
"order_settle_id": "订单结算/结账主键,用于关联结算记录与小票明细。",
"order_pay_id": "关联支付流水的主键 ID用于追溯支付明细。",
"point": "积分余额,用于记录会员积分取值。",
"growth_value": "成长值/成长积分,用于会员成长与等级评估。",
"referrer_member_id": "推荐人会员 ID用于记录会员推荐/拉新关系。",
"create_time": "记录创建时间(业务侧产生时间)。",
"status": "状态枚举,用于标识记录当前业务状态。",
"user_status": "用户状态枚举,用于标识会员账户/用户可用状态。",
"is_delete": "逻辑删除标记0=否1=是)。",
"payload": "完整原始 JSON 记录快照,用于回溯与二次解析。",
"source_file": "ETL 元数据:原始导出文件名,用于数据追溯。",
"source_endpoint": "ETL 元数据:采集来源(接口/文件路径),用于数据追溯。",
"fetched_at": "ETL 元数据:采集/入库时间戳,用于口径对齐与增量处理。",
}
ETL_META_FIELDS = {"source_file", "source_endpoint", "fetched_at"}
def _first_sentence(text: str, max_len: int = 120) -> str:
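    """Collapse whitespace, keep the text before the first 。/;/; delimiter and truncate to max_len."""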
s = re.sub(r"\s+", " ", (text or "").strip())
if not s:
return ""
parts = re.split(r"[。;;]\s*", s)
s = parts[0].strip() if parts else s
if len(s) > max_len:
        s = s[: max_len - 1] + "…"
return s
def _escape_sql(s: str) -> str:
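    """Escape single quotes ('' doubling) so the text can be embedded in a SQL string literal."""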
return (s or "").replace("'", "''")
def normalize_key(s: str) -> str:
return re.sub(r"[_\-\s]", "", (s or "").lower())
def snake_to_lower_camel(s: str) -> str:
parts = re.split(r"[_\-\s]+", s)
if not parts:
return s
first = parts[0].lower()
rest = "".join(p[:1].upper() + p[1:] for p in parts[1:] if p)
return first + rest
def snake_to_upper_camel(s: str) -> str:
parts = re.split(r"[_\-\s]+", s)
return "".join(p[:1].upper() + p[1:] for p in parts if p)
def find_key_in_record(record: dict, token: str) -> str | None:
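    """Return the key in `record` that matches `token`: exact name first, then case and
    camelCase/PascalCase variants via normalized comparison; None when nothing matches."""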
if not isinstance(record, dict) or not token:
return None
if token in record:
return token
norm_to_key = {normalize_key(k): k for k in record.keys()}
candidates = [
token,
token.lower(),
token.upper(),
snake_to_lower_camel(token),
snake_to_upper_camel(token),
]
for c in candidates:
nk = normalize_key(c)
if nk in norm_to_key:
return norm_to_key[nk]
return None
def _infer_purpose(_table: str, col: str) -> str:
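    """Heuristic fallback purpose text keyed off the column name: known common fields first,
    then suffix/keyword rules for IDs, times, amounts, counts, names, codes and flags."""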
if col in COMMON_FIELD_PURPOSE:
return COMMON_FIELD_PURPOSE[col]
lower = col.lower()
if lower.endswith("_id"):
return "标识类 ID 字段,用于关联/定位相关实体。"
if lower.endswith("_time") or lower.endswith("time"):
return "时间字段,用于记录业务时间点/发生时间。"
if any(k in lower for k in ["amount", "money", "fee", "price", "deduct", "cost"]):
return "金额字段,用于计费/结算/分摊等金额计算。"
if any(k in lower for k in ["count", "num", "number", "seconds", "qty"]):
return "数量/时长字段,用于统计与计量。"
if lower.endswith("_name") or lower.endswith("name"):
return "名称字段,用于展示与辅助识别。"
if lower.endswith("_code") or lower.endswith("code"):
return "编码/枚举字段,用于表示类型、等级或业务枚举。"
if lower.startswith("is_") or lower.startswith("able_") or lower.startswith("can_"):
return "布尔/开关字段,用于表示权限、可用性或状态开关。"
return "来自 JSON 导出的原始字段,用于保留业务取值。"
def _format_example(value, max_len: int = 120) -> str:
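    """Render a sampled JSON value as a short display string: scalars mostly as-is,
    lists/dicts abbreviated to their first items, everything truncated to max_len."""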
if value is None:
return "NULL"
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, str):
s = value.strip()
if len(s) > max_len:
            s = s[: max_len - 1] + "…"
return s
if isinstance(value, list):
if not value:
return "[]"
sample = value[0]
rendered = json.dumps(sample, ensure_ascii=False)
if len(value) > 1:
rendered = f"[{rendered}, …] (len={len(value)})"
else:
rendered = f"[{rendered}]"
if len(rendered) > max_len:
            rendered = rendered[: max_len - 1] + "…"
return rendered
if isinstance(value, dict):
keys = list(value)[:6]
mini = {k: value.get(k) for k in keys}
rendered = json.dumps(mini, ensure_ascii=False)
if len(value) > len(keys):
rendered = rendered[:-1] + ", …}"
if len(rendered) > max_len:
            rendered = rendered[: max_len - 1] + "…"
return rendered
rendered = str(value)
if len(rendered) > max_len:
        rendered = rendered[: max_len - 1] + "…"
return rendered
def _find_best_record_list(data, columns):
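    """Breadth-first search the parsed JSON for the list of dicts whose keys overlap the
    DDL columns the most; returns that list or None. The path-aware variant below is the
    one used by the main flow."""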
cols = set(columns)
best = None
best_score = -1
queue = [(data, 0)]
visited = 0
while queue and visited < 20000:
node, depth = queue.pop(0)
visited += 1
if depth > 8:
continue
if isinstance(node, list):
if node and all(isinstance(x, dict) for x in node[:3]):
scores = []
for x in node[:5]:
scores.append(len(set(x.keys()) & cols))
score = sum(scores) / max(1, len(scores))
if score > best_score:
best_score = score
best = node
for x in node[:10]:
queue.append((x, depth + 1))
else:
for x in node[:50]:
queue.append((x, depth + 1))
elif isinstance(node, dict):
for v in list(node.values())[:80]:
queue.append((v, depth + 1))
return best
def _find_best_record_list_and_node(data, columns):
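    """Like _find_best_record_list, but also returns the dotted key path to the chosen list
    ("$" when it sits at the document root); falls back to an empty list."""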
cols = set(columns)
best = None
best_score = -1
best_path = []
queue = [(data, 0, [])]
visited = 0
while queue and visited < 25000:
node, depth, path = queue.pop(0)
visited += 1
if depth > 10:
continue
if isinstance(node, list):
if node and all(isinstance(x, dict) for x in node[:3]):
scores = []
for x in node[:5]:
scores.append(len(set(x.keys()) & cols))
score = sum(scores) / max(1, len(scores))
if score > best_score:
best_score = score
best = node
best_path = path
for x in node[:10]:
queue.append((x, depth + 1, path))
else:
for x in node[:80]:
queue.append((x, depth + 1, path))
elif isinstance(node, dict):
for k, v in list(node.items())[:120]:
queue.append((v, depth + 1, path + [str(k)]))
node_str = ".".join(best_path) if best_path else "$"
return best or [], node_str
def _choose_examples(records, columns):
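    """Pick, per column, the first non-empty value found in the first 120 records."""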
examples = {}
if not records:
return examples
for col in columns:
val = None
for r in records[:120]:
if isinstance(r, dict) and col in r and r[col] not in (None, ""):
val = r[col]
break
examples[col] = val
return examples
def _extract_header_fields(line: str, columns_set):
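    """If a short line is a numbered or plain header naming known columns (e.g. "8. tenant_id"
    or "a_id / b_id"), return those column names; otherwise return []."""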
s = line.strip()
if not s:
return []
    # Accept headers such as "1. id", "1.1 siteProfile", "8. tenant_id"
m = re.match(r"^\d+(?:\.\d+)*[\.)]?\s+(.+)$", s)
if m:
s = m.group(1).strip()
parts = re.split(r"\s*[/、,]\s*", s)
fields = [p.strip() for p in parts if p.strip() in columns_set]
if not fields and s in columns_set:
fields = [s]
if fields and len(line) <= 120:
return fields
return []
def _parse_field_purpose_from_block(block_lines):
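    """Extract a one-sentence purpose from a field's analysis block: prefer text after a
    含义/作用 label (including variants), else the first line that is not a type/statistics
    description."""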
lines = [l.rstrip() for l in block_lines]
def pick_after_label(labels):
for i, l in enumerate(lines):
for lab in labels:
if lab in l:
after = l.split(lab, 1)[1].strip()
if after:
return after
buf = []
j = i + 1
while j < len(lines) and not lines[j].strip():
j += 1
for k in range(j, len(lines)):
if not lines[k].strip():
break
if re.match(r"^[\w\u4e00-\u9fff]+[:]", lines[k].strip()):
break
buf.append(lines[k].strip())
if buf:
return " ".join(buf)
return ""
    # Also handle label variants such as 「含义(结合其它文件):」 and 「含义(推测):」
    picked = pick_after_label(["含义:", "含义:"])
if not picked:
for i, l in enumerate(lines):
s = l.strip()
m = re.match(r"^含义.*[:]\s*(.*)$", s)
if m:
after = m.group(1).strip()
if after:
picked = after
else:
buf = []
j = i + 1
while j < len(lines) and not lines[j].strip():
j += 1
for k in range(j, len(lines)):
if not lines[k].strip():
break
if re.match(r"^[\w\u4e00-\u9fff]+[:]", lines[k].strip()):
break
buf.append(lines[k].strip())
if buf:
picked = " ".join(buf)
break
if not picked:
picked = pick_after_label(["作用:", "作用:"])
if not picked:
for i, l in enumerate(lines):
s = l.strip()
m = re.match(r"^作用.*[:]\s*(.*)$", s)
if m:
after = m.group(1).strip()
if after:
picked = after
break
if not picked:
        # Fallback: skip descriptive lines such as "类型:" / "唯一值个数:" and take the first remaining line
for l in lines:
s = l.strip()
if not s:
continue
if any(
s.startswith(prefix)
for prefix in [
"类型:",
"非空:",
"唯一值",
"观测",
"特征",
"统计",
"分布",
"说明:",
"关联:",
"结构关系",
"和其它表",
"重复记录",
"全部为",
]
):
continue
picked = s
break
return _first_sentence(picked, 160)
def _is_poor_purpose(purpose: str) -> bool:
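    """True when a parsed purpose is empty, visibly truncated, or just leftover label/boilerplate."""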
s = (purpose or "").strip()
if not s:
return True
    if s.endswith(",") or s.endswith(":"):
return True
if s.startswith("全部为"):
return True
if s.startswith("含义") and ("" in s or ":" in s) and len(s) <= 12:
return True
return False
def parse_analysis(analysis_text: str, columns):
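    """Split the *-Analysis.md text into per-column blocks keyed by header lines and return
    {column: purpose} for every column whose purpose could be parsed."""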
columns_set = set(columns)
blocks = defaultdict(list)
current_fields = []
buf = []
for raw in analysis_text.splitlines():
fields = _extract_header_fields(raw, columns_set)
if fields:
if current_fields and buf:
for f in current_fields:
blocks[f].extend(buf)
current_fields = fields
buf = []
else:
if current_fields:
buf.append(raw)
if current_fields and buf:
for f in current_fields:
blocks[f].extend(buf)
purposes = {}
for col in columns:
if col in blocks and blocks[col]:
p = _parse_field_purpose_from_block(blocks[col])
if p:
purposes[col] = p
return purposes
def parse_columns_from_ddl(create_sql: str):
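    """Return the column names declared in a CREATE TABLE statement, skipping constraint
    lines (CONSTRAINT / PRIMARY / UNIQUE / FOREIGN / CHECK)."""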
start = create_sql.find("(")
end = create_sql.rfind(")")
body = create_sql[start + 1 : end]
cols = []
for line in body.splitlines():
s = line.strip().rstrip(",")
if not s:
continue
if s.startswith(")"):
continue
if s.upper().startswith("CONSTRAINT "):
continue
m = re.match(r"^([A-Za-z_][A-Za-z0-9_]*)\s+", s)
if not m:
continue
name = m.group(1)
if name.upper() in {"PRIMARY", "UNIQUE", "FOREIGN", "CHECK"}:
continue
cols.append(name)
return cols
def build_comment_block(table: str, columns, analysis_text: str, records):
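    """Build the COMMENT ON TABLE statement plus one COMMENT ON COLUMN per column.

    Each column comment packs 【说明】purpose, 【示例】example, 【ODS来源】table/column and
    【JSON字段】file/node/field into a single escaped SQL string literal, roughly
    (illustrative only):
        COMMENT ON COLUMN billiards_ods.member_profiles.tenant_id IS '【说明】租户/品牌 ID...';
    """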
    # records_node is resolved by the caller so the JSON does not have to be traversed again here
records, records_node = records
purposes = parse_analysis(analysis_text, columns)
examples = _choose_examples(records, columns)
table_cn = TABLE_CN.get(table, table)
    table_comment = (
        f"ODS 原始明细表:{table_cn}。"
        f"来源:C:/dev/LLTQ/export/test-json-doc/{table}.json,分析:{table}-Analysis.md。"
        f"字段以导出原样为主,ETL 补充 source_file/source_endpoint/fetched_at,并保留 payload 为原始记录快照。"
    )
lines = []
lines.append(f"COMMENT ON TABLE billiards_ods.{table} IS '{_escape_sql(table_comment)}';")
for col in columns:
json_file = f"{table}.json"
if col in ETL_META_FIELDS:
json_field = f"{json_file} - ETL元数据 - 无"
elif col == "payload":
json_field = f"{json_file} - {records_node} - $"
else:
actual = None
for r in records[:50]:
if isinstance(r, dict):
actual = find_key_in_record(r, col)
if actual:
break
field_name = actual or col
json_field = f"{json_file} - {records_node} - {field_name}"
purpose = purposes.get(col) or _infer_purpose(table, col)
purpose = _first_sentence(purpose, 140) or _infer_purpose(table, col)
if _is_poor_purpose(purpose):
purpose = COMMON_FIELD_PURPOSE.get(col) or _infer_purpose(table, col)
if col in ETL_META_FIELDS:
if col == "source_file":
ex = f"{table}.json"
elif col == "source_endpoint":
ex = f"C:/dev/LLTQ/export/test-json-doc/{table}.json"
else:
ex = "2025-11-10T00:00:00+08:00"
elif col == "payload":
ex = "{...}"
else:
ex = _format_example(examples.get(col))
func = purpose
if "用于" not in func:
func = "用于" + func.rstrip("")
        # ODS source: "<table> - <column>"; ETL-added metadata columns are flagged explicitly
if col in ETL_META_FIELDS:
            ods_src = f"{table} - {col}(ETL补充)"
else:
ods_src = f"{table} - {col}"
comment = (
f"【说明】{purpose}"
f" 【示例】{ex}{func})。"
f" 【ODS来源】{ods_src}"
f" 【JSON字段】{json_field}"
)
lines.append(
f"COMMENT ON COLUMN billiards_ods.{table}.{col} IS '{_escape_sql(comment)}';"
)
return "\n".join(lines)
text = SQL_PATH.read_text(encoding="utf-8")
newline = "\r\n" if "\r\n" in text else "\n"
kept = []
for raw_line in text.splitlines(True):
stripped = raw_line.lstrip()
if stripped.startswith("--"):
continue
if re.match(r"^\s*COMMENT ON\s+(TABLE|COLUMN)\s+", raw_line):
continue
kept.append(raw_line)
clean = "".join(kept)
create_re = re.compile(
r"(CREATE TABLE IF NOT EXISTS\s+billiards_ods\.(?P<table>[A-Za-z0-9_]+)\s*\([\s\S]*?\)\s*;)" ,
re.M,
)
out_parts = []
last = 0
count = 0
for m in create_re.finditer(clean):
out_parts.append(clean[last : m.end()])
table = m.group("table")
create_sql = m.group(1)
cols = parse_columns_from_ddl(create_sql)
analysis_text = (DOC_DIR / f"{table}-Analysis.md").read_text(encoding="utf-8")
data = json.loads((DOC_DIR / f"{table}.json").read_text(encoding="utf-8"))
record_list, record_node = _find_best_record_list_and_node(data, cols)
out_parts.append(newline + newline + build_comment_block(table, cols, analysis_text, (record_list, record_node)) + newline + newline)
last = m.end()
count += 1
out_parts.append(clean[last:])
result = "".join(out_parts)
result = re.sub(r"(?:\r?\n){4,}", newline * 3, result)
backup = SQL_PATH.with_suffix(SQL_PATH.suffix + ".rewrite2.bak")
backup.write_text(text, encoding="utf-8")
SQL_PATH.write_text(result, encoding="utf-8")
print(f"Rewrote comments for {count} tables. Backup: {backup}")