131 lines
3.8 KiB
Python
131 lines
3.8 KiB
Python
# -*- coding: utf-8 -*-
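"""Standalone script that generates the ETL timing report and the consistency-check report.

Used to verify report output after integrating EtlTimer and ConsistencyChecker.
Runs no real ETL tasks; it only runs a timer simulation and the database
consistency check.

Output paths are controlled by the ETL_REPORT_ROOT / API_SAMPLE_CACHE_ROOT
environment variables.
"""
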
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Make sure the ETL modules are importable: the connector directory is put
# first on sys.path (presumably where the utils / quality / database packages
# imported by the functions below live -- confirm against the repo layout).
_REPO_ROOT = Path(__file__).resolve().parent.parent.parent
ETL_ROOT = _REPO_ROOT / "apps" / "etl" / "connectors" / "feiqiu"
sys.path.insert(0, str(ETL_ROOT))

# Kept after the sys.path change, matching the original statement order.
from dotenv import load_dotenv

# Load environment variables from the .env file three levels above this file.
load_dotenv(_REPO_ROOT / ".env")
|
|
|
|
|
|
def generate_timing_report() -> str:
    """Produce a simulated timing report to exercise EtlTimer's output.

    No real ETL work is done: each named step is started on the timer,
    followed by a short sleep, and then stopped. The timer is finished with
    ``write_report=True`` and whatever ``finish`` returns is handed back to
    the caller.

    Returns:
        The value returned by ``EtlTimer.finish(write_report=True)``
        (annotated as the report text).
    """
    from time import sleep

    from utils.timer import EtlTimer

    etl_timer = EtlTimer()
    etl_timer.start()

    # Simulated ETL steps: step name -> seconds to sleep. Dict insertion
    # order is the execution order.
    simulated_steps = {
        "ODS_ASSISTANT_ACCOUNT": 0.05,
        "ODS_MEMBER": 0.03,
        "DWD_LOAD_FROM_ODS": 0.08,
        "DWS_COACH_PERFORMANCE": 0.04,
        "CONSISTENCY_CHECK": 0.02,
    }

    for name, seconds in simulated_steps.items():
        etl_timer.start_step(name)
        sleep(seconds)
        etl_timer.stop_step(name)

    timing_text = etl_timer.finish(write_report=True)
    print("[OK] 计时报告已生成")
    return timing_text
|
|
|
|
|
|
def generate_consistency_report() -> str | None:
    """Run the data consistency check and write its report.

    Environment variables read:
        PG_DSN: database DSN. When unset or empty, a ``[SKIP]`` line is
            printed and the check is not run.
        API_SAMPLE_CACHE_ROOT: optional directory of API samples. The
            API-vs-ODS comparison is requested only when this is set; the
            ODS-vs-DWD comparison is always requested.

    The check is run with the ``Asia/Shanghai`` time zone. After the report
    is written, a pass/total summary is printed for each result list that is
    non-empty. The database connection is closed in all cases once it has
    been opened.

    Returns:
        The value returned by ``write_consistency_report`` (the report path),
        or ``None`` when the check was skipped.
    """
    from quality.consistency_checker import (
        run_consistency_check,
        write_consistency_report,
    )
    from database.connection import DatabaseConnection
    from zoneinfo import ZoneInfo

    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("[SKIP] PG_DSN 未定义，跳过一致性检查")
        return None

    sample_root = os.getenv("API_SAMPLE_CACHE_ROOT")
    if sample_root:
        api_sample_dir = Path(sample_root)
    else:
        api_sample_dir = None

    db_conn = DatabaseConnection(dsn=dsn)
    try:
        # Built inside the try block so the connection is still closed if
        # constructing any option (e.g. the time zone) fails.
        check_options = {
            "api_sample_dir": api_sample_dir,
            "include_api_vs_ods": api_sample_dir is not None,
            "include_ods_vs_dwd": True,
            "tz": ZoneInfo("Asia/Shanghai"),
        }
        report = run_consistency_check(db_conn, **check_options)

        report_path = write_consistency_report(report)
        print(f"[OK] 一致性检查报告已生成: {report_path}")

        # Print a short summary for each comparison that produced results.
        ods_dwd = report.ods_vs_dwd_results
        if ods_dwd:
            total = len(ods_dwd)
            passed = len([item for item in ods_dwd if item.passed])
            print(f" ODS vs DWD: {passed}/{total} 张表通过")

        api_ods = report.api_vs_ods_results
        if api_ods:
            total = len(api_ods)
            passed = len([item for item in api_ods if item.passed])
            print(f" API vs ODS: {passed}/{total} 张表通过")

        return report_path
    finally:
        db_conn.close()
|
|
|
|
|
|
def main():
    """Script entry point: generate both reports and list the output files.

    Exits with status 1 when ETL_REPORT_ROOT is unset or empty. Otherwise it
    runs the timing report, then the consistency report, and finally prints
    the name and size of every entry in the report directory whose name
    starts with ``etl_timing_`` or ``consistency_report_``.
    """
    banner = "=" * 60
    print(banner)
    print("ETL 报告生成脚本")
    print(banner)

    # The report directory must be configured before anything is generated.
    etl_report_root = os.getenv("ETL_REPORT_ROOT")
    if not etl_report_root:
        print("[ERROR] ETL_REPORT_ROOT 环境变量未定义")
        sys.exit(1)
    print(f"报告输出目录: {etl_report_root}")
    print()

    # Each section: print its heading, run its generator, print a blank line.
    sections = (
        ("--- 1. 生成计时报告 ---", generate_timing_report),
        ("--- 2. 生成一致性检查报告 ---", generate_consistency_report),
    )
    for heading, generate in sections:
        print(heading)
        generate()
        print()

    # List the generated files.
    print("--- 生成的报告文件 ---")
    report_dir = Path(etl_report_root)
    if report_dir.exists():
        report_prefixes = ("etl_timing_", "consistency_report_")
        for f in sorted(report_dir.iterdir()):
            if not f.name.startswith(report_prefixes):
                continue
            print(f" {f.name} ({f.stat().st_size} bytes)")

    print()
    print("完成。")
|
|
|
|
|
|
# Run the report generation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|