# -*- coding: utf-8 -*- r""" 黑盒集成测试报告 -- 从 API 输入侧与 DB 输出侧(ODS/DWD/DWS)进行全链路对比。 用法: cd C:\NeoZQYY uv run python scripts/ops/blackbox_test_report.py 输出: ETL_REPORT_ROOT / blackbox_report_.md """ from __future__ import annotations import json import os import re import sys from datetime import datetime from pathlib import Path from zoneinfo import ZoneInfo from dotenv import load_dotenv _ROOT = Path(__file__).resolve().parents[2] load_dotenv(_ROOT / ".env", override=False) # ── 路径 ────────────────────────────────────────────────────────────── def _env(var: str) -> Path: val = os.environ.get(var) if not val: raise KeyError(f"环境变量 {var} 未定义") p = Path(val) p.mkdir(parents=True, exist_ok=True) return p REPORT_ROOT = _env("ETL_REPORT_ROOT") JSON_ROOT = _env("FETCH_ROOT") LOG_ROOT = _env("LOG_ROOT") TZ = ZoneInfo("Asia/Shanghai") NOW = datetime.now(TZ) TS = NOW.strftime("%Y%m%d_%H%M%S") import psycopg2 import psycopg2.extras DSN = os.environ["PG_DSN"] # ── 1. 解析 ETL 日志 ───────────────────────────────────────────────── def find_latest_log() -> Path | None: """找到最新的 ETL 日志文件""" logs = sorted(LOG_ROOT.glob("*.log"), key=lambda p: p.stat().st_mtime, reverse=True) return logs[0] if logs else None def parse_etl_log(log_path: Path) -> dict: """解析 ETL 日志,提取任务执行结果""" results = {} current_task = None task_start_times = {} with open(log_path, "r", encoding="utf-8") as f: for line in f: # 匹配任务开始 m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*开始执行(\w+) \(ODS\)", line) if m: current_task = m.group(2) task_start_times[current_task] = m.group(1) continue # 匹配 ODS 任务完成 m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*?(\w+) ODS 任务完成: (\{.*\})", line) if m: task_name = m.group(2) end_time = m.group(1) try: counts = eval(m.group(3)) except Exception: counts = {} start_time = task_start_times.get(task_name, "") results[task_name] = { "status": "SUCC", "layer": "ODS", "start": start_time, "end": end_time, "counts": counts, } continue # 匹配 DWD 完成 m = re.match(r"(\d{4}-\d{2}-\d{2} 
\d{2}:\d{2}:\d{2}).*DWD_LOAD_FROM_ODS.*完成.*?(\d+).*表", line) if m: results["DWD_LOAD_FROM_ODS"] = { "status": "SUCC", "layer": "DWD", "end": m.group(1), } continue # 匹配 DWS/INDEX 成功 m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*?(\w+): 工具类任务执行成功", line) if m: results[m.group(2)] = { "status": "SUCC", "layer": "DWS/INDEX", "end": m.group(1), } continue # 匹配任务失败 m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*?(\w+): 工具类任务执行失败: (.*)", line) if m: results[m.group(2)] = { "status": "FAIL", "layer": "DWS/INDEX", "end": m.group(1), "error": m.group(3)[:120], } continue # 匹配"未启用或不存在" m = re.match(r".*任务 (\w+) 未启用或不存在", line) if m: results[m.group(1)] = { "status": "SKIP", "layer": "DWS", "error": "未注册", } return results # ── 2. API 输入侧:统计 JSON 落地行数 ──────────────────────────────── def count_api_json_records(task_name: str) -> int | None: """统计某个 ODS 任务最新一次 JSON 落地的总记录数""" task_dir = JSON_ROOT / task_name if not task_dir.exists(): return None # 找最新的子目录(按名称排序,格式 TASK-SITEID-DATE-TIME) subdirs = sorted(task_dir.iterdir(), key=lambda p: p.name, reverse=True) # 只取今天的 today_str = NOW.strftime("%Y%m%d") for sd in subdirs: if today_str in sd.name and sd.is_dir(): total = 0 for jf in sd.glob("*.json"): try: data = json.loads(jf.read_text(encoding="utf-8")) if isinstance(data, list): total += len(data) elif isinstance(data, dict): # 可能是 {"data": {"list": [...]}} 格式 lst = data.get("data", {}).get("list", data.get("data", [])) if isinstance(lst, list): total += len(lst) else: total += 1 except Exception: pass return total return None # ── 3. 
# ── 3. DB output side: row counts per layer ───────────────────────────
ODS_TABLES = [
    "assistant_accounts_master",
    "assistant_service_records",
    "settlement_records",
    "table_fee_transactions",
    "table_fee_discount_records",
    "site_tables_master",
    "payment_transactions",
    "refund_transactions",
    "platform_coupon_redemption_records",
    "member_profiles",
    "member_stored_value_cards",
    "member_balance_changes",
    "recharge_settlements",
    "group_buy_packages",
    "group_buy_redemption_records",
    "goods_stock_summary",
    "goods_stock_movements",
    "stock_goods_category_tree",
    "store_goods_master",
    "store_goods_sales_records",
    "tenant_goods_master",
]

# ODS task name -> ODS table name
ODS_TASK_TO_TABLE = {
    "ODS_ASSISTANT_ACCOUNT": "assistant_accounts_master",
    "ODS_ASSISTANT_LEDGER": "assistant_service_records",
    "ODS_SETTLEMENT_RECORDS": "settlement_records",
    "ODS_TABLE_USE": "table_fee_transactions",
    "ODS_TABLE_FEE_DISCOUNT": "table_fee_discount_records",
    "ODS_TABLES": "site_tables_master",
    "ODS_PAYMENT": "payment_transactions",
    "ODS_REFUND": "refund_transactions",
    "ODS_PLATFORM_COUPON": "platform_coupon_redemption_records",
    "ODS_MEMBER": "member_profiles",
    "ODS_MEMBER_CARD": "member_stored_value_cards",
    "ODS_MEMBER_BALANCE": "member_balance_changes",
    "ODS_RECHARGE_SETTLE": "recharge_settlements",
    "ODS_GROUP_PACKAGE": "group_buy_packages",
    "ODS_GROUP_BUY_REDEMPTION": "group_buy_redemption_records",
    "ODS_INVENTORY_STOCK": "goods_stock_summary",
    "ODS_INVENTORY_CHANGE": "goods_stock_movements",
    "ODS_GOODS_CATEGORY": "stock_goods_category_tree",
    "ODS_STORE_GOODS": "store_goods_master",
    "ODS_STORE_GOODS_SALES": "store_goods_sales_records",
    "ODS_TENANT_GOODS": "tenant_goods_master",
}


def query_row_counts(conn, schema: str, tables: list[str]) -> dict[str, int]:
    """Return ``COUNT(*)`` for each table of *schema*.

    Args:
        conn: DB-API connection (psycopg2-style ``cursor()`` context manager).
        schema: schema name, interpolated into the SQL as-is.
        tables: table names, interpolated into the SQL as-is.

    Returns:
        Mapping of table name to row count; -1 marks a table whose query
        failed (the connection is rolled back and the loop continues).
    """
    result: dict[str, int] = {}
    with conn.cursor() as cur:
        for t in tables:
            try:
                cur.execute(f"SELECT COUNT(*) FROM {schema}.{t}")
                result[t] = cur.fetchone()[0]
            except Exception:
                conn.rollback()
                result[t] = -1
    return result


def _list_base_tables(conn, schema: str) -> list[str]:
    """Return the names of all base tables in *schema*, sorted by name."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = %s AND table_type = 'BASE TABLE'
            ORDER BY table_name
            """,
            (schema,),
        )
        return [r[0] for r in cur.fetchall()]


def query_dwd_tables(conn) -> list[str]:
    """Return all base tables in the ``dwd`` schema."""
    return _list_base_tables(conn, "dwd")


def query_dws_tables(conn) -> list[str]:
    """Return all base tables in the ``dws`` schema."""
    return _list_base_tables(conn, "dws")


# ── 4. ODS vs DWD row-count comparison ────────────────────────────────
# Simplified version of DwdLoadTask.TABLE_MAP (dwd_table -> ods_table)
DWD_TO_ODS = {
    # Dimension tables dim_*
    "dwd.dim_assistant": "ods.assistant_accounts_master",
    "dwd.dim_member": "ods.member_profiles",
    "dwd.dim_member_card_account": "ods.member_stored_value_cards",
    "dwd.dim_table": "ods.site_tables_master",
    "dwd.dim_groupbuy_package": "ods.group_buy_packages",
    "dwd.dim_store_goods": "ods.store_goods_master",
    "dwd.dim_tenant_goods": "ods.tenant_goods_master",
    "dwd.dim_goods_category": "ods.stock_goods_category_tree",
    # Fact tables dwd_*
    "dwd.dwd_assistant_service_log": "ods.assistant_service_records",
    "dwd.dwd_member_balance_change": "ods.member_balance_changes",
    "dwd.dwd_recharge_order": "ods.recharge_settlements",
    "dwd.dwd_settlement_head": "ods.settlement_records",
    "dwd.dwd_table_fee_log": "ods.table_fee_transactions",
    "dwd.dwd_table_fee_adjust": "ods.table_fee_discount_records",
    "dwd.dwd_payment": "ods.payment_transactions",
    "dwd.dwd_refund": "ods.refund_transactions",
    "dwd.dwd_platform_coupon_redemption": "ods.platform_coupon_redemption_records",
    "dwd.dwd_groupbuy_redemption": "ods.group_buy_redemption_records",
    "dwd.dwd_store_goods_sale": "ods.store_goods_sales_records",
}


def compare_ods_dwd(conn) -> list[dict]:
    """Compare ODS and DWD row counts for every pair in DWD_TO_ODS.

    Returns:
        One dict per DWD table (sorted by DWD table name) with the keys
        "dwd_table", "ods_table", "ods_total_rows", "ods_distinct_ids",
        "dwd_rows", "type" and "ratio". A count of -1 marks a failed query.
        "ratio" is dwd_rows / ods_distinct_ids rounded to 2 places, or "N/A"
        when either side is unavailable or ODS has no ids.
    """
    rows = []
    with conn.cursor() as cur:
        for dwd_full, ods_full in sorted(DWD_TO_ODS.items()):
            dwd_table = dwd_full.split(".", 1)[1]

            try:
                # ODS keeps one row per (id, content_hash), so distinct ids
                # and total rows differ; fetch both in a single scan.
                cur.execute(
                    f"SELECT COUNT(DISTINCT id), COUNT(*) FROM {ods_full}"
                )
                ods_distinct, ods_total = cur.fetchone()
            except Exception:
                conn.rollback()
                ods_distinct = ods_total = -1

            try:
                cur.execute(f"SELECT COUNT(*) FROM {dwd_full}")
                dwd_count = cur.fetchone()[0]
            except Exception:
                conn.rollback()
                dwd_count = -1

            # dim tables use SCD2, so their row count may exceed the ODS
            # distinct-id count; fact tables should be roughly equal to it.
            is_dim = dwd_table.startswith("dim_")
            # Only compute a ratio from two real counts: a failed DWD query
            # (-1) must not show up as a negative ratio.
            if ods_distinct > 0 and dwd_count >= 0:
                ratio = round(dwd_count / ods_distinct, 2)
            else:
                ratio = "N/A"

            rows.append({
                "dwd_table": dwd_full,
                "ods_table": ods_full,
                "ods_total_rows": ods_total,
                "ods_distinct_ids": ods_distinct,
                "dwd_rows": dwd_count,
                "type": "维度(SCD2)" if is_dim else "事实",
                "ratio": ratio,
            })
    return rows


# ── 5. DWD data-quality sampling ──────────────────────────────────────
def check_dwd_null_rates(conn, tables: list[str], sample_cols: int = 5) -> list[dict]:
    """Check the NULL rate of the leading columns of each DWD table.

    Args:
        conn: DB-API connection (psycopg2-style ``cursor()`` context manager).
        tables: table names inside the ``dwd`` schema.
        sample_cols: how many columns (by ordinal position, bookkeeping
            columns excluded) to sample per table.

    Returns:
        One dict per table with "table", "total" and "null_cols". Tables with
        no sampled columns are omitted; on a query error "total" is -1 and
        "null_cols" holds a single-line excerpt of the error message.
    """
    results = []
    with conn.cursor() as cur:
        for t in tables:
            try:
                cur.execute(
                    """
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_schema = 'dwd' AND table_name = %s
                      AND column_name NOT IN (
                          'created_at', 'updated_at', 'fetched_at',
                          'content_hash', 'record_index', 'source_file',
                          'source_endpoint', 'payload'
                      )
                    ORDER BY ordinal_position
                    LIMIT %s
                    """,
                    (t, sample_cols),
                )
                cols = [r[0] for r in cur.fetchall()]
                if not cols:
                    continue

                # COUNT(col) counts only non-NULL values, so one scan yields
                # the total and every sampled column's NULL count together.
                non_null_exprs = ", ".join(f"COUNT({c})" for c in cols)
                cur.execute(f"SELECT COUNT(*), {non_null_exprs} FROM dwd.{t}")
                counts = cur.fetchone()
                total = counts[0]
                if total == 0:
                    results.append({"table": t, "total": 0, "null_cols": "空表"})
                    continue

                null_info = []
                for c, non_null in zip(cols, counts[1:]):
                    rate = round((total - non_null) / total * 100, 1)
                    if rate > 0:
                        null_info.append(f"{c}={rate}%")

                results.append({
                    "table": t,
                    "total": total,
                    "null_cols": ", ".join(null_info) if null_info else "无 NULL",
                })
            except Exception as e:
                conn.rollback()
                # Driver error text can span several lines; collapse it so
                # the value stays on one line when put in a report table.
                message = " ".join(str(e).split())
                results.append({"table": t, "total": -1, "null_cols": message[:80]})
    return results
# ── 6. DWS summary sanity checks ──────────────────────────────────────
# Column names tried, in this order, when looking for a table's date column.
_DWS_DATE_COLUMN_CANDIDATES = (
    "stat_date",
    "salary_month",
    "report_date",
    "calc_date",
    "snapshot_date",
    "stock_date",
)


def check_dws_sanity(conn) -> list[dict]:
    """Run basic sanity checks on every table of the ``dws`` schema.

    For each table the row count is read and, when one of the known date
    columns exists and the table is not empty, the latest value of that
    column.

    Args:
        conn: DB-API connection (psycopg2-style ``cursor()`` context manager).

    Returns:
        One dict per table with "table" (prefixed with ``dws.``), "rows",
        "latest_date" and "status". On a query error "rows" is -1,
        "latest_date" is "ERROR" and "status" carries a single-line excerpt
        of the error message.
    """
    checks = []
    dws_tables = query_dws_tables(conn)
    with conn.cursor() as cur:
        for t in dws_tables:
            try:
                cur.execute(f"SELECT COUNT(*) FROM dws.{t}")
                cnt = cur.fetchone()[0]

                # Pick the first candidate column that exists on this table.
                date_col = None
                for candidate in _DWS_DATE_COLUMN_CANDIDATES:
                    cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.columns
                        WHERE table_schema='dws' AND table_name=%s AND column_name=%s
                        """,
                        (t, candidate),
                    )
                    if cur.fetchone():
                        date_col = candidate
                        break

                latest = None
                if date_col and cnt > 0:
                    cur.execute(f"SELECT MAX({date_col}) FROM dws.{t}")
                    latest = cur.fetchone()[0]

                checks.append({
                    "table": f"dws.{t}",
                    "rows": cnt,
                    "latest_date": str(latest) if latest else "N/A",
                    "status": "✅" if cnt > 0 else "⚠️ 空表",
                })
            except Exception as e:
                conn.rollback()
                # Driver error text can span several lines; collapse it so
                # the status stays on one line when put in a report table.
                message = " ".join(str(e).split())
                checks.append({
                    "table": f"dws.{t}",
                    "rows": -1,
                    "latest_date": "ERROR",
                    "status": f"❌ {message[:60]}",
                })
    return checks
# ── 7. API JSON input side vs ODS row counts ──────────────────────────
def compare_api_vs_ods(conn, log_results: dict) -> list[dict]:
    """Compare API/JSON record counts with ODS table row counts.

    Args:
        conn: DB-API connection (psycopg2-style ``cursor()`` context manager).
        log_results: task results as returned by ``parse_etl_log``.

    Returns:
        One dict per entry of ODS_TASK_TO_TABLE (sorted by task name) with
        "task", "ods_table", "api_fetched", "json_records", "ods_total",
        "ods_distinct" and "etl_status". Missing API/JSON figures and a
        missing log status are reported as "N/A"; failed DB counts as -1.
    """
    rows = []
    with conn.cursor() as cur:
        for task_name, ods_table in sorted(ODS_TASK_TO_TABLE.items()):
            log_info = log_results.get(task_name, {})
            counts = log_info.get("counts")
            # Guard against a counts value that is not a dict.
            api_fetched = counts.get("fetched") if isinstance(counts, dict) else None
            api_json_count = count_api_json_records(task_name)

            try:
                # Total rows and distinct ids in a single scan.
                cur.execute(
                    f"SELECT COUNT(*), COUNT(DISTINCT id) FROM ods.{ods_table}"
                )
                ods_total, ods_distinct = cur.fetchone()
            except Exception:
                conn.rollback()
                ods_total = ods_distinct = -1

            rows.append({
                "task": task_name,
                "ods_table": ods_table,
                "api_fetched": api_fetched if api_fetched is not None else "N/A",
                "json_records": api_json_count if api_json_count is not None else "N/A",
                "ods_total": ods_total,
                "ods_distinct": ods_distinct,
                "etl_status": log_info.get("status", "N/A"),
            })
    return rows


# ── 8. Markdown report generation ─────────────────────────────────────
_DEFAULT_TEST_WINDOW = "2025-11-01 ~ 2026-02-20 (full_window 模式)"


def _md_cell(value: object) -> str:
    """Render *value* as the content of a single Markdown table cell.

    Line breaks become spaces and ``|`` is escaped, so that a value such as
    a multi-line database error message cannot break the table row.
    """
    return " ".join(str(value).splitlines()).replace("|", "\\|")


def _md_row(*cells: object) -> str:
    """Return one Markdown table row built from *cells*."""
    return "| " + " | ".join(_md_cell(c) for c in cells) + " |"


def generate_report(
    log_path: Path | None,
    log_results: dict,
    api_vs_ods: list[dict],
    ods_dwd_compare: list[dict],
    dwd_quality: list[dict],
    dws_sanity: list[dict],
    dws_row_counts: dict[str, int],
    *,
    test_window: str = _DEFAULT_TEST_WINDOW,
) -> str:
    """Build the Markdown text of the black-box integration test report.

    Args:
        log_path: the parsed ETL log file, or None when no log was found.
        log_results: task results as returned by ``parse_etl_log``.
        api_vs_ods: rows as returned by ``compare_api_vs_ods``.
        ods_dwd_compare: rows as returned by ``compare_ods_dwd``.
        dwd_quality: rows as returned by ``check_dwd_null_rates``.
        dws_sanity: rows as returned by ``check_dws_sanity``.
        dws_row_counts: accepted for interface compatibility; not rendered
            (the DWS row counts shown come from *dws_sanity*).
        test_window: text shown as the tested data window.

    Returns:
        The report as one newline-joined string.
    """
    statuses = [v.get("status") for v in log_results.values()]
    succ = statuses.count("SUCC")
    fail = statuses.count("FAIL")
    skip = statuses.count("SKIP")

    lines = [
        "# 黑盒集成测试报告",
        "",
        f"生成时间: {NOW.strftime('%Y-%m-%d %H:%M:%S')}",
        f"ETL 日志: `{log_path.name if log_path else 'N/A'}`",
        f"测试窗口: {test_window}",
        "",
    ]

    # ── Overview ──
    lines += [
        "## 1. ETL 执行总览",
        "",
        "| 指标 | 值 |",
        "|------|-----|",
        f"| 成功任务 | {succ} |",
        f"| 失败任务 | {fail} |",
        f"| 跳过任务 | {skip} |",
        f"| 总计 | {len(log_results)} |",
        "",
    ]

    # Failure details
    if fail > 0:
        lines += ["### 失败任务详情", ""]
        lines += [
            f"- **{k}**: {v.get('error', '未知错误')}"
            for k, v in log_results.items()
            if v.get("status") == "FAIL"
        ]
        lines.append("")

    # Skipped (unregistered) tasks
    if skip > 0:
        lines += ["### 跳过任务(未注册)", ""]
        lines += [
            f"- {k}" for k, v in log_results.items() if v.get("status") == "SKIP"
        ]
        lines.append("")

    # ── API input side vs ODS output side ──
    lines += [
        "## 2. 输入侧(API)vs 输出侧(ODS)对比",
        "",
        "| 任务 | ODS 表 | API 抓取数 | JSON 落地数 | ODS 总行数 | ODS 去重ID | ETL 状态 |",
        "|------|--------|-----------|-----------|-----------|-----------|---------|",
    ]
    lines += [
        _md_row(
            r["task"],
            r["ods_table"],
            r["api_fetched"],
            # .get(): rows built without this key still render.
            r.get("json_records", "N/A"),
            r["ods_total"],
            r["ods_distinct"],
            r["etl_status"],
        )
        for r in api_vs_ods
    ]
    lines += [
        "",
        "> 说明: ODS 采用快照模式 (id, content_hash) 为 PK,content_hash 变化产生新行,",
        "> 因此 ODS 总行数 ≥ ODS 去重 ID 数。API 抓取数 = 本次 ETL 从 API 获取的记录数。",
        "> JSON 落地数 = 当天最新一次 JSON 落地目录中的记录数(无落地目录时为 N/A)。",
        "",
    ]

    # ── ODS vs DWD ──
    lines += [
        "## 3. ODS → DWD 行数对比",
        "",
        "| DWD 表 | ODS 表 | 类型 | ODS 总行 | ODS 去重ID | DWD 行数 | 比率 |",
        "|--------|--------|------|---------|-----------|---------|------|",
    ]
    lines += [
        _md_row(
            r["dwd_table"],
            r["ods_table"],
            r["type"],
            r["ods_total_rows"],
            r["ods_distinct_ids"],
            r["dwd_rows"],
            r["ratio"],
        )
        for r in ods_dwd_compare
    ]
    lines += [
        "",
        "> 说明: 维度表(SCD2)的 DWD 行数可能 > ODS 去重 ID(历史版本保留)。",
        "> 事实表的 DWD 行数应 ≈ ODS 去重 ID 数。比率 = DWD行数 / ODS去重ID。",
        "",
    ]

    # ── DWD data quality ──
    lines += [
        "## 4. DWD 数据质量(NULL 率抽样)",
        "",
        "| DWD 表 | 总行数 | NULL 列情况 |",
        "|--------|--------|------------|",
    ]
    lines += [
        _md_row(f"dwd.{r['table']}", r["total"], r["null_cols"])
        for r in dwd_quality
    ]
    lines.append("")

    # ── DWS summary ──
    lines += [
        "## 5. DWS 汇总层检查",
        "",
        "| DWS 表 | 行数 | 最新日期 | 状态 |",
        "|--------|------|---------|------|",
    ]
    lines += [
        _md_row(r["table"], r["rows"], r["latest_date"], r["status"])
        for r in dws_sanity
    ]
    lines.append("")

    # ── Conclusion ──
    total_ods_ok = sum(1 for r in api_vs_ods if r["etl_status"] == "SUCC")
    total_dwd_ok = sum(1 for r in ods_dwd_compare if r["dwd_rows"] > 0)
    total_dws_ok = sum(1 for r in dws_sanity if r["rows"] > 0)
    total_dws_all = len(dws_sanity)
    lines += [
        "## 6. 结论",
        "",
        f"- ODS 层: {total_ods_ok}/{len(api_vs_ods)} 个任务成功入库",
        f"- DWD 层: {total_dwd_ok}/{len(ods_dwd_compare)} 个表有数据",
        f"- DWS 层: {total_dws_ok}/{total_dws_all} 个表有数据",
        f"- 失败任务: {fail} 个(详见第 1 节)",
        f"- 跳过任务: {skip} 个(未注册的 DWS 任务)",
        "",
    ]

    return "\n".join(lines)


# ── main ──────────────────────────────────────────────────────────────
def main():
    """Parse the latest ETL log, query the database and write the report."""
    print("=== 黑盒集成测试报告生成 ===")

    # 1. Parse the log
    log_path = find_latest_log()
    if log_path:
        print(f"解析日志: {log_path.name}")
        log_results = parse_etl_log(log_path)
        print(f" 解析到 {len(log_results)} 个任务结果")
    else:
        print("未找到 ETL 日志")
        log_results = {}

    # 2. Connect to the database
    print("连接数据库...")
    conn = psycopg2.connect(DSN)
    conn.autocommit = True

    try:
        # 3. API vs ODS
        print("对比 API 输入侧 vs ODS...")
        api_vs_ods = compare_api_vs_ods(conn, log_results)

        # 4. ODS vs DWD
        print("对比 ODS vs DWD...")
        ods_dwd_compare = compare_ods_dwd(conn)

        # 5. DWD quality
        print("检查 DWD 数据质量...")
        dwd_tables = query_dwd_tables(conn)
        dwd_quality = check_dwd_null_rates(conn, dwd_tables)

        # 6. DWS sanity
        print("检查 DWS 汇总层...")
        dws_sanity = check_dws_sanity(conn)
        # The sanity check already counted every DWS table; reuse those
        # figures instead of running COUNT(*) on each table a second time.
        dws_row_counts = {
            r["table"].removeprefix("dws."): r["rows"] for r in dws_sanity
        }

        # 7. Generate the report
        print("生成报告...")
        report_md = generate_report(
            log_path,
            log_results,
            api_vs_ods,
            ods_dwd_compare,
            dwd_quality,
            dws_sanity,
            dws_row_counts,
        )

        out_path = REPORT_ROOT / f"blackbox_report_{TS}.md"
        out_path.write_text(report_md, encoding="utf-8")
        print(f"报告已生成: {out_path}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()