Files
Neo-ZQYY/scripts/ops/field_audit.py

456 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
字段排查脚本 — 数据流字段补全 Spec Task 1.1
对 AUDIT_TARGETS 中的 10 张 ODS 目标表执行排查流程:
1. 查 DWD 现有列
2. 查 ODS 现有列
3. 解析 FACT_MAPPINGS 现状(从 dwd_load_task.py 源码导入)
4. 判断自动映射(ODS 列名 == DWD 列名)
5. 输出排查记录表(markdown),标注每个字段的排查结论和建议操作
用法:
cd C:\\NeoZQYY
python scripts/ops/field_audit.py
python scripts/ops/field_audit.py --output path/to/output.md
"""
from __future__ import annotations
import argparse
import os
import sys
from datetime import datetime
from pathlib import Path
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
# ── Project root & import-path setup ──
# ROOT is two levels above the directory containing this file (parents[2] of its resolved path).
ROOT = Path(__file__).resolve().parents[2]
# The feiqiu ETL connector package; it is put first on sys.path so that the
# `tasks.dwd.dwd_load_task` import below resolves against it.
ETL_ROOT = ROOT / "apps" / "etl" / "connectors" / "feiqiu"
sys.path.insert(0, str(ETL_ROOT))
# Import DwdLoadTask only to read its FACT_MAPPINGS / TABLE_MAP class attributes;
# the script never instantiates it. Must stay after the sys.path insert above.
from tasks.dwd.dwd_load_task import DwdLoadTask
# ── SCD2 bookkeeping columns (meant to be ignored during the audit) ──
# NOTE(review): not referenced by any code visible in this file; confirm it is still needed.
SCD2_COLS = {"scd2_start_time", "scd2_end_time", "scd2_is_current", "scd2_version"}
# ── Tables to audit and their suspected missing fields ──
# Each entry:
#   ods_table        – ODS source table (schema "ods")
#   dwd_tables       – DWD target tables (schema "dwd"); empty when none exists yet
#   suspect_ods_cols – ODS columns suspected of not reaching DWD
#   category         – audit category label (A / B / C) shown in the report
#   notes            – free-text note rendered under the table heading in the report
AUDIT_TARGETS: list[dict] = [
    {
        "ods_table": "assistant_accounts_master",
        "dwd_tables": ["dim_assistant", "dim_assistant_ex"],
        "suspect_ods_cols": ["system_role_id", "job_num", "cx_unit_price", "pd_unit_price"],
        "category": "A",
        "notes": "4 个 ODS→DWD 未映射",
    },
    {
        "ods_table": "assistant_service_records",
        "dwd_tables": ["dwd_assistant_service_log", "dwd_assistant_service_log_ex"],
        "suspect_ods_cols": ["site_assistant_id", "operator_id", "operator_name"],
        "category": "A",
        "notes": "3 个 ODS→DWD 未映射(site_assistant_id 可能已映射为 order_assistant_id)",
    },
    {
        "ods_table": "store_goods_sales_records",
        "dwd_tables": ["dwd_store_goods_sale", "dwd_store_goods_sale_ex"],
        "suspect_ods_cols": ["discount_price"],
        "category": "A",
        "notes": "1 个 ODS→DWD 未映射(可能已映射为 discount_money)",
    },
    {
        "ods_table": "member_balance_changes",
        "dwd_tables": ["dwd_member_balance_change", "dwd_member_balance_change_ex"],
        "suspect_ods_cols": ["relate_id"],
        "category": "A",
        "notes": "1 个 ODS→DWD 未映射",
    },
    {
        "ods_table": "tenant_goods_master",
        "dwd_tables": ["dim_tenant_goods", "dim_tenant_goods_ex"],
        "suspect_ods_cols": ["commoditycode"],
        "category": "A",
        "notes": "1 个 ODS→DWD 未映射(可能已映射为 commodity_code_list)",
    },
    {
        "ods_table": "site_tables_master",
        "dwd_tables": ["dim_table", "dim_table_ex"],
        "suspect_ods_cols": [
            "sitename", "appletqrcodeurl", "audit_status", "charge_free",
            "create_time", "delay_lights_time", "is_rest_area", "light_status",
            "only_allow_groupon", "order_delay_time", "self_table",
            "tablestatusname", "temporary_light_second", "virtual_table",
        ],
        "category": "A",
        "notes": "14 个 ODS→DWD 未映射",
    },
    {
        "ods_table": "recharge_settlements",
        "dwd_tables": ["dwd_recharge_order", "dwd_recharge_order_ex"],
        "suspect_ods_cols": [
            "electricityadjustmoney", "electricitymoney",
            "mervousalesamount", "plcouponsaleamount", "realelectricitymoney",
        ],
        "category": "B",
        "notes": "5 个 ODS→DWD 未映射 + 5 个 DWD 无 ODS 源(驼峰/蛇形命名差异)",
    },
    {
        "ods_table": "store_goods_master",
        "dwd_tables": ["dim_store_goods", "dim_store_goods_ex"],
        "suspect_ods_cols": [
            "time_slot_sale", "batch_stock_quantity", "provisional_total_cost",
        ],
        "category": "B",
        "notes": "平层 + 嵌套展开 + ODS→DWD 补全",
    },
    {
        "ods_table": "goods_stock_summary",
        "dwd_tables": [],  # no DWD table yet; one must be created
        "suspect_ods_cols": [
            "sitegoodsid", "goodsname", "goodsunit", "goodscategoryid",
            "goodscategorysecondid", "categoryname", "rangestartstock",
            "rangeendstock", "rangein", "rangeout", "rangesale",
            "rangesalemoney", "rangeinventory", "currentstock",
        ],
        "category": "C",
        "notes": "14 个 ODS 字段,无 DWD 目标表,需新建",
    },
    {
        "ods_table": "goods_stock_movements",
        "dwd_tables": [],  # no DWD table yet; one must be created
        "suspect_ods_cols": [
            # Actual ODS column names are camelCase without underscores (stored lowercase).
            "sitegoodsstockid", "tenantid", "siteid", "sitegoodsid",
            "goodsname", "goodscategoryid", "goodssecondcategoryid",
            "unit", "price", "stocktype", "changenum", "startnum",
            "endnum", "changenuma", "startnuma", "endnuma",
            "remark", "operatorname", "createtime",
        ],
        "category": "C",
        "notes": "19 个 ODS 字段,无 DWD 目标表,需新建",
    },
]
# ── DWD columns of recharge_settlements known to have no ODS source (cross-checked
# in the audit). Each one is the snake_case form of a suspect ODS column above. ──
RECHARGE_DWD_ORPHANS = [
    "pl_coupon_sale_amount", "mervou_sales_amount",
    "electricity_money", "real_electricity_money", "electricity_adjust_money",
]
def get_db_columns(cur, schema: str, table: str) -> list[str]:
    """
    List the columns of ``schema.table`` in ordinal order, lowercased.

    ``cur`` must return rows addressable by column name (e.g. a RealDictCursor).
    An empty list comes back when information_schema has no matching rows.
    """
    query = (
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position"
    )
    cur.execute(query, (schema, table))
    names = []
    for row in cur.fetchall():
        names.append(row["column_name"].lower())
    return names
def get_sample_values(conn, schema: str, table: str, column: str, limit: int = 5) -> list:
    """
    Fetch up to ``limit`` distinct, non-NULL values of ``"schema"."table"."column"``.

    Best-effort helper: on any error the connection is rolled back and an empty
    list is returned instead of raising. The cursor is closed in every case.
    Identifiers are interpolated inside double quotes without escaping, so
    callers must pass trusted names.
    """
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    quoted_col = f'"{column}"'
    stmt = (
        f'SELECT DISTINCT {quoted_col} FROM "{schema}"."{table}" '
        f"WHERE {quoted_col} IS NOT NULL LIMIT %s"
    )
    try:
        cursor.execute(stmt, (limit,))
        values = [row[column] for row in cursor.fetchall()]
    except Exception:
        conn.rollback()
        values = []
    finally:
        cursor.close()
    return values
def parse_fact_mappings(
    fact_mappings: dict | None = None,
) -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]:
    """
    Index FACT_MAPPINGS in both directions.

    Args:
        fact_mappings: ``{dwd_full_table: [(dwd_col, ods_expr, cast), ...]}``.
            Defaults to ``DwdLoadTask.FACT_MAPPINGS``; pass another mapping of the
            same shape to index it instead.

    Returns:
        ``(forward, reverse)`` where

        * ``forward[dwd_full_table][dwd_col.lower()] = ods_expr`` (expression verbatim);
        * ``reverse[dwd_full_table][ods_key] = dwd_col.lower()``. Here ``ods_key`` is
          ``ods_expr`` lowercased with surrounding double quotes stripped, so a quoted
          identifier such as ``"siteName"`` matches the ODS column ``sitename``.

        When several entries share a key, the later entry wins in both indexes.
    """
    if fact_mappings is None:
        fact_mappings = DwdLoadTask.FACT_MAPPINGS

    forward: dict[str, dict[str, str]] = {}
    reverse: dict[str, dict[str, str]] = {}
    for dwd_table, entries in fact_mappings.items():
        forward[dwd_table] = {
            dwd_col.lower(): ods_expr for dwd_col, ods_expr, _cast in entries
        }
        # JSON / computed expressions are kept as lowercased text; only plain
        # (optionally quoted) column names will ever match an ODS column name.
        reverse[dwd_table] = {
            ods_expr.lower().strip('"'): dwd_col.lower()
            for dwd_col, ods_expr, _cast in entries
        }
    return forward, reverse
def _squash_name(name: str) -> str:
    """Lowercase *name* and drop underscores, for naming-style-insensitive matching.

    ODS columns may be camelCase without underscores (stored lowercase, e.g.
    ``electricityadjustmoney``) while DWD uses snake_case
    (``electricity_adjust_money``); both squash to the same key.
    """
    return name.lower().replace("_", "")


def _audit_dwd_orphan(
    dwd_col: str,
    dwd_tables: list[str],
    dwd_col_sets: dict[str, set[str]],
    fm_forward: dict,
) -> dict:
    """Build the audit record for one DWD column expected to lack an ODS source."""
    record = {
        "ods_col": f"(DWD orphan) {dwd_col}",
        "ods_exists": False,
        "dwd_matches": [],
        "fm_status": "",
        "conclusion": "",
        "action": "",
        "samples": [],
    }
    # Already covered by FACT_MAPPINGS in any of the DWD tables?
    for dt in dwd_tables:
        fm_fwd = fm_forward.get(f"dwd.{dt}", {})
        if dwd_col in fm_fwd:
            record["fm_status"] = f"已映射 ← {fm_fwd[dwd_col]}"
            record["conclusion"] = "已映射(FACT_MAPPINGS 已覆盖)"
            record["action"] = "无需变更"
            record["dwd_matches"].append(f"{dt}.{dwd_col}")
            return record

    # Not mapped: only claim the DWD column exists if the schema actually has it.
    existing = [
        f"{dt}.{dwd_col}" for dt in dwd_tables if dwd_col in dwd_col_sets.get(dt, set())
    ]
    record["dwd_matches"] = existing
    if existing:
        record["conclusion"] = "DWD 列存在但无 ODS 映射"
        record["action"] = "需补充 FACT_MAPPINGS"
    else:
        record["conclusion"] = "DWD 列不存在"
        record["action"] = "需新增 DWD 列 + FACT_MAPPINGS"
    return record


def audit_one_table(
    conn,
    target: dict,
    fm_forward: dict,
    fm_reverse: dict,
) -> list[dict]:
    """
    Audit one ODS table against its DWD target tables.

    Args:
        conn: open psycopg2 connection, used for schema lookups and value sampling.
        target: one AUDIT_TARGETS entry (ods_table, dwd_tables, suspect_ods_cols, ...).
        fm_forward: forward FACT_MAPPINGS index from parse_fact_mappings().
        fm_reverse: reverse FACT_MAPPINGS index from parse_fact_mappings().

    Returns:
        One record per suspect ODS column. For recharge_settlements there is also
        one record per RECHARGE_DWD_ORPHANS entry. Each record has the keys
        ods_col, ods_exists, dwd_matches, fm_status, conclusion, action, samples.

    For each suspect column the DWD tables are scanned in order; the first hit wins:
      1. FACT_MAPPINGS maps the ODS column explicitly (reverse index);
      2. the DWD table has a column with the same name (auto mapping);
      3. the DWD table has a column that is equal after removing case and
         underscores, i.e. a camelCase vs snake_case naming difference.
    """
    ods_table = target["ods_table"]
    dwd_tables = target["dwd_tables"]

    # Schema lookups share one cursor, which is closed as soon as they finish.
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        ods_cols = set(get_db_columns(cur, "ods", ods_table))
        # Ordered lists keep the squashed-name lookup below deterministic.
        dwd_col_lists = {dt: get_db_columns(cur, "dwd", dt) for dt in dwd_tables}

    dwd_col_sets = {dt: set(cols) for dt, cols in dwd_col_lists.items()}
    # Per DWD table: squashed name -> real column (first in ordinal order wins).
    squash_index: dict[str, dict[str, str]] = {}
    for dt, cols in dwd_col_lists.items():
        index: dict[str, str] = {}
        for col in cols:
            index.setdefault(_squash_name(col), col)
        squash_index[dt] = index

    records: list[dict] = []
    for ods_col in target["suspect_ods_cols"]:
        ods_col_lower = ods_col.lower()
        record = {
            "ods_col": ods_col_lower,
            "ods_exists": ods_col_lower in ods_cols,
            "dwd_matches": [],
            "fm_status": "未配置",
            "conclusion": "",
            "action": "",
            "samples": [],
        }
        if record["ods_exists"]:
            record["samples"] = get_sample_values(conn, "ods", ods_table, ods_col_lower)

        for dt in dwd_tables:
            dwd_full = f"dwd.{dt}"
            fm_fwd = fm_forward.get(dwd_full, {})
            fm_rev = fm_reverse.get(dwd_full, {})

            # Check 1: FACT_MAPPINGS already maps this ODS column explicitly.
            if ods_col_lower in fm_rev:
                mapped_to = fm_rev[ods_col_lower]
                record["dwd_matches"].append(f"{dt}.{mapped_to}")
                record["fm_status"] = f"已映射 → {dt}.{mapped_to}"
                record["conclusion"] = "已映射(FACT_MAPPINGS 显式配置)"
                record["action"] = "无需变更"
                break

            # Check 2: the DWD table has a same-named column (auto mapping).
            if ods_col_lower in dwd_col_sets[dt]:
                record["dwd_matches"].append(f"{dt}.{ods_col_lower}")
                record["fm_status"] = "自动映射(同名列)"
                record["conclusion"] = "已映射(自动匹配)"
                record["action"] = "无需变更"
                break

            # Check 3: naming-style difference (camelCase vs snake_case). ODS names
            # are already lowercase, so a camel->snake conversion cannot recover word
            # boundaries; compare underscore-insensitive names instead. A candidate
            # equal to ods_col_lower would have been caught by check 2.
            candidate = squash_index[dt].get(_squash_name(ods_col_lower))
            if candidate is not None:
                record["dwd_matches"].append(f"{dt}.{candidate}")
                if candidate in fm_fwd:
                    record["fm_status"] = (
                        f"已映射 → {dt}.{candidate}(命名转换,源 {fm_fwd[candidate]})"
                    )
                    record["conclusion"] = "已映射(命名差异,FACT_MAPPINGS 已覆盖)"
                    record["action"] = "无需变更"
                else:
                    record["fm_status"] = f"DWD 列存在 {dt}.{candidate},但 FACT_MAPPINGS 未配置"
                    record["conclusion"] = "映射遗漏(DWD 列已存在,缺 FACT_MAPPINGS)"
                    record["action"] = "仅补充 FACT_MAPPINGS"
                break
        else:
            # No DWD table matched (or there is no DWD table at all).
            if not record["ods_exists"]:
                record["conclusion"] = "ODS 列不存在"
                record["action"] = "需确认 API 是否返回该字段"
            elif not dwd_tables:
                record["conclusion"] = "无 DWD 目标表"
                record["action"] = "需新建 DWD 表"
            else:
                record["conclusion"] = "确实缺失"
                record["action"] = "需新增 DWD 列 + FACT_MAPPINGS"
        records.append(record)

    # Extra check: DWD columns of recharge_settlements that have no ODS source.
    if ods_table == "recharge_settlements":
        records.extend(
            _audit_dwd_orphan(orphan, dwd_tables, dwd_col_sets, fm_forward)
            for orphan in RECHARGE_DWD_ORPHANS
        )
    return records
def _camel_to_snake(name: str) -> str:
"""简易驼峰转蛇形:在大写字母前插入下划线。"""
import re
s1 = re.sub(r"([A-Z])", r"_\1", name)
return s1.lower().lstrip("_")
def _md_cell(value: object) -> str:
    """Render *value* for a Markdown table cell: escape ``|`` and flatten line breaks."""
    return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")


def _summary_lines(all_results: dict[tuple[str, str, str], list[dict]]) -> list[str]:
    """Build the "汇总" section: record counts per suggested-action bucket."""
    total_fields = 0
    already_mapped = 0
    need_fm_only = 0
    need_new_col = 0
    need_new_table = 0
    ods_missing = 0
    for records in all_results.values():
        for r in records:
            total_fields += 1
            action = r["action"]
            if "无需变更" in action:
                already_mapped += 1
            elif "补充 FACT_MAPPINGS" in action:
                # Matches both "仅补充 FACT_MAPPINGS" and the DWD-orphan action
                # "需补充 FACT_MAPPINGS"; a "仅补充" test alone dropped the latter.
                need_fm_only += 1
            elif "新增 DWD 列" in action:
                need_new_col += 1
            elif "新建 DWD 表" in action:
                need_new_table += 1
            elif "需确认" in action:
                ods_missing += 1
    return [
        "\n## 汇总\n",
        "| 指标 | 数量 |",
        "|------|------|",
        f"| 排查字段总数 | {total_fields} |",
        f"| 已映射(无需变更) | {already_mapped} |",
        f"| 映射遗漏(仅补 FACT_MAPPINGS) | {need_fm_only} |",
        f"| 确实缺失(需新增 DWD 列) | {need_new_col} |",
        f"| 无 DWD 表(需新建) | {need_new_table} |",
        f"| ODS 列不存在(需确认 API) | {ods_missing} |",
    ]


def _detail_lines(ods_table: str, category: str, notes: str, records: list[dict]) -> list[str]:
    """Build the per-table detail section: heading, notes and one row per record."""
    lines = [
        "\n---\n",
        f"## {ods_table} ({category} 类)\n",
        f"> {notes}\n",
        "| # | ODS 列 | ODS 存在 | DWD 匹配 | FACT_MAPPINGS 状态 | 排查结论 | 建议操作 | 采样值 |",
        "|---|--------|---------|---------|-------------------|---------|---------|--------|",
    ]
    for i, r in enumerate(records, 1):
        ods_exists = "✅" if r["ods_exists"] else "❌"
        dwd_match = ", ".join(r["dwd_matches"])
        # At most 3 samples, each truncated to 30 characters.
        samples_str = ", ".join(str(s)[:30] for s in r["samples"][:3])
        lines.append(
            f"| {i} | `{_md_cell(r['ods_col'])}` | {ods_exists} | {_md_cell(dwd_match)} "
            f"| {_md_cell(r['fm_status'])} | {_md_cell(r['conclusion'])} "
            f"| **{_md_cell(r['action'])}** | {_md_cell(samples_str)} |"
        )
    return lines


def _table_map_lines() -> list[str]:
    """Build the TABLE_MAP registration section for every AUDIT_TARGETS DWD table."""
    lines = [
        "\n---\n",
        "## TABLE_MAP 注册状态\n",
        "| DWD 表 | ODS 源表 | 已注册 |",
        "|--------|---------|--------|",
    ]
    for target in AUDIT_TARGETS:
        ods_full = f"ods.{target['ods_table']}"
        for dt in target["dwd_tables"]:
            dwd_full = f"dwd.{dt}"
            if dwd_full not in DwdLoadTask.TABLE_MAP:
                reg_str = "❌ 未注册"
            elif DwdLoadTask.TABLE_MAP[dwd_full] != ods_full:
                reg_str = f"⚠️ 映射到 {DwdLoadTask.TABLE_MAP[dwd_full]}"
            else:
                reg_str = "✅"
            lines.append(f"| `{dwd_full}` | `{ods_full}` | {reg_str} |")
    # Category-C targets with no DWD table yet.
    for target in AUDIT_TARGETS:
        if not target["dwd_tables"]:
            lines.append(f"| (待新建) | `ods.{target['ods_table']}` | ❌ 无 DWD 表 |")
    return lines


def generate_report(all_results: dict[tuple[str, str, str], list[dict]]) -> str:
    """
    Render the audit results as a Markdown report.

    Args:
        all_results: mapping ``(ods_table, category, notes) -> records``; each record
            is a dict as produced by ``audit_one_table``.

    Returns:
        The complete report text: header, summary counts, one detail table per ODS
        table, and the TABLE_MAP registration status.
    """
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = [
        "# 字段排查报告\n",
        f"> 生成时间:{now_str}\n",
        f"> 排查范围:{len(all_results)} 张表\n",
    ]
    lines.extend(_summary_lines(all_results))
    for (ods_table, category, notes), records in all_results.items():
        lines.extend(_detail_lines(ods_table, category, notes, records))
    lines.extend(_table_map_lines())
    return "\n".join(lines)
def main():
    """
    CLI entry point: audit every AUDIT_TARGETS entry and write a Markdown report.

    Loads ROOT/.env, then ROOT/.env.local (overriding), and reads PG_DSN. It exits
    with status 1 when PG_DSN is missing. The report is written to ``--output``, or
    by default to ``field_audit_report.md`` under the FIELD_AUDIT_ROOT location
    resolved by ``_env_paths.get_output_path``.
    """
    parser = argparse.ArgumentParser(description="字段排查脚本")
    parser.add_argument(
        "--output", type=str, default=None,
        help="输出文件路径(默认 $FIELD_AUDIT_ROOT/field_audit_report.md)",
    )
    args = parser.parse_args()

    # Load environment variables (.env.local overrides .env).
    load_dotenv(ROOT / ".env")
    load_dotenv(ROOT / ".env.local", override=True)

    dsn = os.environ.get("PG_DSN")
    if not dsn:
        print("错误:未配置 PG_DSN 环境变量", file=sys.stderr)
        sys.exit(1)

    # Resolve the output path before any database work so a misconfigured default
    # location fails fast. FIELD_AUDIT_ROOT is consulted only when --output is absent.
    if args.output:
        output_path = Path(args.output)
    else:
        from _env_paths import get_output_path  # project-local helper
        output_path = get_output_path("FIELD_AUDIT_ROOT") / "field_audit_report.md"

    print("解析 FACT_MAPPINGS...")
    fm_forward, fm_reverse = parse_fact_mappings()

    print("连接数据库...")
    conn = psycopg2.connect(dsn)
    # key = (ods_table, category, notes), used to group the report.
    all_results: dict[tuple[str, str, str], list[dict]] = {}
    try:
        conn.autocommit = True
        for target in AUDIT_TARGETS:
            key = (target["ods_table"], target["category"], target["notes"])
            print(f"排查 {target['ods_table']} ({target['category']} 类)...")
            records = audit_one_table(conn, target, fm_forward, fm_reverse)
            all_results[key] = records
            # Brief per-field console summary.
            for r in records:
                icon = "✅" if "无需变更" in r["action"] else "⚠️"
                print(f" {icon} {r['ods_col']}: {r['conclusion']} → {r['action']}")
    finally:
        # Always release the connection, even if an audit step raises.
        conn.close()

    report = generate_report(all_results)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(f"\n排查报告已生成:{output_path}")


if __name__ == "__main__":
    main()