# -*- coding: utf-8 -*-
"""
数据一致性检查器(黑盒测试)
以黑盒测试者角度,检查 ETL 数据流各层之间的一致性:
1. API 源数据 vs ODS 落库数据 — 字段完整性对比
2. ODS 数据 vs DWD 落库数据 — 映射正确性对比
输出 Markdown 格式的黑盒测试报告。
Requirements: 16.1, 16.2, 16.3, 16.4
"""
from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Sequence, Tuple
from zoneinfo import ZoneInfo

from tasks.dwd.dwd_load_task import DwdLoadTask
# ---------------------------------------------------------------------------
# 数据结构
# ---------------------------------------------------------------------------
@dataclass
class FieldCheckResult:
    """Check result for a single field."""
    field_name: str
    status: str  # "pass" | "missing" | "mismatch" | "type_diff" | "skip"
    detail: str = ""  # human-readable explanation for the status
@dataclass
class TableCheckResult:
    """Check result for a single table."""
    table_name: str
    check_type: str  # "api_vs_ods" | "ods_vs_dwd"
    passed: bool = True
    total_fields: int = 0
    passed_fields: int = 0
    missing_fields: int = 0
    mismatch_fields: int = 0
    field_results: List[FieldCheckResult] = field(default_factory=list)
    # Row counts of the source (ODS) and target (DWD) tables, filled by
    # run_consistency_check for the ods_vs_dwd check.
    row_count_source: int = 0
    row_count_target: int = 0
    # Sampled rows whose mapped values differ between ODS and DWD.
    sample_mismatches: List[Dict[str, Any]] = field(default_factory=list)
    # Set when the check could not run at all (missing table, SQL error, ...).
    error: str | None = None
@dataclass
class ConsistencyReport:
    """Complete consistency-check report across both check types."""
    generated_at: str = ""  # ISO timestamp of report generation
    api_vs_ods_results: List[TableCheckResult] = field(default_factory=list)
    ods_vs_dwd_results: List[TableCheckResult] = field(default_factory=list)

    @property
    def all_passed(self) -> bool:
        # True only when every table in both check lists passed.
        return all(r.passed for r in self.api_vs_ods_results + self.ods_vs_dwd_results)
# ---------------------------------------------------------------------------
# ODS table name -> API JSON cache file name
# (kept aligned with json_store.ENDPOINT_FILENAME_MAP)
# ---------------------------------------------------------------------------
ODS_TABLE_TO_JSON_FILE: Dict[str, str] = {
    "assistant_accounts_master": "assistant_accounts_master.json",
    "assistant_service_records": "assistant_service_records.json",
    "assistant_cancellation_records": "assistant_cancellation_records.json",
    "member_profiles": "member_profiles.json",
    "member_stored_value_cards": "member_stored_value_cards.json",
    "member_balance_changes": "member_balance_changes.json",
    "recharge_settlements": "recharge_settlements.json",
    "settlement_records": "settlement_records.json",
    "table_fee_transactions": "table_fee_transactions.json",
    "table_fee_discount_records": "table_fee_discount_records.json",
    "store_goods_sales_records": "store_goods_sales_records.json",
    "store_goods_master": "store_goods_master.json",
    "tenant_goods_master": "tenant_goods_master.json",
    "site_tables_master": "site_tables_master.json",
    "group_buy_packages": "group_buy_packages.json",
    "group_buy_redemption_records": "group_buy_redemption_records.json",
    "platform_coupon_redemption_records": "platform_coupon_redemption_records.json",
    "payment_transactions": "payment_transactions.json",
    "refund_transactions": "refund_transactions.json",
    "goods_stock_summary": "goods_stock_summary.json",
    "goods_stock_movements": "goods_stock_movements.json",
    "stock_goods_category_tree": "stock_goods_category_tree.json",
}
# ODS metadata columns — not sourced from the API; populated automatically
# by the ETL framework, so they are excluded from field-coverage checks.
ODS_META_COLUMNS = frozenset({
    "payload", "source_file", "source_endpoint",
    "fetched_at", "content_hash", "record_index",
    "site_id", "tenant_id", "siteprofile", "site_profile",
})
# ---------------------------------------------------------------------------
# 核心检查逻辑(纯函数,不依赖数据库连接)
# ---------------------------------------------------------------------------
def extract_api_fields_from_json(json_path: Path) -> set[str] | None:
    """Collect the union of field names from an API JSON cache file.

    Returns None when the file is missing, unreadable, or yields no records.
    """
    if not json_path.exists():
        return None
    try:
        payload = json.loads(json_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None
    # Supported layouts: bare list / {"data": [...]} / {"data": {"xxxList": [...]}}
    records = _extract_records(payload)
    if not records:
        return None
    # Union the keys of up to 10 records so sparsely populated rows
    # do not hide fields present elsewhere.
    merged: set[str] = set()
    for record in records[:10]:
        if isinstance(record, dict):
            merged |= set(record.keys())
    return merged
def _extract_records(data: Any) -> list[dict]:
"""从 API 响应中提取记录列表"""
if isinstance(data, list):
return data
if isinstance(data, dict):
# 尝试 data 键
inner = data.get("data")
if isinstance(inner, list):
return inner
if isinstance(inner, dict):
# 尝试各种 list_key
for key, val in inner.items():
if isinstance(val, list) and val:
return val
return []
def check_api_vs_ods_fields(
    api_fields: set[str],
    ods_columns: set[str],
) -> TableCheckResult:
    """
    Compare API JSON field coverage against ODS table columns.

    Every API field must have a same-named ODS column (case-insensitive
    match); ODS metadata columns (payload, fetched_at, ...) are excluded
    from the comparison.

    FIX: the original lowered only the API side. ODS columns are now
    lowered as well, so the documented case-insensitive match holds even
    when callers pass mixed-case column names. (Backward compatible:
    _fetch_table_columns already returns lowercase names, and
    ODS_META_COLUMNS is all-lowercase.)
    """
    result = TableCheckResult(table_name="", check_type="api_vs_ods")
    # Normalize both sides to lowercase before comparing.
    api_lower = {f.lower() for f in api_fields}
    ods_lower = {c.lower() for c in ods_columns} - ODS_META_COLUMNS
    result.total_fields = len(api_lower)
    for api_field in sorted(api_lower):
        if api_field in ods_lower:
            result.field_results.append(
                FieldCheckResult(api_field, "pass", "ODS 中存在同名列")
            )
            result.passed_fields += 1
        else:
            # Nested/camelCase API fields may be flattened into other
            # columns or stored inside payload; reported as missing here.
            result.field_results.append(
                FieldCheckResult(api_field, "missing", "ODS 中无对应列")
            )
            result.missing_fields += 1
    result.passed = result.missing_fields == 0
    return result
def check_ods_vs_dwd_mappings(
    dwd_table: str,
    ods_table: str,
    dwd_columns: set[str],
    ods_columns: set[str],
    fact_mappings: list[tuple[str, str, str | None]] | None,
) -> TableCheckResult:
    """
    Verify ODS -> DWD column-mapping correctness for one DWD table.

    Checks:
    1. Every non-SCD2 DWD column has either an explicit FACT_MAPPINGS
       entry or a same-named ODS column (auto-mapping).
    2. Explicit mappings reference ODS columns/expressions that pass
       static validation (_validate_ods_expression).

    Parameters:
        dwd_table / ods_table: qualified table names (ods_table is kept
            for interface compatibility; the verdict only uses dwd_table).
        dwd_columns / ods_columns: column-name sets from information_schema.
        fact_mappings: (dwd_col, ods_expr, cast_type) triples, or None.

    FIX: removed the unreachable ``fetched_at`` branch — it was shadowed
    by the preceding same-named-column check (``col_lower in ods_lower``),
    so it could never fire.
    """
    # Fields the upstream API does not provide; flagged as known-no-source
    # instead of being reported as errors.
    KNOWN_NO_SOURCE: Dict[str, set[str]] = {
        "dwd.dim_member": {"update_time"},
    }
    result = TableCheckResult(
        table_name=dwd_table,
        check_type="ods_vs_dwd",
    )
    scd_cols = {c.lower() for c in DwdLoadTask.SCD_COLS}
    ods_lower = {c.lower() for c in ods_columns}
    # Explicit mapping lookup: dwd_col -> (ods_expr, cast_type)
    explicit_map: Dict[str, Tuple[str, str | None]] = {}
    if fact_mappings:
        for dwd_col, ods_expr, cast_type in fact_mappings:
            explicit_map[dwd_col.lower()] = (ods_expr, cast_type)
    check_cols = sorted(c for c in dwd_columns if c.lower() not in scd_cols)
    result.total_fields = len(check_cols)
    for col in check_cols:
        col_lower = col.lower()
        if col_lower in explicit_map:
            ods_expr, cast_type = explicit_map[col_lower]
            # Validate that the referenced ODS column/expression exists.
            if _validate_ods_expression(ods_expr, ods_lower):
                result.field_results.append(
                    FieldCheckResult(col, "pass", f"显式映射: {ods_expr}")
                )
                result.passed_fields += 1
            else:
                result.field_results.append(
                    FieldCheckResult(
                        col, "mismatch",
                        f"显式映射引用的 ODS 列/表达式无法验证: {ods_expr}"
                    )
                )
                result.mismatch_fields += 1
        elif col_lower in ods_lower:
            # Auto-mapping: identically named column exists in ODS.
            result.field_results.append(
                FieldCheckResult(col, "pass", "自动映射(同名列)")
            )
            result.passed_fields += 1
        else:
            # Whitelist known source-less fields to avoid false alarms.
            no_source = KNOWN_NO_SOURCE.get(dwd_table, set())
            if col_lower in no_source:
                result.field_results.append(
                    FieldCheckResult(col, "pass", "已知无源字段(上游 API 不提供)")
                )
                result.passed_fields += 1
            else:
                result.field_results.append(
                    FieldCheckResult(
                        col, "missing",
                        "DWD 列无 ODS 映射源(无显式映射且无同名 ODS 列)"
                    )
                )
                result.missing_fields += 1
    result.passed = result.missing_fields == 0 and result.mismatch_fields == 0
    return result
def _validate_ods_expression(expr: str, ods_columns: set[str]) -> bool:
"""
验证 FACT_MAPPINGS 中的 ODS 表达式是否合法。
简单列名:检查是否在 ODS 列集合中
JSON 表达式(含 ->>、#>>):检查基础列名
SQL 表达式(含 CASE、COALESCE 等):视为合法(无法静态验证)
NULL 字面量:合法
"""
if expr.upper() == "NULL":
return True
# 带引号的列名(如 "siteGoodsId"
stripped = expr.strip('"')
# JSON 路径表达式
if "->>" in expr or "#>>" in expr:
base_col = expr.split("->>")[0].split("#>>")[0].strip().strip('"').lower()
return base_col in ods_columns
# SQL 表达式CASE WHEN, COALESCE, 函数调用等)
sql_keywords = {"case", "when", "coalesce", "nullif", "cast", "concat"}
if any(kw in expr.lower() for kw in sql_keywords):
return True
# 简单列名
return stripped.lower() in ods_columns
# ---------------------------------------------------------------------------
# 数据库交互层
# ---------------------------------------------------------------------------
def _fetch_table_columns(cur, schema: str, table: str) -> set[str]:
    """Return the table's column names (lower-cased) from information_schema.

    Returns an empty set when the table does not exist — callers use that
    to detect missing tables.
    """
    cur.execute(
        """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        (schema, table),
    )
    return {row[0].lower() for row in cur.fetchall()}
def _fetch_row_count(cur, schema: str, table: str) -> int:
    """Return the table's row count (0 when the cursor yields no row)."""
    # Identifiers are double-quoted; schema/table come from code-level
    # constants (TABLE_MAP), not user input.
    cur.execute(f'SELECT COUNT(1) FROM "{schema}"."{table}"')
    row = cur.fetchone()
    return int(row[0]) if row else 0
def _split_table(name: str, default_schema: str) -> Tuple[str, str]:
if "." in name:
schema, table = name.split(".", 1)
return schema, table
return default_schema, name
def _sample_value_mismatches(
    cur,
    dwd_schema: str,
    dwd_table: str,
    ods_schema: str,
    ods_table: str,
    dwd_pk_cols: list[str],
    ods_pk_cols: list[str],
    explicit_map: Dict[str, Tuple[str, str | None]],
    dwd_columns: set[str],
    ods_columns: set[str],
    limit: int = 5,
) -> list[dict]:
    """
    Sample actual value differences between ODS and DWD rows.

    JOINs the two tables on a derived key and compares the mapped columns
    as text; returns up to *limit* mismatching rows as dicts. Returns []
    when no usable join key or comparable column can be derived, or when
    the generated SQL fails to execute.

    FIX: collapsed a dead conditional when building the JOIN condition
    (both ternary branches were byte-identical) and dropped the
    unreachable ``if not diff_conditions`` guard (diff_conditions is
    always non-empty once compare_cols is).
    """
    scd_cols = {c.lower() for c in DwdLoadTask.SCD_COLS}
    ods_meta = {"payload", "source_file", "source_endpoint",
                "fetched_at", "content_hash", "record_index"}
    ods_lower = {c.lower() for c in ods_columns}
    # Derive the JOIN key: for each DWD PK column, find the ODS column it
    # maps from — explicit simple-column mapping, same-named column, or
    # the common "<x>_id" -> "id" convention.
    join_pairs: list[Tuple[str, str]] = []  # (dwd_col, ods_expr)
    for pk in dwd_pk_cols:
        pk_lower = pk.lower()
        if pk_lower in scd_cols:
            continue
        if pk_lower in explicit_map:
            ods_expr, cast_type = explicit_map[pk_lower]
            # Only a plain column name can participate in the JOIN.
            stripped = ods_expr.strip('"')
            if stripped.lower() in ods_lower and stripped.isidentifier():
                join_pairs.append((pk_lower, ods_expr))
        elif pk_lower in ods_lower:
            join_pairs.append((pk_lower, f'"{pk_lower}"'))
        elif "id" in ods_lower and pk_lower.endswith("_id"):
            join_pairs.append((pk_lower, '"id"'))
    if not join_pairs:
        return []
    # Columns to compare: skip SCD2/meta/join-key columns and complex SQL
    # expressions that cannot be compared verbatim.
    compare_cols: list[Tuple[str, str]] = []  # (dwd_col, ods_expr)
    for col in sorted(dwd_columns):
        col_lower = col.lower()
        if col_lower in scd_cols or col_lower in ods_meta:
            continue
        if col_lower in {jp[0] for jp in join_pairs}:
            continue
        if col_lower in explicit_map:
            ods_expr, _ = explicit_map[col_lower]
            if any(kw in ods_expr.lower() for kw in ("case", "coalesce", "nullif")):
                continue
            compare_cols.append((col_lower, ods_expr))
        elif col_lower in ods_lower:
            compare_cols.append((col_lower, f'"{col_lower}"'))
    if not compare_cols:
        return []
    # JOIN condition over all derived key pairs.
    join_cond = " AND ".join(
        f'd."{dwd_col}" = o.{ods_expr}' for dwd_col, ods_expr in join_pairs
    )
    # WHERE: any compared column differs; IS DISTINCT FROM is NULL-safe.
    where_diff = " OR ".join(
        f'd."{dwd_col}"::text IS DISTINCT FROM o.{ods_expr}::text'
        for dwd_col, ods_expr in compare_cols
    )
    # Deduplicate ODS snapshots: keep the latest fetched_at per business PK.
    ods_has_content_hash = "content_hash" in ods_lower
    if ods_has_content_hash and ods_pk_cols:
        ods_biz_pks = [c for c in ods_pk_cols if c.lower() != "content_hash"]
        if ods_biz_pks:
            distinct_on = ", ".join(f'"{c}"' for c in ods_biz_pks)
            ods_subquery = (
                f'(SELECT DISTINCT ON ({distinct_on}) * '
                f'FROM "{ods_schema}"."{ods_table}" '
                f'ORDER BY {distinct_on}, "fetched_at" DESC NULLS LAST) o'
            )
        else:
            ods_subquery = f'"{ods_schema}"."{ods_table}" o'
    else:
        ods_subquery = f'"{ods_schema}"."{ods_table}" o'
    # DWD side: restrict to current SCD2 versions when the flag exists.
    dwd_where = ""
    if any(c.lower() == "scd2_is_current" for c in dwd_columns):
        dwd_where = "WHERE COALESCE(scd2_is_current, 1) = 1"
    select_parts = []
    for dwd_col, _ in join_pairs:
        select_parts.append(f'd."{dwd_col}"')
    for dwd_col, ods_expr in compare_cols[:10]:  # cap the compared columns
        select_parts.append(f'd."{dwd_col}" AS "dwd_{dwd_col}"')
        select_parts.append(f'o.{ods_expr}::text AS "ods_{dwd_col}"')
    select_sql = ", ".join(select_parts)
    sql = (
        f"SELECT {select_sql} "
        f'FROM (SELECT * FROM "{dwd_schema}"."{dwd_table}" {dwd_where}) d '
        f"JOIN {ods_subquery} ON {join_cond} "
        f"WHERE {where_diff} "
        f"LIMIT %s"
    )
    try:
        cur.execute(sql, (limit,))
        rows = cur.fetchall()
        if not rows:
            return []
        columns = [desc[0] for desc in (cur.description or [])]
        return [dict(zip(columns, r)) for r in rows]
    except Exception:
        # Complex expressions may yield invalid SQL; sampling is
        # best-effort, so skip quietly instead of failing the check.
        return []
def _fetch_pk_columns(cur, schema: str, table: str) -> list[str]:
    """Return the table's primary-key columns in key order ([] when none)."""
    cur.execute(
        """
        SELECT kcu.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
        WHERE tc.constraint_type = 'PRIMARY KEY'
            AND tc.table_schema = %s
            AND tc.table_name = %s
        ORDER BY kcu.ordinal_position
        """,
        (schema, table),
    )
    return [r[0] for r in cur.fetchall()]
# ---------------------------------------------------------------------------
# 主入口:运行完整一致性检查
# ---------------------------------------------------------------------------
def run_consistency_check(
    db_conn,
    *,
    api_sample_dir: Path | None = None,
    include_api_vs_ods: bool = True,
    include_ods_vs_dwd: bool = True,
    sample_limit: int = 5,
    tz: ZoneInfo | None = None,
) -> ConsistencyReport:
    """
    Run the full data-consistency check and collect results into a report.

    Args:
        db_conn: database wrapper; must expose a ``.conn`` attribute
            returning a psycopg2-style connection.
        api_sample_dir: directory holding the cached API JSON files
            (required for the API-vs-ODS check; skipped when None).
        include_api_vs_ods: run the API vs ODS field-coverage check.
        include_ods_vs_dwd: run the ODS vs DWD mapping check.
        sample_limit: rows to sample per table on value mismatch
            (0 disables value sampling).
        tz: timezone for the report timestamp; defaults to Asia/Shanghai.
    """
    if tz is None:
        tz = ZoneInfo("Asia/Shanghai")
    report = ConsistencyReport(
        generated_at=datetime.now(tz).isoformat(),
    )
    with db_conn.conn.cursor() as cur:
        # --- 1. API vs ODS field-coverage check ---
        if include_api_vs_ods and api_sample_dir:
            for ods_table, json_file in sorted(ODS_TABLE_TO_JSON_FILE.items()):
                json_path = api_sample_dir / json_file
                api_fields = extract_api_fields_from_json(json_path)
                if api_fields is None:
                    # Missing JSON cache counts as a skip, not a failure.
                    result = TableCheckResult(
                        table_name=f"ods.{ods_table}",
                        check_type="api_vs_ods",
                        passed=True,
                        error=f"API JSON 缓存不存在: {json_file}",
                    )
                    report.api_vs_ods_results.append(result)
                    continue
                ods_columns = _fetch_table_columns(cur, "ods", ods_table)
                if not ods_columns:
                    # Table absent in the database: a real failure.
                    result = TableCheckResult(
                        table_name=f"ods.{ods_table}",
                        check_type="api_vs_ods",
                        passed=False,
                        error="ODS 表不存在或无列",
                    )
                    report.api_vs_ods_results.append(result)
                    continue
                result = check_api_vs_ods_fields(api_fields, ods_columns)
                result.table_name = f"ods.{ods_table}"
                report.api_vs_ods_results.append(result)
        # --- 2. ODS vs DWD mapping-correctness check ---
        if include_ods_vs_dwd:
            table_map = DwdLoadTask.TABLE_MAP
            fact_mappings = DwdLoadTask.FACT_MAPPINGS
            for dwd_full, ods_full in sorted(table_map.items()):
                dwd_schema, dwd_table = _split_table(dwd_full, "dwd")
                ods_schema, ods_table = _split_table(ods_full, "ods")
                try:
                    dwd_columns = _fetch_table_columns(cur, dwd_schema, dwd_table)
                    ods_columns = _fetch_table_columns(cur, ods_schema, ods_table)
                    if not dwd_columns:
                        result = TableCheckResult(
                            table_name=dwd_full,
                            check_type="ods_vs_dwd",
                            passed=False,
                            error=f"DWD 表 {dwd_full} 不存在或无列",
                        )
                        report.ods_vs_dwd_results.append(result)
                        continue
                    mappings = fact_mappings.get(dwd_full)
                    result = check_ods_vs_dwd_mappings(
                        dwd_full, ods_full,
                        dwd_columns, ods_columns,
                        mappings,
                    )
                    # Attach row counts for the report tables.
                    result.row_count_source = _fetch_row_count(cur, ods_schema, ods_table)
                    result.row_count_target = _fetch_row_count(cur, dwd_schema, dwd_table)
                    # Sample actual value differences between the layers.
                    if sample_limit > 0:
                        explicit_map: Dict[str, Tuple[str, str | None]] = {}
                        if mappings:
                            for dwd_col, ods_expr, cast_type in mappings:
                                explicit_map[dwd_col.lower()] = (ods_expr, cast_type)
                        dwd_pk = _fetch_pk_columns(cur, dwd_schema, dwd_table)
                        ods_pk = _fetch_pk_columns(cur, ods_schema, ods_table)
                        samples = _sample_value_mismatches(
                            cur,
                            dwd_schema, dwd_table,
                            ods_schema, ods_table,
                            dwd_pk, ods_pk,
                            explicit_map,
                            dwd_columns, ods_columns,
                            limit=sample_limit,
                        )
                        result.sample_mismatches = samples
                    report.ods_vs_dwd_results.append(result)
                except Exception as exc:
                    # Per-table failures are recorded in the report rather
                    # than aborting the whole check run.
                    result = TableCheckResult(
                        table_name=dwd_full,
                        check_type="ods_vs_dwd",
                        passed=False,
                        error=f"{type(exc).__name__}: {exc}",
                    )
                    report.ods_vs_dwd_results.append(result)
    return report
# ---------------------------------------------------------------------------
# 报告生成
# ---------------------------------------------------------------------------
def generate_markdown_report(report: ConsistencyReport) -> str:
    """
    Render a ConsistencyReport as a Markdown black-box test report.

    FIX: the per-table status cells were assigned the empty string in both
    branches (``"" if r.passed else ""`` — the pass/fail marks had been
    lost); restored the ✅/❌ marks consistent with the overall-verdict
    line above.
    """
    lines: list[str] = []
    lines.append("# 数据一致性黑盒测试报告")
    lines.append("")
    lines.append(f"生成时间: {report.generated_at}")
    lines.append(f"总体结果: **{'✅ 全部通过' if report.all_passed else '❌ 存在异常'}**")
    lines.append("")
    # --- Summary ---
    lines.append("## 汇总")
    lines.append("")
    if report.api_vs_ods_results:
        api_pass = sum(1 for r in report.api_vs_ods_results if r.passed)
        api_total = len(report.api_vs_ods_results)
        lines.append(f"- API vs ODS 字段完整性: {api_pass}/{api_total} 张表通过")
    if report.ods_vs_dwd_results:
        dwd_pass = sum(1 for r in report.ods_vs_dwd_results if r.passed)
        dwd_total = len(report.ods_vs_dwd_results)
        lines.append(f"- ODS vs DWD 映射正确性: {dwd_pass}/{dwd_total} 张表通过")
    lines.append("")
    # --- API vs ODS details ---
    if report.api_vs_ods_results:
        lines.append("## API vs ODS 字段完整性检查")
        lines.append("")
        lines.append("| 表名 | 状态 | 总字段 | 通过 | 缺失 | 备注 |")
        lines.append("|------|------|--------|------|------|------|")
        for r in report.api_vs_ods_results:
            status = "✅" if r.passed else "❌"
            note = r.error or ""
            lines.append(
                f"| {r.table_name} | {status} | {r.total_fields} "
                f"| {r.passed_fields} | {r.missing_fields} | {note} |"
            )
        lines.append("")
        # Field-level details for failing tables only.
        failed = [r for r in report.api_vs_ods_results if not r.passed and r.field_results]
        for r in failed:
            lines.append(f"### {r.table_name} — 缺失字段明细")
            lines.append("")
            missing = [f for f in r.field_results if f.status == "missing"]
            for f in missing:
                lines.append(f"- `{f.field_name}`: {f.detail}")
            lines.append("")
    # --- ODS vs DWD details ---
    if report.ods_vs_dwd_results:
        lines.append("## ODS vs DWD 映射正确性检查")
        lines.append("")
        lines.append("| DWD 表 | 状态 | 总字段 | 通过 | 缺失 | 不一致 | ODS 行数 | DWD 行数 | 备注 |")
        lines.append("|--------|------|--------|------|------|--------|----------|----------|------|")
        for r in report.ods_vs_dwd_results:
            status = "✅" if r.passed else "❌"
            note = r.error or ""
            lines.append(
                f"| {r.table_name} | {status} | {r.total_fields} "
                f"| {r.passed_fields} | {r.missing_fields} | {r.mismatch_fields} "
                f"| {r.row_count_source} | {r.row_count_target} | {note} |"
            )
        lines.append("")
        # Mapping-issue details for failing tables only.
        failed_dwd = [r for r in report.ods_vs_dwd_results if not r.passed and r.field_results]
        for r in failed_dwd:
            lines.append(f"### {r.table_name} — 映射异常明细")
            lines.append("")
            issues = [f for f in r.field_results if f.status in ("missing", "mismatch")]
            for f in issues:
                lines.append(f"- `{f.field_name}` [{f.status}]: {f.detail}")
            lines.append("")
            if r.sample_mismatches:
                lines.append(f"#### 值不一致采样(前 {len(r.sample_mismatches)} 条)")
                lines.append("")
                lines.append("```json")
                # Convert non-JSON-serializable values to strings first.
                safe_samples = _safe_serialize(r.sample_mismatches)
                lines.append(json.dumps(safe_samples, ensure_ascii=False, indent=2))
                lines.append("```")
                lines.append("")
    return "\n".join(lines)
def _safe_serialize(obj: Any) -> Any:
"""将不可 JSON 序列化的类型转为字符串"""
if isinstance(obj, list):
return [_safe_serialize(item) for item in obj]
if isinstance(obj, dict):
return {k: _safe_serialize(v) for k, v in obj.items()}
if isinstance(obj, (datetime,)):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.hex()
try:
json.dumps(obj)
return obj
except (TypeError, ValueError):
return str(obj)
def write_consistency_report(
    report: ConsistencyReport,
    *,
    report_path: Path | None = None,
) -> str:
    """
    Write the consistency report to disk and return its path as a string.

    When *report_path* is omitted, the output directory is taken from the
    ETL_REPORT_ROOT environment variable; a KeyError is raised when it is
    not configured.
    """
    if report_path is None:
        env_root = os.environ.get("ETL_REPORT_ROOT")
        if not env_root:
            raise KeyError(
                "环境变量 ETL_REPORT_ROOT 未定义。"
                "请在根 .env 中配置,参考 docs/deployment/EXPORT-PATHS.md"
            )
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = Path(env_root) / f"consistency_report_{stamp}.md"
    rendered = generate_markdown_report(report)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(rendered, encoding="utf-8")
    return str(report_path)