456 lines
18 KiB
Python
456 lines
18 KiB
Python
"""
|
||
字段排查脚本 — 数据流字段补全 Spec Task 1.1
|
||
|
||
对 12 张目标表执行排查流程:
|
||
1. 查 DWD 现有列
|
||
2. 查 ODS 现有列
|
||
3. 解析 FACT_MAPPINGS 现状(从 dwd_load_task.py 源码导入)
|
||
4. 判断自动映射(ODS 列名 == DWD 列名)
|
||
5. 输出排查记录表(markdown),标注每个字段的排查结论和建议操作
|
||
|
||
用法:
|
||
cd C:\\NeoZQYY
|
||
python scripts/ops/field_audit.py
|
||
python scripts/ops/field_audit.py --output path/to/output.md
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import psycopg2
|
||
from psycopg2.extras import RealDictCursor
|
||
from dotenv import load_dotenv
|
||
|
||
# ── Project root & import path setup ──
# ROOT is two directory levels above the directory containing this file.
ROOT = Path(__file__).resolve().parents[2]
# Directory that is put on sys.path so the `tasks.dwd...` import below resolves.
ETL_ROOT = ROOT.joinpath("apps", "etl", "connectors", "feiqiu")
# Prepend so this directory wins over any same-named package already on the path.
sys.path.insert(0, str(ETL_ROOT))
|
||
|
||
# 导入 FACT_MAPPINGS / TABLE_MAP(仅读取类属性,不实例化)
|
||
from tasks.dwd.dwd_load_task import DwdLoadTask
|
||
|
||
# ── SCD2 bookkeeping columns (meant to be ignored by the audit) ──
SCD2_COLS = {
    "scd2_version",
    "scd2_is_current",
    "scd2_end_time",
    "scd2_start_time",
}
|
||
|
||
# ── Tables to audit and their suspected-missing fields ──
# Each entry describes one ODS table:
#   ods_table        – table name inside the `ods` schema
#   dwd_tables       – related tables inside the `dwd` schema (empty = none yet)
#   suspect_ods_cols – ODS columns suspected of not reaching DWD
#   category         – audit category label ("A" / "B" / "C")
#   notes            – free-text note printed in the report
AUDIT_TARGETS: list[dict] = [
    dict(
        ods_table="assistant_accounts_master",
        dwd_tables=["dim_assistant", "dim_assistant_ex"],
        suspect_ods_cols=["system_role_id", "job_num", "cx_unit_price", "pd_unit_price"],
        category="A",
        notes="4 个 ODS→DWD 未映射",
    ),
    dict(
        ods_table="assistant_service_records",
        dwd_tables=["dwd_assistant_service_log", "dwd_assistant_service_log_ex"],
        suspect_ods_cols=["site_assistant_id", "operator_id", "operator_name"],
        category="A",
        notes="3 个 ODS→DWD 未映射(site_assistant_id 可能已映射为 order_assistant_id)",
    ),
    dict(
        ods_table="store_goods_sales_records",
        dwd_tables=["dwd_store_goods_sale", "dwd_store_goods_sale_ex"],
        suspect_ods_cols=["discount_price"],
        category="A",
        notes="1 个 ODS→DWD 未映射(可能已映射为 discount_money)",
    ),
    dict(
        ods_table="member_balance_changes",
        dwd_tables=["dwd_member_balance_change", "dwd_member_balance_change_ex"],
        suspect_ods_cols=["relate_id"],
        category="A",
        notes="1 个 ODS→DWD 未映射",
    ),
    dict(
        ods_table="tenant_goods_master",
        dwd_tables=["dim_tenant_goods", "dim_tenant_goods_ex"],
        suspect_ods_cols=["commoditycode"],
        category="A",
        notes="1 个 ODS→DWD 未映射(可能已映射为 commodity_code_list)",
    ),
    dict(
        ods_table="site_tables_master",
        dwd_tables=["dim_table", "dim_table_ex"],
        suspect_ods_cols=[
            "sitename",
            "appletqrcodeurl",
            "audit_status",
            "charge_free",
            "create_time",
            "delay_lights_time",
            "is_rest_area",
            "light_status",
            "only_allow_groupon",
            "order_delay_time",
            "self_table",
            "tablestatusname",
            "temporary_light_second",
            "virtual_table",
        ],
        category="A",
        notes="14 个 ODS→DWD 未映射",
    ),
    dict(
        ods_table="recharge_settlements",
        dwd_tables=["dwd_recharge_order", "dwd_recharge_order_ex"],
        suspect_ods_cols=[
            "electricityadjustmoney",
            "electricitymoney",
            "mervousalesamount",
            "plcouponsaleamount",
            "realelectricitymoney",
        ],
        category="B",
        notes="5 个 ODS→DWD 未映射 + 5 个 DWD 无 ODS 源(驼峰/蛇形命名差异)",
    ),
    dict(
        ods_table="store_goods_master",
        dwd_tables=["dim_store_goods", "dim_store_goods_ex"],
        suspect_ods_cols=[
            "time_slot_sale",
            "batch_stock_quantity",
            "provisional_total_cost",
        ],
        category="B",
        notes="平层 + 嵌套展开 + ODS→DWD 补全",
    ),
    dict(
        ods_table="goods_stock_summary",
        dwd_tables=[],  # no DWD table yet; one has to be created
        suspect_ods_cols=[
            "sitegoodsid",
            "goodsname",
            "goodsunit",
            "goodscategoryid",
            "goodscategorysecondid",
            "categoryname",
            "rangestartstock",
            "rangeendstock",
            "rangein",
            "rangeout",
            "rangesale",
            "rangesalemoney",
            "rangeinventory",
            "currentstock",
        ],
        category="C",
        notes="14 个 ODS 字段,无 DWD 目标表,需新建",
    ),
    dict(
        ods_table="goods_stock_movements",
        dwd_tables=[],  # no DWD table yet; one has to be created
        suspect_ods_cols=[
            # The ODS column names are camelCase-style names written without underscores.
            "sitegoodsstockid",
            "tenantid",
            "siteid",
            "sitegoodsid",
            "goodsname",
            "goodscategoryid",
            "goodssecondcategoryid",
            "unit",
            "price",
            "stocktype",
            "changenum",
            "startnum",
            "endnum",
            "changenuma",
            "startnuma",
            "endnuma",
            "remark",
            "operatorname",
            "createtime",
        ],
        category="C",
        notes="19 个 ODS 字段,无 DWD 目标表,需新建",
    ),
]
|
||
|
||
# ── DWD columns of recharge_settlements known to have no ODS source ──
# Used for the cross-check at the end of audit_one_table.
RECHARGE_DWD_ORPHANS = [
    "pl_coupon_sale_amount",
    "mervou_sales_amount",
    "electricity_money",
    "real_electricity_money",
    "electricity_adjust_money",
]
|
||
|
||
|
||
def get_db_columns(cur, schema: str, table: str) -> list[str]:
    """Return the lower-cased column names of ``schema.table``.

    The names are read from ``information_schema.columns`` in
    ``ordinal_position`` order. A table that yields no rows produces an
    empty list.

    Args:
        cur: An open cursor whose rows can be indexed by ``"column_name"``.
        schema: Schema name, passed as a bound parameter.
        table: Table name, passed as a bound parameter.
    """
    query = (
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position"
    )
    cur.execute(query, (schema, table))

    names = []
    for row in cur.fetchall():
        names.append(row["column_name"].lower())
    return names
|
||
|
||
|
||
def get_sample_values(conn, schema: str, table: str, column: str, limit: int = 5) -> list:
    """Fetch up to ``limit`` distinct non-NULL sample values of one column.

    This is best-effort: on any exception the connection is rolled back and
    an empty list is returned. The cursor is always closed.

    Args:
        conn: Open database connection.
        schema: Schema name; interpolated into the SQL as a quoted identifier.
        table: Table name; interpolated into the SQL as a quoted identifier.
        column: Column name; interpolated as a quoted identifier and used as
            the key to read each returned row.
        limit: Maximum number of values; passed as a bound parameter.
    """
    sampler = conn.cursor(cursor_factory=RealDictCursor)
    try:
        # Only LIMIT is a bound parameter; the identifiers are placed in the
        # statement text inside double quotes.
        statement = (
            f'SELECT DISTINCT "{column}" FROM "{schema}"."{table}" '
            f'WHERE "{column}" IS NOT NULL LIMIT %s'
        )
        sampler.execute(statement, (limit,))
        values = []
        for row in sampler.fetchall():
            values.append(row[column])
        return values
    except Exception:
        conn.rollback()
        return []
    finally:
        sampler.close()
|
||
|
||
|
||
def parse_fact_mappings() -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]:
    """Index ``DwdLoadTask.FACT_MAPPINGS`` in both directions.

    Each ``FACT_MAPPINGS`` value is iterated as 3-tuples
    ``(dwd_col, ods_expr, cast)``; the cast element is not used here.

    Returns:
        A ``(forward, reverse)`` pair, both keyed by the ``FACT_MAPPINGS`` key:

        * ``forward[table]`` maps lower-cased DWD column -> ODS expression
          (expression kept verbatim).
        * ``reverse[table]`` maps the ODS expression, lower-cased and with
          leading/trailing double quotes removed, -> lower-cased DWD column.
          When several DWD columns share one ODS expression, the last entry
          wins.
    """
    forward: dict[str, dict[str, str]] = {}
    reverse: dict[str, dict[str, str]] = {}

    for dwd_table, entries in DwdLoadTask.FACT_MAPPINGS.items():
        fwd: dict[str, str] = {}
        rev: dict[str, str] = {}
        for dwd_col, ods_expr, _cast in entries:
            dwd_key = dwd_col.lower()
            fwd[dwd_key] = ods_expr
            # Reverse key: expressions other than a plain (optionally quoted)
            # column name are stored as-is apart from lower-casing.
            rev[ods_expr.lower().strip('"')] = dwd_key
        forward[dwd_table] = fwd
        reverse[dwd_table] = rev

    return forward, reverse
|
||
|
||
|
||
def _match_ignoring_underscores(ods_col: str, dwd_cols: set) -> str:
    """Return the DWD column that equals ``ods_col`` once underscores are ignored.

    Exact same-name matches are excluded (they are handled separately).
    Returns ``""`` when nothing matches; with several candidates the
    alphabetically first one is returned so the result is deterministic.
    """
    squashed = ods_col.replace("_", "")
    candidates = sorted(
        col for col in dwd_cols
        if col != ods_col and col.replace("_", "") == squashed
    )
    return candidates[0] if candidates else ""


def _apply_direct_mapping(record: dict, ods_col: str, dwd_tables: list,
                          dwd_cols_map: dict, fm_reverse: dict) -> bool:
    """Fill ``record`` if ``ods_col`` is explicitly mapped or has a same-name DWD column.

    Tables are examined in order; within a table the FACT_MAPPINGS reverse
    index is consulted before the same-name check. Returns True on a hit.
    """
    for dt in dwd_tables:
        fm_rev = fm_reverse.get(f"dwd.{dt}", {})

        # Check 1: the ODS column already appears as a FACT_MAPPINGS source.
        if ods_col in fm_rev:
            mapped_to = fm_rev[ods_col]
            record["dwd_matches"].append(f"{dt}.{mapped_to}")
            record["fm_status"] = f"已映射 → {dt}.{mapped_to}"
            record["conclusion"] = "已映射(FACT_MAPPINGS 显式配置)"
            record["action"] = "无需变更"
            return True

        # Check 2: a DWD column with the identical name exists (auto mapping).
        if ods_col in dwd_cols_map.get(dt, set()):
            record["dwd_matches"].append(f"{dt}.{ods_col}")
            record["fm_status"] = "自动映射(同名列)"
            record["conclusion"] = "已映射(自动匹配)"
            record["action"] = "无需变更"
            return True
    return False


def _apply_naming_variant(record: dict, ods_col: str, dwd_tables: list,
                          dwd_cols_map: dict, fm_forward: dict) -> bool:
    """Fill ``record`` if a DWD column differs from ``ods_col`` only by underscores.

    Returns True on a hit. The record is marked as already mapped when the
    matching DWD column is a FACT_MAPPINGS target, otherwise as a mapping gap.
    """
    for dt in dwd_tables:
        snake = _match_ignoring_underscores(ods_col, dwd_cols_map.get(dt, set()))
        if not snake:
            continue
        record["dwd_matches"].append(f"{dt}.{snake}")
        if snake in fm_forward.get(f"dwd.{dt}", {}):
            record["fm_status"] = f"已映射 → {dt}.{snake}(命名转换)"
            record["conclusion"] = "已映射(命名差异,FACT_MAPPINGS 已覆盖)"
            record["action"] = "无需变更"
        else:
            record["fm_status"] = f"DWD 列存在 {dt}.{snake},但 FACT_MAPPINGS 未配置"
            record["conclusion"] = "映射遗漏(DWD 列已存在,缺 FACT_MAPPINGS)"
            record["action"] = "仅补充 FACT_MAPPINGS"
        return True
    return False


def _audit_recharge_orphans(dwd_tables: list, fm_forward: dict) -> list[dict]:
    """Build one record per entry of RECHARGE_DWD_ORPHANS.

    A column counts as mapped when it is a FACT_MAPPINGS target of any table
    in ``dwd_tables``.
    NOTE(review): the unmapped branch reports the DWD column as existing
    without querying the database for it — confirm this is intended.
    """
    orphan_records = []
    for dwd_orphan in RECHARGE_DWD_ORPHANS:
        orphan_record = {
            "ods_col": f"(DWD orphan) {dwd_orphan}",
            "ods_exists": False,
            "dwd_matches": [],
            "fm_status": "",
            "conclusion": "",
            "action": "",
            "samples": [],
        }
        for dt in dwd_tables:
            fm_fwd = fm_forward.get(f"dwd.{dt}", {})
            if dwd_orphan in fm_fwd:
                src = fm_fwd[dwd_orphan]
                orphan_record["fm_status"] = f"已映射 ← {src}"
                orphan_record["conclusion"] = "已映射(FACT_MAPPINGS 已覆盖)"
                orphan_record["action"] = "无需变更"
                orphan_record["dwd_matches"].append(f"{dt}.{dwd_orphan}")
                break
        else:
            # No table maps this column.
            orphan_record["conclusion"] = "DWD 列存在但无 ODS 映射"
            orphan_record["action"] = "需补充 FACT_MAPPINGS"
        orphan_records.append(orphan_record)
    return orphan_records


def audit_one_table(
    conn,
    target: dict,
    fm_forward: dict,
    fm_reverse: dict,
) -> list[dict]:
    """Audit the suspect columns of one ODS table.

    Args:
        conn: Open database connection.
        target: One AUDIT_TARGETS entry; the keys ``ods_table``,
            ``dwd_tables`` and ``suspect_ods_cols`` are read.
        fm_forward: ``{"dwd.<table>": {dwd_col: ods_expr}}`` index.
        fm_reverse: ``{"dwd.<table>": {ods_expr: dwd_col}}`` index.

    Returns:
        One dict per suspect column with the keys ``ods_col``, ``ods_exists``,
        ``dwd_matches``, ``fm_status``, ``conclusion``, ``action`` and
        ``samples``. For ``recharge_settlements`` one extra record per
        RECHARGE_DWD_ORPHANS entry is appended.

    Resolution order per column: explicit FACT_MAPPINGS source or same-name
    DWD column on any table first; only if none is found, a DWD column that
    differs solely by underscores (e.g. ``electricitymoney`` vs
    ``electricity_money``). The previous implementation ran the lower-cased
    name through ``_camel_to_snake``, which can never change an all-lowercase
    string, so that branch was unreachable.
    """
    ods_table = target["ods_table"]
    dwd_tables = target["dwd_tables"]
    suspect_cols = target["suspect_ods_cols"]

    # The cursor is only needed for the column look-ups; close it even when a
    # query fails so it is not leaked.
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        ods_cols = set(get_db_columns(cur, "ods", ods_table))
        dwd_cols_map: dict[str, set[str]] = {
            dt: set(get_db_columns(cur, "dwd", dt)) for dt in dwd_tables
        }
    finally:
        cur.close()

    records = []
    for ods_col in suspect_cols:
        ods_col_lower = ods_col.lower()
        record = {
            "ods_col": ods_col_lower,
            "ods_exists": ods_col_lower in ods_cols,
            "dwd_matches": [],
            "fm_status": "未配置",
            "conclusion": "",
            "action": "",
            "samples": [],
        }

        # Sample values only when the column really exists in ODS.
        if record["ods_exists"]:
            record["samples"] = get_sample_values(conn, "ods", ods_table, ods_col_lower)

        found = _apply_direct_mapping(
            record, ods_col_lower, dwd_tables, dwd_cols_map, fm_reverse
        ) or _apply_naming_variant(
            record, ods_col_lower, dwd_tables, dwd_cols_map, fm_forward
        )

        if not found:
            # No related DWD table produced a match.
            if not record["ods_exists"]:
                record["conclusion"] = "ODS 列不存在"
                record["action"] = "需确认 API 是否返回该字段"
            elif not dwd_tables:
                record["conclusion"] = "无 DWD 目标表"
                record["action"] = "需新建 DWD 表"
            else:
                record["conclusion"] = "确实缺失"
                record["action"] = "需新增 DWD 列 + FACT_MAPPINGS"

        records.append(record)

    # Extra cross-check: DWD columns of recharge_settlements without ODS source.
    if ods_table == "recharge_settlements":
        records.extend(_audit_recharge_orphans(dwd_tables, fm_forward))

    return records
|
||
|
||
|
||
def _camel_to_snake(name: str) -> str:
    """Naive camelCase -> snake_case conversion.

    An underscore is inserted before every ASCII capital letter, the result
    is lower-cased, and all leading underscores are removed. Input without
    ASCII capitals therefore comes back unchanged apart from lower-casing
    and the stripping of leading underscores.
    """
    pieces = []
    for ch in name:
        # Only ASCII A-Z counts as a word boundary.
        if "A" <= ch <= "Z":
            pieces.append("_")
        pieces.append(ch)
    return "".join(pieces).lower().lstrip("_")
|
||
|
||
|
||
|
||
def _md_cell(value, max_len: int = 30) -> str:
    """Render a sampled value so it cannot break a Markdown table row.

    The text is truncated to ``max_len`` characters first; pipes are then
    escaped and line breaks replaced by spaces.
    """
    text = str(value)[:max_len]
    for raw, safe in (("|", "\\|"), ("\r", " "), ("\n", " ")):
        text = text.replace(raw, safe)
    return text


def generate_report(all_results: dict[tuple, list[dict]]) -> str:
    """Build the Markdown audit report.

    Args:
        all_results: Maps ``(ods_table, category, notes)`` to the records
            produced by ``audit_one_table`` for that table.

    Returns:
        The report text: header, summary counts, one table per audited ODS
        table, and a TABLE_MAP registration section built from AUDIT_TARGETS
        and ``DwdLoadTask.TABLE_MAP``.
    """
    lines: list[str] = []
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    lines.append("# 字段排查报告\n")
    lines.append(f"> 生成时间:{now_str}\n")
    lines.append(f"> 排查范围:{len(all_results)} 张表\n")

    # Summary statistics, bucketed by the recommended action text.
    total_fields = 0
    already_mapped = 0
    need_fm_only = 0
    need_new_col = 0
    need_new_table = 0
    ods_missing = 0

    for records in all_results.values():
        for r in records:
            total_fields += 1
            action = r["action"]
            if "无需变更" in action:
                already_mapped += 1
            elif "仅补充" in action or "需补充" in action:
                # "需补充 FACT_MAPPINGS" (orphan DWD columns) also needs only a
                # FACT_MAPPINGS entry; previously it fell into no bucket, so
                # the buckets did not add up to the total.
                need_fm_only += 1
            elif "新增 DWD 列" in action:
                need_new_col += 1
            elif "新建 DWD 表" in action:
                need_new_table += 1
            elif "需确认" in action:
                ods_missing += 1

    lines.append("\n## 汇总\n")
    lines.append("| 指标 | 数量 |")
    lines.append("|------|------|")
    lines.append(f"| 排查字段总数 | {total_fields} |")
    lines.append(f"| 已映射(无需变更) | {already_mapped} |")
    lines.append(f"| 映射遗漏(仅补 FACT_MAPPINGS) | {need_fm_only} |")
    lines.append(f"| 确实缺失(需新增 DWD 列) | {need_new_col} |")
    lines.append(f"| 无 DWD 表(需新建) | {need_new_table} |")
    lines.append(f"| ODS 列不存在(需确认 API) | {ods_missing} |")

    # Per-table details.
    for (ods_table, category, notes), records in all_results.items():
        lines.append("\n---\n")
        lines.append(f"## {ods_table}({category} 类)\n")
        lines.append(f"> {notes}\n")
        lines.append("| # | ODS 列 | ODS 存在 | DWD 匹配 | FACT_MAPPINGS 状态 | 排查结论 | 建议操作 | 采样值 |")
        lines.append("|---|--------|---------|---------|-------------------|---------|---------|--------|")
        for i, r in enumerate(records, 1):
            ods_exists = "✅" if r["ods_exists"] else "❌"
            dwd_match = ", ".join(r["dwd_matches"]) if r["dwd_matches"] else "—"
            # At most three samples, each truncated and made table-safe.
            samples_str = ", ".join(_md_cell(s) for s in r["samples"][:3]) if r["samples"] else "—"
            lines.append(
                f"| {i} | `{r['ods_col']}` | {ods_exists} | {dwd_match} "
                f"| {r['fm_status']} | {r['conclusion']} | **{r['action']}** | {samples_str} |"
            )

    # TABLE_MAP coverage check.
    lines.append("\n---\n")
    lines.append("## TABLE_MAP 注册状态\n")
    lines.append("| DWD 表 | ODS 源表 | 已注册 |")
    lines.append("|--------|---------|--------|")
    for target in AUDIT_TARGETS:
        for dt in target["dwd_tables"]:
            dwd_full = f"dwd.{dt}"
            ods_full = f"ods.{target['ods_table']}"
            registered = dwd_full in DwdLoadTask.TABLE_MAP
            reg_str = "✅" if registered else "❌ 未注册"
            if registered:
                actual_ods = DwdLoadTask.TABLE_MAP[dwd_full]
                # Registered, but against a different ODS source than expected.
                if actual_ods != ods_full:
                    reg_str = f"⚠️ 映射到 {actual_ods}"
            lines.append(f"| `{dwd_full}` | `{ods_full}` | {reg_str} |")
    # Targets that have no DWD table at all.
    for target in AUDIT_TARGETS:
        if not target["dwd_tables"]:
            lines.append(f"| (待新建) | `ods.{target['ods_table']}` | ❌ 无 DWD 表 |")

    return "\n".join(lines)
|
||
|
||
|
||
def main():
    """Command-line entry point: audit every AUDIT_TARGETS entry and write the report.

    Steps: parse ``--output``; load ``.env`` then ``.env.local`` (the latter
    overriding) from ROOT; connect with the ``PG_DSN`` environment variable
    (exit status 1 if it is unset); audit each target while printing a short
    progress line per field; write the Markdown report as UTF-8.

    The connection is closed even when the audit raises, and the
    ``FIELD_AUDIT_ROOT`` default directory is resolved only when ``--output``
    was not supplied, so an explicit output path no longer depends on
    ``_env_paths`` being importable or configured.
    """
    parser = argparse.ArgumentParser(description="字段排查脚本")
    parser.add_argument(
        "--output", type=str, default=None,
        help="输出文件路径(默认 $FIELD_AUDIT_ROOT/field_audit_report.md)",
    )
    args = parser.parse_args()

    # Load environment variables; .env.local takes precedence over .env.
    load_dotenv(ROOT / ".env")
    load_dotenv(ROOT / ".env.local", override=True)

    dsn = os.environ.get("PG_DSN")
    if not dsn:
        print("错误:未配置 PG_DSN 环境变量", file=sys.stderr)
        sys.exit(1)

    print("连接数据库...")
    conn = psycopg2.connect(dsn)
    try:
        conn.autocommit = True

        print("解析 FACT_MAPPINGS...")
        fm_forward, fm_reverse = parse_fact_mappings()

        # key = (ods_table, category, notes), used to group the report.
        all_results: dict[tuple, list[dict]] = {}
        for target in AUDIT_TARGETS:
            key = (target["ods_table"], target["category"], target["notes"])
            print(f"排查 {target['ods_table']}({target['category']} 类)...")
            records = audit_one_table(conn, target, fm_forward, fm_reverse)
            all_results[key] = records
            # Brief per-field result on stdout.
            for r in records:
                icon = "✅" if "无需变更" in r["action"] else "⚠️"
                print(f" {icon} {r['ods_col']}: {r['conclusion']} → {r['action']}")
    finally:
        conn.close()

    report = generate_report(all_results)

    if args.output:
        output_path = Path(args.output)
    else:
        # Default location comes from FIELD_AUDIT_ROOT via the project helper.
        from _env_paths import get_output_path
        output_path = get_output_path("FIELD_AUDIT_ROOT") / "field_audit_report.md"

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(f"\n排查报告已生成:{output_path}")
|
||
|
||
|
||
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|