Files

791 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""DWD-layer debug script.

Executes the DWD_LOAD_FROM_ODS task, verifies the processing result of every
DWD → ODS pair in TABLE_MAP, and checks dimension-table SCD2 version-chain
integrity, fact-table time-window incremental loading correctness, and the
FACT_MAPPINGS column mappings.

Usage:
    cd apps/etl/connectors/feiqiu
    python -m scripts.debug.debug_dwd [--hours 2] [--tables dwd.dim_member,dwd.dwd_payment]
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import time
import traceback
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
# ── Ensure the project root is on sys.path ──
# This script lives at scripts/debug/debug_dwd.py, so parents[2] resolves to
# the feiqiu connector root; prepend it so absolute imports below work when
# the script is run directly.
_FEIQIU_ROOT = Path(__file__).resolve().parents[2]
if str(_FEIQIU_ROOT) not in sys.path:
    sys.path.insert(0, str(_FEIQIU_ROOT))
from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations
from api.client import APIClient
from orchestration.task_registry import default_registry
from orchestration.cursor_manager import CursorManager
from orchestration.run_tracker import RunTracker
from orchestration.task_executor import TaskExecutor
from tasks.dwd.dwd_load_task import DwdLoadTask
# Candidate time columns (formerly DwdLoadTask.FACT_ORDER_CANDIDATES, inlined
# here after a refactor).  The first candidate that exists on a fact table is
# used as its window-ordering column.
_TIME_COLUMN_CANDIDATES = [
    "pay_time", "create_time", "update_time",
    "occur_time", "settle_time", "start_use_time", "fetched_at",
]
@dataclass
class DebugResult:
    """Debug result for a single DWD table."""
    layer: str = "DWD"                     # warehouse layer this check belongs to
    task_code: str = "DWD_LOAD_FROM_ODS"   # task that populates the table
    table_name: str = ""                   # fully-qualified DWD table name
    ods_source: str = ""                   # ODS table the DWD table loads from
    mode: str = ""                         # SCD2 / INCREMENT / TYPE1_UPSERT
    status: str = ""                       # PASS / FAIL / WARN / ERROR
    message: str = ""                      # human-readable summary
    counts: dict = field(default_factory=dict)   # processed/inserted/updated counts from the load run
    dwd_row_count: int | None = None       # total rows in the DWD table
    ods_row_count: int | None = None       # total rows in the ODS source
    scd2_check: dict | None = None         # SCD2 integrity details (dimension tables)
    fact_window_check: dict | None = None  # window-load details (fact tables)
    mapping_check: dict | None = None      # FACT_MAPPINGS column-mapping details
    duration_sec: float = 0.0              # wall-clock seconds spent checking this table
    error_detail: str | None = None        # traceback / error text when status is ERROR
    fix_applied: str | None = None         # description of any automatic fix applied
# ── 工具函数 ──────────────────────────────────────────────────
def _setup_logging() -> logging.Logger:
logger = logging.getLogger("debug_dwd")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S"
))
logger.addHandler(handler)
return logger
def _build_components(config: AppConfig, logger: logging.Logger):
    """Assemble the DB / API / TaskExecutor component stack from *config*.

    Returns a ``(db_conn, api_client, db_ops, executor)`` tuple.
    """
    connection = DatabaseConnection(
        dsn=config["db"]["dsn"],
        session=config["db"].get("session"),
        connect_timeout=config["db"].get("connect_timeout_sec"),
    )
    client = APIClient(
        base_url=config["api"]["base_url"],
        token=config["api"]["token"],
        timeout=config["api"].get("timeout_sec", 20),
        retry_max=config["api"].get("retries", {}).get("max_attempts", 3),
        headers_extra=config["api"].get("headers_extra"),
    )
    operations = DatabaseOperations(connection)
    cursors = CursorManager(connection)
    tracker = RunTracker(connection)
    task_executor = TaskExecutor(
        config, operations, client,
        cursors, tracker, default_registry, logger,
    )
    return connection, client, operations, task_executor
def _query_count(db_conn: DatabaseConnection, table: str) -> int:
"""查询表的总行数。"""
rows = db_conn.query(f"SELECT COUNT(*) AS cnt FROM {table}")
return int(rows[0]["cnt"]) if rows else 0
def _query_count_windowed(db_conn: DatabaseConnection, table: str,
col: str, start: datetime, end: datetime) -> int:
"""查询表在指定时间窗口内的行数。"""
sql = f'SELECT COUNT(*) AS cnt FROM {table} WHERE "{col}" >= %s AND "{col}" < %s'
rows = db_conn.query(sql, (start, end))
return int(rows[0]["cnt"]) if rows else 0
def _has_column(db_conn: DatabaseConnection, table: str, column: str) -> bool:
"""检查表是否包含指定列。"""
sql = """
SELECT 1 FROM information_schema.columns
WHERE table_schema || '.' || table_name = %s
AND column_name = %s
LIMIT 1
"""
rows = db_conn.query(sql, (table, column))
return bool(rows)
def _is_dim_table(table_name: str) -> bool:
"""判断是否为维度表dim_ 前缀)。"""
base = table_name.split(".")[-1] if "." in table_name else table_name
return base.startswith("dim_")
# ── SCD2 version-chain integrity check ───────────────────────
def _check_scd2_integrity(db_conn: DatabaseConnection, dwd_table: str,
                          logger: logging.Logger) -> dict:
    """Check SCD2 version-chain integrity for a dimension table.

    Checks performed:
      - each business key has at most one row with scd2_is_current = 1
      - scd2_version increases without gaps (sampled)
      - total / current / historical row counts

    Returns a dict with ``has_scd2``, ``checks`` (human-readable result
    lines) and, when countable, ``total_rows`` / ``current_rows`` /
    ``historical_rows``.  *logger* is accepted but currently unused.
    """
    result = {"has_scd2": False, "checks": []}
    # Bail out early when the table carries no SCD2 bookkeeping columns.
    if not _has_column(db_conn, dwd_table, "scd2_is_current"):
        result["checks"].append("无 SCD2 列,跳过检查")
        return result
    result["has_scd2"] = True
    # Business key = primary-key columns minus the SCD2 bookkeeping columns.
    pk_sql = """
    SELECT a.attname
    FROM pg_index i
    JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = %s::regclass AND i.indisprimary
    ORDER BY array_position(i.indkey, a.attnum)
    """
    pk_rows = db_conn.query(pk_sql, (dwd_table,))
    scd_cols = {"scd2_start_time", "scd2_end_time", "scd2_is_current", "scd2_version"}
    business_keys = [r["attname"] for r in pk_rows if r["attname"] not in scd_cols]
    if not business_keys:
        result["checks"].append("未找到业务主键")
        return result
    bk_sql = ", ".join(f'"{k}"' for k in business_keys)
    # Check 1: at most one current record per business key.
    # COALESCE(scd2_is_current, 1) treats NULL flags as current — assumes the
    # loader uses NULL to mean "current"; TODO confirm against DwdLoadTask.
    dup_current_sql = f"""
    SELECT {bk_sql}, COUNT(*) AS cnt
    FROM {dwd_table}
    WHERE COALESCE(scd2_is_current, 1) = 1
    GROUP BY {bk_sql}
    HAVING COUNT(*) > 1
    LIMIT 10
    """
    try:
        dup_rows = db_conn.query(dup_current_sql)
        dup_count = len(dup_rows) if dup_rows else 0
        if dup_count > 0:
            result["checks"].append(f"⚠ 发现 {dup_count} 个业务主键有多条 current 记录")
        else:
            result["checks"].append("✓ 每个业务主键至多一条 current 记录")
    except Exception as exc:
        result["checks"].append(f"✗ 检查 current 唯一性失败: {exc}")
    # Check 2: version continuity, sampled over the first 100 multi-version
    # keys.  LAG over ascending versions — any step != 1 counts as a gap.
    version_gap_sql = f"""
    WITH multi_ver AS (
        SELECT {bk_sql}
        FROM {dwd_table}
        GROUP BY {bk_sql}
        HAVING COUNT(*) > 1
        LIMIT 100
    ),
    versioned AS (
        SELECT t.{business_keys[0]},
               t.scd2_version,
               LAG(t.scd2_version) OVER (
                   PARTITION BY {', '.join(f't."{k}"' for k in business_keys)}
                   ORDER BY t.scd2_version
               ) AS prev_version
        FROM {dwd_table} t
        INNER JOIN multi_ver m ON {' AND '.join(f't."{k}" = m."{k}"' for k in business_keys)}
    )
    SELECT COUNT(*) AS gap_count
    FROM versioned
    WHERE prev_version IS NOT NULL AND scd2_version - prev_version != 1
    """
    try:
        gap_rows = db_conn.query(version_gap_sql)
        gap_count = int(gap_rows[0]["gap_count"]) if gap_rows else 0
        if gap_count > 0:
            result["checks"].append(f"⚠ 发现 {gap_count} 处版本号跳号")
        else:
            result["checks"].append("✓ 版本号连续递增")
    except Exception as exc:
        result["checks"].append(f"✗ 检查版本连续性失败: {exc}")
    # Check 3: total vs current row counts.
    try:
        total = _query_count(db_conn, dwd_table)
        current_sql = f"SELECT COUNT(*) AS cnt FROM {dwd_table} WHERE COALESCE(scd2_is_current, 1) = 1"
        current_rows = db_conn.query(current_sql)
        current_count = int(current_rows[0]["cnt"]) if current_rows else 0
        result["total_rows"] = total
        result["current_rows"] = current_count
        result["historical_rows"] = total - current_count
        result["checks"].append(f"✓ 总行数={total}, current={current_count}, 历史={total - current_count}")
    except Exception as exc:
        result["checks"].append(f"✗ 查询行数失败: {exc}")
    return result
# ── Fact-table time-window incremental-load check ────────────
def _check_fact_window(db_conn: DatabaseConnection, dwd_table: str, ods_table: str,
                       window_start: datetime, window_end: datetime,
                       logger: logging.Logger) -> dict:
    """Check incremental window loading for a fact table.

    Checks performed:
      - DWD row count inside the window vs ODS row count (ODS is windowed
        on ``fetched_at``); a DWD/ODS ratio below 0.5 is flagged
      - primary-key uniqueness on the DWD table

    Returns a dict with ``checks`` plus, when available, ``order_column``,
    ``dwd_window_count``, ``ods_window_count`` and ``ratio``.
    *logger* is accepted but currently unused.
    """
    result = {"checks": []}
    # Pick the ordering column: first _TIME_COLUMN_CANDIDATES entry that
    # exists on the DWD table.
    order_col = None
    for candidate in _TIME_COLUMN_CANDIDATES:
        if _has_column(db_conn, dwd_table, candidate):
            order_col = candidate
            break
    if not order_col:
        result["checks"].append("⚠ 未找到可用的时间列,跳过窗口检查")
        return result
    # DWD rows inside the window; a failure here aborts the check.
    try:
        dwd_count = _query_count_windowed(db_conn, dwd_table, order_col, window_start, window_end)
        result["dwd_window_count"] = dwd_count
        result["order_column"] = order_col
    except Exception as exc:
        result["checks"].append(f"✗ 查询 DWD 窗口行数失败: {exc}")
        return result
    # ODS rows inside the window (always windowed on fetched_at).
    try:
        ods_count = _query_count_windowed(db_conn, ods_table, "fetched_at", window_start, window_end)
        result["ods_window_count"] = ods_count
    except Exception as exc:
        result["checks"].append(f"✗ 查询 ODS 窗口行数失败: {exc}")
        ods_count = None
    if ods_count is not None:
        # Dedup / mapping can make the counts differ, but not drastically.
        if ods_count > 0:
            # The inner ternary guard repeats the surrounding condition —
            # redundant but harmless.
            ratio = dwd_count / ods_count if ods_count > 0 else 0
            result["ratio"] = round(ratio, 4)
            if ratio < 0.5:
                result["checks"].append(f"⚠ DWD/ODS 比率偏低: {ratio:.2%} (DWD={dwd_count}, ODS={ods_count})")
            else:
                result["checks"].append(f"✓ DWD/ODS 比率正常: {ratio:.2%} (DWD={dwd_count}, ODS={ods_count})")
        else:
            # NOTE(review): the leading status marker of this message appears
            # to have been stripped by a non-ASCII-unsafe copy; kept as-is.
            result["checks"].append(f" ODS 窗口内无数据 (DWD={dwd_count})")
    # Primary-key duplicate check on the whole DWD table.
    pk_sql = """
    SELECT a.attname
    FROM pg_index i
    JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = %s::regclass AND i.indisprimary
    ORDER BY array_position(i.indkey, a.attnum)
    """
    try:
        pk_rows = db_conn.query(pk_sql, (dwd_table,))
        pk_cols = [r["attname"] for r in pk_rows]
        if pk_cols:
            pk_list = ", ".join(f'"{c}"' for c in pk_cols)
            dup_sql = f"""
            SELECT {pk_list}, COUNT(*) AS cnt
            FROM {dwd_table}
            GROUP BY {pk_list}
            HAVING COUNT(*) > 1
            LIMIT 5
            """
            dup_rows = db_conn.query(dup_sql)
            dup_count = len(dup_rows) if dup_rows else 0
            if dup_count > 0:
                result["checks"].append(f"⚠ 发现 {dup_count} 组主键重复")
            else:
                result["checks"].append("✓ 主键无重复")
    except Exception as exc:
        result["checks"].append(f"✗ 主键重复检查失败: {exc}")
    return result
# ── FACT_MAPPINGS column-mapping check ───────────────────────
def _check_fact_mappings(db_conn: DatabaseConnection, dwd_table: str, ods_table: str,
                         logger: logging.Logger) -> dict:
    """Validate the column mappings declared in ``DwdLoadTask.FACT_MAPPINGS``.

    Checks performed:
      - every mapped DWD target column exists on the DWD table
      - for simple column-name mappings, the ODS source column exists on the
        ODS table (expressions such as JSON extraction or CASE are skipped)

    Returns a dict with ``checks`` (human-readable result lines),
    ``mapping_count`` and ``issues`` (machine-readable problems).
    *logger* is accepted but currently unused.
    """

    def _is_simple_column(expr: str) -> bool:
        # A bare identifier or a double-quoted identifier counts as a simple
        # column reference; anything else is treated as an SQL expression and
        # not validated against the ODS column set.  (This predicate was
        # previously duplicated inline in two places.)
        return expr.isidentifier() or (expr.startswith('"') and expr.endswith('"'))

    result = {"checks": [], "mapping_count": 0, "issues": []}
    mappings = DwdLoadTask.FACT_MAPPINGS.get(dwd_table, [])
    if not mappings:
        result["checks"].append(" 无显式 FACT_MAPPINGS 条目")
        return result
    result["mapping_count"] = len(mappings)
    # Fetch the column sets of both tables (compared case-insensitively).
    cols_sql = """
    SELECT column_name FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    """
    dwd_schema, dwd_name = dwd_table.split(".", 1)
    ods_schema, ods_name = ods_table.split(".", 1)
    try:
        dwd_col_rows = db_conn.query(cols_sql, (dwd_schema, dwd_name))
        dwd_cols = {r["column_name"].lower() for r in dwd_col_rows}
    except Exception as exc:
        result["checks"].append(f"✗ 获取 DWD 列信息失败: {exc}")
        return result
    try:
        ods_col_rows = db_conn.query(cols_sql, (ods_schema, ods_name))
        ods_cols = {r["column_name"].lower() for r in ods_col_rows}
    except Exception as exc:
        result["checks"].append(f"✗ 获取 ODS 列信息失败: {exc}")
        return result
    missing_dwd = []
    missing_ods = []
    # cast_type is part of the mapping tuple but not validated here.
    for dwd_col, ods_expr, cast_type in mappings:
        if dwd_col.lower() not in dwd_cols:
            missing_dwd.append(dwd_col)
        if _is_simple_column(ods_expr):
            col_name = ods_expr.strip('"').lower()
            if col_name not in ods_cols:
                missing_ods.append((dwd_col, ods_expr))
    if missing_dwd:
        result["issues"].extend([f"DWD 列不存在: {c}" for c in missing_dwd])
        result["checks"].append(f"{len(missing_dwd)} 个 DWD 目标列不存在: {missing_dwd}")
    else:
        result["checks"].append(f"✓ 所有 {len(mappings)} 个 DWD 目标列均存在")
    if missing_ods:
        result["issues"].extend([f"ODS 列不存在: {dwd}{ods}" for dwd, ods in missing_ods])
        result["checks"].append(f"{len(missing_ods)} 个 ODS 源列不存在: {missing_ods}")
    else:
        simple_count = sum(1 for _, expr, _ in mappings if _is_simple_column(expr))
        result["checks"].append(f"✓ 所有 {simple_count} 个简单列名映射的 ODS 源列均存在")
    return result
# ── Single-table debug ───────────────────────────────────────
def _debug_single_table(
    dwd_table: str,
    ods_table: str,
    db_conn: DatabaseConnection,
    window_start: datetime,
    window_end: datetime,
    logger: logging.Logger,
) -> DebugResult:
    """Run the debug checks for one DWD table (read-only; no loading done).

    Steps: basic row counts, FACT_MAPPINGS mapping check, then either an
    SCD2 integrity check (dimension tables) or a time-window check (fact
    tables).  Any check line containing "⚠" is promoted to an issue and the
    table is flagged WARN.
    """
    result = DebugResult(table_name=dwd_table, ods_source=ods_table)
    is_dim = _is_dim_table(dwd_table)
    result.mode = "SCD2" if is_dim else "INCREMENT"
    # The separator character had been stripped by a non-ASCII-unsafe copy
    # (the call logged an empty string); restored.
    logger.info("─" * 60)
    logger.info("▶ 检查: %s%s (%s)", dwd_table, ods_table, result.mode)
    t0 = time.monotonic()
    issues = []
    # 1) Basic row counts — failure here aborts the remaining checks.
    try:
        dwd_total = _query_count(db_conn, dwd_table)
        ods_total = _query_count(db_conn, ods_table)
        result.dwd_row_count = dwd_total
        result.ods_row_count = ods_total
        logger.info(" 行数: DWD=%d, ODS=%d", dwd_total, ods_total)
    except Exception as exc:
        result.status = "ERROR"
        result.message = f"查询行数失败: {exc}"
        result.error_detail = traceback.format_exc()
        result.duration_sec = round(time.monotonic() - t0, 2)
        logger.error("%s", result.message)
        return result
    # 2) FACT_MAPPINGS column-mapping check.
    try:
        mapping_check = _check_fact_mappings(db_conn, dwd_table, ods_table, logger)
        result.mapping_check = mapping_check
        for check in mapping_check.get("checks", []):
            logger.info(" 映射: %s", check)
        if mapping_check.get("issues"):
            issues.extend(mapping_check["issues"])
    except Exception as exc:
        logger.warning(" ⚠ 列映射检查异常: %s", exc)
    # 3) SCD2 integrity for dimensions / window check for facts.
    if is_dim:
        try:
            scd2_check = _check_scd2_integrity(db_conn, dwd_table, logger)
            result.scd2_check = scd2_check
            for check in scd2_check.get("checks", []):
                logger.info(" SCD2: %s", check)
            # BUGFIX: the membership test had degraded to '"" in c' (always
            # True, flagging every check line as an issue); restored the
            # intended warning-marker test matching the "⚠ ..." check lines.
            issues.extend(c for c in scd2_check.get("checks", []) if "⚠" in c)
        except Exception as exc:
            logger.warning(" ⚠ SCD2 检查异常: %s", exc)
    else:
        try:
            fact_check = _check_fact_window(
                db_conn, dwd_table, ods_table, window_start, window_end, logger,
            )
            result.fact_window_check = fact_check
            for check in fact_check.get("checks", []):
                logger.info(" 窗口: %s", check)
            # BUGFIX: same '"" in c' degradation as the SCD2 branch above.
            issues.extend(c for c in fact_check.get("checks", []) if "⚠" in c)
        except Exception as exc:
            logger.warning(" ⚠ 窗口检查异常: %s", exc)
    # 4) Final status: any issue → WARN; empty table → WARN; otherwise PASS.
    result.duration_sec = round(time.monotonic() - t0, 2)
    if issues:
        result.status = "WARN"
        result.message = f"{len(issues)} 个问题: {issues[0]}"
    elif dwd_total == 0:
        result.status = "WARN"
        result.message = "DWD 表为空"
    else:
        result.status = "PASS"
        result.message = f"检查通过 (DWD={dwd_total}行)"
    # Icon glyphs were stripped in the same copy; restored to the markers
    # used throughout this script's check messages.
    icon = {"PASS": "✓", "WARN": "⚠", "ERROR": "✗", "FAIL": "✗"}.get(result.status, "?")
    logger.info(" %s 结果: %s - %s (%.1fs)", icon, result.status, result.message, result.duration_sec)
    return result
# ── Execute the DWD_LOAD_FROM_ODS task ───────────────────────
def _execute_dwd_load(
    executor: TaskExecutor,
    config: AppConfig,
    logger: logging.Logger,
) -> dict:
    """Execute the DWD_LOAD_FROM_ODS task and summarize the outcome.

    Returns a dict with ``status`` (SUCCESS when no per-table errors,
    PARTIAL when some tables errored, ERROR on an uncaught exception), the
    per-table ``tables`` payload, ``errors``, ``duration_sec`` and, on
    exception, a ``traceback`` string.
    """
    store_id = int(config.get("app.store_id"))
    # Unique run id so the run tracker can distinguish debug executions.
    run_uuid = f"debug-dwd-load-{int(time.time())}"
    # NOTE(review): this logs an empty string — the separator character was
    # likely stripped by a non-ASCII-unsafe copy; left byte-identical here.
    logger.info("" * 60)
    logger.info("▶ 执行 DWD_LOAD_FROM_ODS 任务")
    t0 = time.monotonic()
    try:
        task_result = executor.run_single_task(
            task_code="DWD_LOAD_FROM_ODS",
            run_uuid=run_uuid,
            store_id=store_id,
            data_source="online",
        )
        elapsed = round(time.monotonic() - t0, 2)
        logger.info(" 执行完成,耗时 %.1fs", elapsed)
        # Summarize the per-table results reported by the task.
        tables = task_result.get("tables", [])
        errors = task_result.get("errors", [])
        logger.info(" 处理表数: %d, 错误表数: %d", len(tables), len(errors))
        for t in tables:
            tbl = t.get("table", "")
            mode = t.get("mode", "")
            ins = t.get("inserted", 0)
            upd = t.get("updated", 0)
            proc = t.get("processed", 0)
            logger.info(" %s [%s]: processed=%d, inserted=%d, updated=%d", tbl, mode, proc, ins, upd)
        for e in errors:
            logger.error("%s: %s", e.get("table", ""), e.get("error", ""))
        return {
            "status": "SUCCESS" if not errors else "PARTIAL",
            "tables": tables,
            "errors": errors,
            "duration_sec": elapsed,
        }
    except Exception as exc:
        elapsed = round(time.monotonic() - t0, 2)
        logger.error(" ✗ 执行异常: %s", exc)
        return {
            "status": "ERROR",
            "tables": [],
            "errors": [{"table": "DWD_LOAD_FROM_ODS", "error": str(exc)}],
            "duration_sec": elapsed,
            "traceback": traceback.format_exc(),
        }
# ── Main flow ────────────────────────────────────────────────
def run_dwd_debug(
    hours: float = 2.0,
    table_filter: list[str] | None = None,
    skip_load: bool = False,
) -> list[DebugResult]:
    """Run the full DWD-layer debug pass.

    Args:
        hours: look-back window in hours (default 2).
        table_filter: only debug the named DWD tables; entries may be full
            names ("dwd.dim_member") or bare table names ("dim_member").
            None means every table in ``DwdLoadTask.TABLE_MAP``.
        skip_load: skip executing DWD_LOAD_FROM_ODS; only run data checks.

    Returns:
        One DebugResult per checked table.
    """
    logger = _setup_logging()
    logger.info("=" * 60)
    logger.info("DWD 层调试开始")
    logger.info("=" * 60)
    # Load configuration and derive the check window in the app timezone.
    config = AppConfig.load()
    tz = ZoneInfo(config.get("app.timezone", "Asia/Shanghai"))
    window_end = datetime.now(tz)
    window_start = window_end - timedelta(hours=hours)
    logger.info("门店 ID: %s", config.get("app.store_id"))
    logger.info("数据库: %s", config.get("db.name", ""))
    logger.info("时间窗口: %s ~ %s (%.1f 小时)", window_start, window_end, hours)
    # Force the task's load window to match the debug window.
    config.config.setdefault("run", {}).setdefault("window_override", {})
    config.config["run"]["window_override"]["start"] = window_start
    config.config["run"]["window_override"]["end"] = window_end
    db_conn, api_client, db_ops, executor = _build_components(config, logger)
    try:
        # Step 1: optionally execute DWD_LOAD_FROM_ODS.
        load_result = None
        if not skip_load:
            load_result = _execute_dwd_load(executor, config, logger)
            logger.info("")
        # Step 2: per-table checks over TABLE_MAP.
        table_map = DwdLoadTask.TABLE_MAP
        if table_filter:
            filter_set = {t.lower() for t in table_filter}
            filtered_map = {
                k: v for k, v in table_map.items()
                if k.lower() in filter_set or k.split(".")[-1].lower() in filter_set
            }
            # BUGFIX: a filter entry that matched only by bare table name
            # (e.g. "dim_member" for "dwd.dim_member") used to be reported as
            # skipped because only full names were counted as matched.
            matched = {k.lower() for k in filtered_map}
            matched |= {k.split(".")[-1].lower() for k in filtered_map}
            skipped = filter_set - matched
            if skipped:
                logger.warning("以下表不在 TABLE_MAP 中,已跳过: %s", skipped)
            table_map = filtered_map
        logger.info("")
        logger.info("=" * 60)
        logger.info("逐表数据检查 (%d 张表)", len(table_map))
        logger.info("=" * 60)
        results: list[DebugResult] = []
        for idx, (dwd_table, ods_table) in enumerate(table_map.items(), start=1):
            logger.info("[%d/%d] %s", idx, len(table_map), dwd_table)
            try:
                r = _debug_single_table(
                    dwd_table=dwd_table,
                    ods_table=ods_table,
                    db_conn=db_conn,
                    window_start=window_start,
                    window_end=window_end,
                    logger=logger,
                )
                # Attach the counts the load run reported for this table.
                if load_result and load_result.get("tables"):
                    for t in load_result["tables"]:
                        if t.get("table") == dwd_table:
                            r.counts = {
                                k: v for k, v in t.items() if k != "table"
                            }
                            break
                # A load error for this table overrides the check status.
                if load_result and load_result.get("errors"):
                    for e in load_result["errors"]:
                        if e.get("table") == dwd_table:
                            r.status = "ERROR"
                            r.message = f"装载失败: {e.get('error', '')}"
                            r.error_detail = e.get("error", "")
                            break
            except Exception as exc:
                r = DebugResult(
                    table_name=dwd_table,
                    ods_source=ods_table,
                    status="ERROR",
                    message=f"未捕获异常: {exc}",
                    error_detail=traceback.format_exc(),
                )
                logger.error(" ✗ 未捕获异常: %s", exc)
            results.append(r)
            db_conn.ensure_open()  # keep the connection healthy between tables
        # Summary + JSON dump.
        _print_summary(results, load_result, logger)
        output_dir = _FEIQIU_ROOT / "scripts" / "debug" / "output"
        output_dir.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
        output_file = output_dir / f"debug_dwd_{ts}.json"
        _save_results(results, load_result, output_file)
        logger.info("结果已保存: %s", output_file)
        return results
    finally:
        # BUGFIX: the connection used to leak when any step above raised;
        # close it unconditionally.
        db_conn.close()
# ── Summary and output ───────────────────────────────────────
def _print_summary(results: list[DebugResult], load_result: dict | None,
                   logger: logging.Logger):
    """Log an end-of-run summary: load outcome, status tally, dim/fact
    breakdown, and the list of tables that did not PASS."""
    logger.info("")
    logger.info("=" * 60)
    logger.info("DWD 层调试汇总")
    logger.info("=" * 60)
    # Load-task outcome (present only when the load step was executed).
    if load_result:
        logger.info("DWD_LOAD_FROM_ODS 执行: %s (耗时 %.1fs)",
                    load_result.get("status", "N/A"),
                    load_result.get("duration_sec", 0))
        tables = load_result.get("tables", [])
        errors = load_result.get("errors", [])
        total_inserted = sum(t.get("inserted", 0) for t in tables)
        total_updated = sum(t.get("updated", 0) for t in tables)
        logger.info(" 处理表数: %d, 错误表数: %d", len(tables), len(errors))
        logger.info(" 总计: inserted=%d, updated=%d", total_inserted, total_updated)
    # Per-table status tally.
    pass_count = sum(1 for r in results if r.status == "PASS")
    warn_count = sum(1 for r in results if r.status == "WARN")
    error_count = sum(1 for r in results if r.status in ("ERROR", "FAIL"))
    total_duration = sum(r.duration_sec for r in results)
    logger.info("")
    logger.info("逐表检查: %d 张表", len(results))
    logger.info(" ✓ PASS: %d", pass_count)
    logger.info(" ⚠ WARN: %d", warn_count)
    logger.info(" ✗ ERROR: %d", error_count)
    # NOTE(review): the message likely lost a trailing unit character
    # ("s"/"秒") in a copy; left byte-identical here.
    logger.info(" 总耗时: %.1f", total_duration)
    # Dimension vs fact breakdown (mode is assigned by _debug_single_table).
    dim_results = [r for r in results if r.mode == "SCD2"]
    fact_results = [r for r in results if r.mode == "INCREMENT"]
    logger.info("")
    logger.info("维度表: %d 张 (PASS=%d, WARN=%d, ERROR=%d)",
                len(dim_results),
                sum(1 for r in dim_results if r.status == "PASS"),
                sum(1 for r in dim_results if r.status == "WARN"),
                sum(1 for r in dim_results if r.status in ("ERROR", "FAIL")))
    logger.info("事实表: %d 张 (PASS=%d, WARN=%d, ERROR=%d)",
                len(fact_results),
                sum(1 for r in fact_results if r.status == "PASS"),
                sum(1 for r in fact_results if r.status == "WARN"),
                sum(1 for r in fact_results if r.status in ("ERROR", "FAIL")))
    # List every table that needs attention.
    non_pass = [r for r in results if r.status != "PASS"]
    if non_pass:
        logger.info("")
        logger.info("需关注的表:")
        for r in non_pass:
            logger.info(" [%s] %s: %s", r.status, r.table_name, r.message)
    else:
        logger.info("")
        logger.info("所有表均通过 ✓")
def _save_results(results: list[DebugResult], load_result: dict | None, path: Path):
    """Serialize the debug results to pretty-printed UTF-8 JSON at *path*."""
    payload = {
        "load_result": _sanitize_for_json(load_result) if load_result else None,
        "table_checks": [_sanitize_for_json(asdict(r)) for r in results],
    }
    # default=str catches any value _sanitize_for_json did not convert.
    text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
    path.write_text(text, encoding="utf-8")
def _sanitize_for_json(obj):
"""递归处理不可序列化的值。"""
if isinstance(obj, dict):
return {k: _sanitize_for_json(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_sanitize_for_json(v) for v in obj]
if isinstance(obj, datetime):
return obj.isoformat()
return obj
# ── CLI entry point ──────────────────────────────────────────
def parse_args():
    """Parse command-line arguments for the DWD debug script."""
    cli = argparse.ArgumentParser(description="DWD 层调试脚本")
    cli.add_argument("--hours", type=float, default=2.0,
                     help="回溯窗口小时数(默认 2")
    cli.add_argument("--tables", type=str, default=None,
                     help="仅调试指定 DWD 表,逗号分隔(如 dwd.dim_member,dwd.dwd_payment")
    cli.add_argument("--skip-load", action="store_true",
                     help="跳过 DWD_LOAD_FROM_ODS 执行,仅做数据检查")
    return cli.parse_args()
def main():
    """CLI entry point: run the debug pass and exit 1 on any ERROR/FAIL."""
    args = parse_args()
    tables = None
    if args.tables:
        tables = [name.strip() for name in args.tables.split(",") if name.strip()]
    results = run_dwd_debug(
        hours=args.hours,
        table_filter=tables,
        skip_load=args.skip_load,
    )
    failed = any(r.status in ("ERROR", "FAIL") for r in results)
    sys.exit(1 if failed else 0)


if __name__ == "__main__":
    main()