# -*- coding: utf-8 -*-
"""Compare API-reference JSON fields against ODS database table columns.

Generates a comparison report and ALTER SQL, using camelCase -> snake_case
normalized matching.
Usage: python scripts/compare_api_ods.py
Requires: psycopg2, python-dotenv
"""
import json
import os
import re
import sys

# Make the project root importable when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import psycopg2
from dotenv import load_dotenv

# Load .env so PG_DSN / ETL_REPORT_ROOT are available via os.getenv.
load_dotenv()
# PostgreSQL DSN for the target database, read from the environment (.env).
PG_DSN = os.getenv("PG_DSN")
# Directory holding the per-endpoint API reference markdown documents.
ENDPOINTS_DIR = os.path.join("docs", "api-reference", "endpoints")
# Registry file mapping API ids to their ODS target tables.
REGISTRY_FILE = os.path.join("docs", "api-reference", "api_registry.json")
# ODS metadata columns (added automatically by the ETL framework; they are
# not API payload fields and must be excluded from the comparison).
ODS_META_COLUMNS = {
    "source_file", "source_endpoint", "fetched_at", "payload", "content_hash"
}
# JSON type label -> recommended PostgreSQL column type.
# Unknown labels fall back to "text" (see suggest_pg_type).
TYPE_MAP = {
    "int": "bigint",
    "float": "numeric(18,2)",
    "string": "text",
    "bool": "boolean",
    "list": "jsonb",
    "dict": "jsonb",
    "object": "jsonb",
    "array": "jsonb",
}
def camel_to_snake(name):
    """Convert camelCase/PascalCase to lower snake_case.

    Handles acronym runs: "ABCDef" -> "abc_def".
    """
    # First break a run of capitals that precedes a capitalized word.
    broken_acronyms = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Then split every lower/digit-to-upper boundary and lowercase the result.
    return re.sub(r"([a-z\d])([A-Z])", r"\1_\2", broken_acronyms).lower()
def normalize_field_name(name):
    """Normalize a field name for matching.

    camelCase/PascalCase is converted to lower snake_case, dots become
    underscores, and leading/trailing underscores are stripped.
    """
    # Inlined camel_to_snake conversion: break acronym runs, then split
    # lower/digit-to-upper boundaries, then lowercase.
    snake = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    snake = re.sub(r"([a-z\d])([A-Z])", r"\1_\2", snake).lower()
    return snake.replace(".", "_").strip("_")
def parse_api_fields(md_path):
    """Parse the response-field table from an API markdown document.

    Returns {original_field_name: json_type} with the type lowercased.
    Nested sub-fields (names containing '.', e.g. siteProfile.address)
    are skipped.
    """
    # Expected row format: | # | `fieldName` | type | example |
    row_pattern = re.compile(r"\|\s*\d+\s*\|\s*`([^`]+)`\s*\|\s*(\w+)\s*\|")
    with open(md_path, "r", encoding="utf-8") as handle:
        text = handle.read()
    return {
        field: kind.strip().lower()
        for field, kind in (
            (match.group(1).strip(), match.group(2))
            for match in row_pattern.finditer(text)
        )
        if "." not in field
    }
def get_ods_columns(cursor, table_name):
    """Query column metadata for a table in the ods schema.

    Returns {column_name: data_type} in ordinal (definition) order; empty
    dict when the table does not exist.
    """
    cursor.execute(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'ods' AND table_name = %s
        ORDER BY ordinal_position
        """,
        (table_name,),
    )
    return {column: dtype for column, dtype in cursor.fetchall()}
def suggest_pg_type(json_type):
    """Return the recommended PG column type for a JSON type label.

    Unknown labels default to "text".
    """
    try:
        return TYPE_MAP[json_type]
    except KeyError:
        return "text"
def compare_table(api_fields, ods_columns, table_name):
    """Match API document fields against ODS table columns.

    Matching runs in three passes: exact name, snake_case-normalized
    name, and a final underscore-stripped lowercase fallback. ODS
    metadata columns are excluded before matching.

    Returns (truly_missing, extra_in_ods, matched_pairs, case_matched):
    - truly_missing: {api_field_name: json_type} fields absent from ODS
    - extra_in_ods:  {column_name: pg_type} columns absent from the API doc
    - matched_pairs: [(api_name, ods_name)] exact matches
    - case_matched:  [(api_name, ods_name)] normalization-based matches
    """
    # table_name is currently unused; kept for interface stability.
    business_cols = {
        col: pg_type
        for col, pg_type in ods_columns.items()
        if col not in ODS_META_COLUMNS
    }
    # Normalized lookup tables: normalized name -> (original name, type).
    normalized_api = {
        normalize_field_name(field): (field, json_type)
        for field, json_type in api_fields.items()
    }
    # ODS column names are already lowercase; lower() is a no-op safeguard.
    normalized_ods = {
        col.lower(): (col, pg_type) for col, pg_type in business_cols.items()
    }

    matched_pairs = []
    case_matched = []
    consumed_api = set()  # normalized API names already paired
    consumed_ods = set()  # original ODS column names already paired

    # Pass 1: identical spelling on both sides.
    for api_name in api_fields:
        if api_name in business_cols:
            matched_pairs.append((api_name, api_name))
            consumed_api.add(normalize_field_name(api_name))
            consumed_ods.add(api_name)

    # Pass 2: camelCase -> snake_case normalization.
    for norm, (api_name, _json_type) in normalized_api.items():
        if norm in consumed_api or norm not in normalized_ods:
            continue
        ods_name = normalized_ods[norm][0]
        if ods_name in consumed_ods:
            continue
        case_matched.append((api_name, ods_name))
        consumed_api.add(norm)
        consumed_ods.add(ods_name)

    # Pass 3: last resort — compare with all underscores removed.
    for norm, (api_name, _json_type) in normalized_api.items():
        if norm in consumed_api:
            continue
        flattened = norm.replace("_", "")
        for ods_key, (ods_name, _pg_type) in normalized_ods.items():
            if ods_name in consumed_ods:
                continue
            if ods_key.replace("_", "") == flattened:
                case_matched.append((api_name, ods_name))
                consumed_api.add(norm)
                consumed_ods.add(ods_name)
                break

    truly_missing = {
        api_name: json_type
        for norm, (api_name, json_type) in normalized_api.items()
        if norm not in consumed_api
    }
    extra_in_ods = {
        col: pg_type
        for col, pg_type in business_cols.items()
        if col not in consumed_ods
    }
    return truly_missing, extra_in_ods, matched_pairs, case_matched
def generate_alter_sql(table_name, missing_fields):
    """Build ALTER TABLE ... ADD COLUMN statements for missing fields.

    Column names are normalized to snake_case; statements are sorted by
    the original API field name and carry a trailing comment naming it.
    """
    return [
        f"ALTER TABLE ods.{table_name} ADD COLUMN IF NOT EXISTS "
        f"{normalize_field_name(api_name)} {suggest_pg_type(json_type)}; "
        f"-- API 字段: {api_name}"
        for api_name, json_type in sorted(missing_fields.items())
    ]
def main():
    """Run the full comparison and emit the three output artifacts.

    Flow: load the API registry, pair each endpoint doc with its ODS
    table, diff fields vs columns, then write a JSON report, a Markdown
    report and an ALTER-SQL migration file. Requires PG_DSN and
    ETL_REPORT_ROOT in the environment.
    """
    # Load the API registry (a list of endpoint entries).
    with open(REGISTRY_FILE, "r", encoding="utf-8") as f:
        registry = json.load(f)
    # Build id -> ods_table mapping; entries without a table or flagged
    # "skip" are excluded from the comparison.
    api_to_ods = {}
    api_names = {}
    for entry in registry:
        if entry.get("ods_table") and not entry.get("skip"):
            api_to_ods[entry["id"]] = entry["ods_table"]
            api_names[entry["id"]] = entry.get("name_zh", entry["id"])
    conn = psycopg2.connect(PG_DSN)
    cursor = conn.cursor()
    results = []
    all_alter_sqls = []
    for api_id, ods_table in sorted(api_to_ods.items()):
        md_path = os.path.join(ENDPOINTS_DIR, f"{api_id}.md")
        if not os.path.exists(md_path):
            # Registered endpoint with no markdown doc on disk.
            results.append({
                "api_id": api_id, "name_zh": api_names.get(api_id, ""),
                "ods_table": ods_table, "status": "NO_DOC",
                "api_fields": 0, "ods_cols": 0,
            })
            continue
        api_fields = parse_api_fields(md_path)
        ods_columns = get_ods_columns(cursor, ods_table)
        if not ods_columns:
            # Table named in the registry does not exist in the ods schema.
            results.append({
                "api_id": api_id, "name_zh": api_names.get(api_id, ""),
                "ods_table": ods_table, "status": "NO_TABLE",
                "api_fields": len(api_fields), "ods_cols": 0,
            })
            continue
        missing, extra, matched, case_matched = compare_table(
            api_fields, ods_columns, ods_table
        )
        alter_sqls = generate_alter_sql(ods_table, missing)
        all_alter_sqls.extend(alter_sqls)
        # Business column count: metadata columns excluded.
        ods_biz_count = len({k: v for k, v in ods_columns.items()
                             if k not in ODS_META_COLUMNS})
        status = "OK" if not missing else "DRIFT"
        results.append({
            "api_id": api_id,
            "name_zh": api_names.get(api_id, ""),
            "ods_table": ods_table,
            "status": status,
            "api_fields": len(api_fields),
            "ods_cols": ods_biz_count,
            "exact_match": len(matched),
            "case_match": len(case_matched),
            "total_match": len(matched) + len(case_matched),
            "missing_in_ods": missing,
            "extra_in_ods": extra,
            "case_matched_pairs": case_matched,
        })
    cursor.close()
    conn.close()
    # ── JSON report ──
    _report_root = os.environ.get("ETL_REPORT_ROOT")
    if not _report_root:
        raise KeyError("环境变量 ETL_REPORT_ROOT 未定义。请在根 .env 中配置。")
    report_json = os.path.join(_report_root, "api_ods_comparison.json")
    os.makedirs(os.path.dirname(report_json), exist_ok=True)
    # Convert tuples to lists so the result is JSON-serializable.
    json_results = []
    for r in results:
        jr = dict(r)
        if "case_matched_pairs" in jr:
            jr["case_matched_pairs"] = [list(p) for p in jr["case_matched_pairs"]]
        if "missing_in_ods" in jr:
            jr["missing_in_ods"] = dict(jr["missing_in_ods"])
        if "extra_in_ods" in jr:
            jr["extra_in_ods"] = dict(jr["extra_in_ods"])
        json_results.append(jr)
    with open(report_json, "w", encoding="utf-8") as f:
        json.dump(json_results, f, ensure_ascii=False, indent=2)
    # ── Markdown report ──
    # NOTE(review): the generation date below is hardcoded; it will be
    # stale on future runs — consider datetime.date.today().
    report_md = os.path.join(_report_root, "api_ods_comparison.md")
    with open(report_md, "w", encoding="utf-8") as f:
        f.write("# API JSON 字段 vs ODS 表列 对比报告\n\n")
        f.write("> 自动生成于 2026-02-13 | 数据来源:数据库实际表结构 + API 参考文档\n")
        f.write("> 比对逻辑camelCase → snake_case 归一化匹配 + 去下划线纯小写兜底\n\n")
        # Summary figures (also echoed to stdout at the end).
        ok_count = sum(1 for r in results if r["status"] == "OK")
        drift_count = sum(1 for r in results if r["status"] == "DRIFT")
        total_missing = sum(len(r.get("missing_in_ods", {})) for r in results)
        total_extra = sum(len(r.get("extra_in_ods", {})) for r in results)
        f.write("## 汇总\n\n")
        f.write("| 指标 | 值 |\n|------|----|")
        f.write(f"\n| 比对表数 | {len(results)} |")
        f.write(f"\n| 完全一致(含大小写归一化) | {ok_count} |")
        f.write(f"\n| 存在差异 | {drift_count} |")
        f.write(f"\n| ODS 缺失字段总数 | {total_missing} |")
        f.write(f"\n| ODS 多余列总数 | {total_extra} |")
        f.write(f"\n| 生成 ALTER SQL 数 | {len(all_alter_sqls)} |\n\n")
        # Per-table overview.
        f.write("## 逐表对比总览\n\n")
        f.write("| # | API ID | 中文名 | ODS 表 | 状态 | API字段 | ODS列 | 精确匹配 | 大小写匹配 | ODS缺失 | ODS多余 |\n")
        f.write("|---|--------|--------|--------|------|---------|-------|----------|-----------|---------|--------|\n")
        for i, r in enumerate(results, 1):
            missing_count = len(r.get("missing_in_ods", {}))
            extra_count = len(r.get("extra_in_ods", {}))
            exact = r.get("exact_match", 0)
            case = r.get("case_match", 0)
            # NOTE(review): the OK and fallback status icons are empty
            # strings — possibly emoji lost in transit; confirm the
            # intended glyphs.
            icon = "" if r["status"] == "OK" else "⚠️" if r["status"] == "DRIFT" else ""
            f.write(f"| {i} | {r['api_id']} | {r.get('name_zh','')} | {r['ods_table']} | "
                    f"{icon} | {r['api_fields']} | {r['ods_cols']} | {exact} | {case} | "
                    f"{missing_count} | {extra_count} |\n")
        # Drift details: one section per table with differences.
        has_drift = any(r["status"] == "DRIFT" for r in results)
        if has_drift:
            f.write("\n## 差异详情\n\n")
            for r in results:
                if r["status"] != "DRIFT":
                    continue
                f.write(f"### {r.get('name_zh','')}`{r['ods_table']}`\n\n")
                missing = r.get("missing_in_ods", {})
                extra = r.get("extra_in_ods", {})
                case_pairs = r.get("case_matched_pairs", [])
                if case_pairs:
                    f.write("**大小写归一化匹配(已自动对齐,无需操作):**\n\n")
                    f.write("| API 字段名 (camelCase) | ODS 列名 (lowercase) |\n")
                    f.write("|----------------------|---------------------|\n")
                    for api_n, ods_n in sorted(case_pairs):
                        f.write(f"| `{api_n}` | `{ods_n}` |\n")
                    f.write("\n")
                if missing:
                    f.write("**ODS 真正缺失的字段(需要 ADD COLUMN**\n\n")
                    f.write("| 字段名 | JSON 类型 | 建议 PG 列名 | 建议 PG 类型 |\n")
                    f.write("|--------|-----------|-------------|-------------|\n")
                    for fname, ftype in sorted(missing.items()):
                        f.write(f"| `{fname}` | {ftype} | `{normalize_field_name(fname)}` | {suggest_pg_type(ftype)} |\n")
                    f.write("\n")
                if extra:
                    f.write("**ODS 多余的列API 中不存在):**\n\n")
                    f.write("| 列名 | PG 类型 | 可能原因 |\n")
                    f.write("|------|---------|--------|\n")
                    for cname, ctype in sorted(extra.items()):
                        f.write(f"| `{cname}` | {ctype} | ETL 自行添加 / 历史遗留 / API 新版已移除 |\n")
                    f.write("\n")
    # ── ALTER SQL migration ──
    sql_path = os.path.join("database", "migrations", "20260213_align_ods_with_api.sql")
    os.makedirs(os.path.dirname(sql_path), exist_ok=True)
    with open(sql_path, "w", encoding="utf-8") as f:
        f.write("-- ============================================================\n")
        f.write("-- ODS 表与 API JSON 字段对齐迁移\n")
        f.write("-- 自动生成于 2026-02-13\n")
        f.write("-- 基于: docs/api-reference/ 文档 vs ods 实际表结构\n")
        f.write("-- 比对逻辑: camelCase → snake_case 归一化后再比较\n")
        f.write("-- ============================================================\n\n")
        if all_alter_sqls:
            f.write("BEGIN;\n\n")
            current_table = ""
            for sql in all_alter_sqls:
                # Extract the table name to group statements with a
                # comment header per table (relies on the statement
                # format produced by generate_alter_sql).
                tbl = sql.split("ods.")[1].split(" ")[0]
                if tbl != current_table:
                    if current_table:
                        f.write("\n")
                    f.write(f"-- ── {tbl} ──\n")
                    current_table = tbl
                f.write(sql + "\n")
            f.write("\nCOMMIT;\n")
        else:
            f.write("-- 无需变更,所有 ODS 表已与 API JSON 字段对齐。\n")
    # Console summary.
    print(f"[完成] 比对 {len(results)} 张表")
    print(f" - 完全一致: {ok_count}")
    print(f" - 存在差异: {drift_count}")
    print(f" - ODS 缺失字段: {total_missing}")
    print(f" - ODS 多余列: {total_extra}")
    print(f" - ALTER SQL: {len(all_alter_sqls)}")
    print(f" - 报告: {report_md}")
    print(f" - JSON: {report_json}")
    print(f" - SQL: {sql_path}")


if __name__ == "__main__":
    main()
# AI_CHANGELOG:
# - 日期: 2026-02-13
# - Prompt: P20260213-210000 — "用新梳理的API返回的JSON文档比对数据库ODS层"
# - 直接原因: 用户要求比对 API 参考文档与 ODS 实际表结构,生成对比报告和 ALTER SQL
# - 变更摘要: 新建比对脚本,支持 camelCase→snake_case 归一化匹配,输出 MD/JSON 报告和迁移 SQL
# - 风险与验证: 纯分析脚本不修改数据库验证python scripts/compare_api_ods.py 检查输出