Files
Neo-ZQYY/apps/etl/connectors/feiqiu/quality/integrity_service.py

266 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# AI_CHANGELOG [2026-02-14] 默认时区从 Asia/Taipei 修正为 Asia/Shanghai(3 处)
"""Shared integrity flow helpers (window/history + optional backfill)."""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Tuple
from zoneinfo import ZoneInfo
import json
import os
from quality.integrity_checker import IntegrityWindow, compute_last_etl_end, run_integrity_history, run_integrity_window
from scripts.repair.backfill_missing_data import run_backfill
from utils.windowing import split_window
def _normalize_windows(cfg, windows: Iterable[Tuple[datetime, datetime]]) -> list[Tuple[datetime, datetime]]:
segments = list(windows)
if not segments:
return segments
force_monthly = bool(cfg.get("integrity.force_monthly_split", True))
if not force_monthly:
return segments
overall_start = segments[0][0]
overall_end = segments[-1][1]
total_days = (overall_end - overall_start).total_seconds() / 86400.0
if total_days <= 31 and len(segments) == 1:
return segments
tz = ZoneInfo(cfg.get("app.timezone", "Asia/Shanghai"))
comp_hours = cfg.get("run.window_split.compensation_hours", 0)
monthly = split_window(
overall_start,
overall_end,
tz=tz,
split_unit="month",
compensation_hours=comp_hours,
)
return monthly or segments
def build_window_report(
    *,
    cfg,
    windows: Iterable[Tuple[datetime, datetime]],
    include_dimensions: bool,
    task_codes: str,
    logger,
    compare_content: bool | None,
    content_sample_limit: int | None,
) -> tuple[dict, dict]:
    """Run an integrity check per window segment and aggregate the totals.

    Each (start, end) pair in *windows* is checked via
    ``run_integrity_window``; the missing/mismatch/error counters from each
    segment's ``api_to_ods`` section are summed into report-level totals.

    Returns:
        (report, counts): *report* is the aggregated window-mode payload,
        *counts* is ``{"missing", "mismatch", "errors"}`` with int values.
    """
    tz = ZoneInfo(cfg.get("app.timezone", "Asia/Shanghai"))
    segments = list(windows)
    window_reports: list[dict] = []
    total_missing = 0
    total_mismatch = 0
    total_errors = 0
    for idx, (seg_start, seg_end) in enumerate(segments, start=1):
        window = IntegrityWindow(
            start=seg_start,
            end=seg_end,
            label=f"segment_{idx}",
            granularity="window",
        )
        payload = run_integrity_window(
            cfg=cfg,
            window=window,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            logger=logger,
            write_report=False,
            compare_content=compare_content,
            content_sample_limit=content_sample_limit,
            report_path=None,
            window_split_unit="none",  # segments were already split by the caller
            window_compensation_hours=0,
        )
        window_reports.append(payload)
        api_to_ods = payload.get("api_to_ods", {})
        total_missing += int(api_to_ods.get("total_missing") or 0)
        total_mismatch += int(api_to_ods.get("total_mismatch") or 0)
        total_errors += int(api_to_ods.get("total_errors") or 0)
    # Fix: the original indexed segments[0]/segments[-1] unconditionally and
    # raised IndexError for an empty window list; emit a zeroed report instead.
    if segments:
        window_meta = {
            "start": segments[0][0].isoformat(),
            "end": segments[-1][1].isoformat(),
            "segments": len(segments),
        }
    else:
        window_meta = {"start": None, "end": None, "segments": 0}
    totals = {
        "total_missing": total_missing,
        "total_mismatch": total_mismatch,
        "total_errors": total_errors,
    }
    report = {
        "mode": "window",
        "window": window_meta,
        "windows": window_reports,
        "api_to_ods": dict(totals),
        **totals,
        "generated_at": datetime.now(tz).isoformat(),
    }
    counts = {
        "missing": total_missing,
        "mismatch": total_mismatch,
        "errors": total_errors,
    }
    return report, counts
def run_window_flow(
    *,
    cfg,
    windows: Iterable[Tuple[datetime, datetime]],
    include_dimensions: bool,
    task_codes: str,
    logger,
    compare_content: bool | None,
    content_sample_limit: int | None,
    do_backfill: bool,
    include_mismatch: bool,
    recheck_after_backfill: bool,
    page_size: int | None = None,
    chunk_size: int = 500,
) -> tuple[dict, dict]:
    """Window-mode integrity flow: check, optionally backfill, optionally re-check.

    Normalizes the window list (monthly split when configured), builds the
    integrity report, and — when ``do_backfill`` is set and missing rows (or,
    with ``include_mismatch``, mismatched rows) were found — runs a backfill
    over the whole covered range. With ``recheck_after_backfill`` a second
    check is attached under ``report["post_check"]`` and its counters replace
    the returned counts.

    Returns:
        (report, counts) as produced by ``build_window_report``, augmented
        with ``backfill_result`` / ``post_check`` keys when applicable.
    """
    segs = _normalize_windows(cfg, windows)

    def _check() -> tuple[dict, dict]:
        # Shared invocation for the initial pass and the post-backfill re-check.
        return build_window_report(
            cfg=cfg,
            windows=segs,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            logger=logger,
            compare_content=compare_content,
            content_sample_limit=content_sample_limit,
        )

    report, counts = _check()
    if do_backfill:
        missing = int(counts.get("missing", 0))
        mismatch = int(counts.get("mismatch", 0))
        if missing > 0 or (include_mismatch and mismatch > 0):
            # Backfill covers the full normalized range in one pass.
            report["backfill_result"] = run_backfill(
                cfg=cfg,
                start=segs[0][0],
                end=segs[-1][1],
                task_codes=task_codes or None,
                include_mismatch=bool(include_mismatch),
                dry_run=False,
                page_size=int(page_size or cfg.get("api.page_size") or 200),
                chunk_size=chunk_size,
                logger=logger,
            )
            if recheck_after_backfill:
                post_report, post_counts = _check()
                report["post_check"] = post_report
                counts.update(post_counts)
    return report, counts
def run_history_flow(
    *,
    cfg,
    start_dt: datetime,
    end_dt: datetime | None,
    include_dimensions: bool,
    task_codes: str,
    logger,
    compare_content: bool | None,
    content_sample_limit: int | None,
    do_backfill: bool,
    include_mismatch: bool,
    recheck_after_backfill: bool,
    page_size: int | None = None,
    chunk_size: int = 500,
) -> tuple[dict, dict]:
    """History-mode integrity flow: check, optionally backfill, optionally re-check.

    When ``end_dt`` is None it defaults to the last ETL end time (or "now"
    in the configured timezone). A backfill runs only if missing rows — or,
    with ``include_mismatch``, mismatched rows — were found; with
    ``recheck_after_backfill`` a second check is attached under
    ``report["post_check"]`` and its counters replace the returned counts.

    Returns:
        (report, counts) with counts as ``{"missing", "mismatch", "errors"}``.
    """
    zone = ZoneInfo(cfg.get("app.timezone", "Asia/Shanghai"))
    if end_dt is None:
        end_dt = compute_last_etl_end(cfg) or datetime.now(zone)

    def _history_check() -> dict:
        # Shared invocation for the initial pass and the post-backfill re-check.
        return run_integrity_history(
            cfg=cfg,
            start_dt=start_dt,
            end_dt=end_dt,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            logger=logger,
            write_report=False,
            compare_content=compare_content,
            content_sample_limit=content_sample_limit,
        )

    def _extract_counts(payload: dict) -> dict:
        # Normalize possibly-missing/None counters to plain ints.
        return {
            "missing": int(payload.get("total_missing") or 0),
            "mismatch": int(payload.get("total_mismatch") or 0),
            "errors": int(payload.get("total_errors") or 0),
        }

    report = _history_check()
    counts = _extract_counts(report)
    if do_backfill:
        needed = counts.get("missing", 0) > 0 or (
            include_mismatch and counts.get("mismatch", 0) > 0
        )
        if needed:
            report["backfill_result"] = run_backfill(
                cfg=cfg,
                start=start_dt,
                end=end_dt,
                task_codes=task_codes or None,
                include_mismatch=bool(include_mismatch),
                dry_run=False,
                page_size=int(page_size or cfg.get("api.page_size") or 200),
                chunk_size=chunk_size,
                logger=logger,
            )
            if recheck_after_backfill:
                post = _history_check()
                report["post_check"] = post
                counts.update(_extract_counts(post))
    return report, counts
def write_report(report: dict, *, prefix: str, tz: ZoneInfo, report_path: Path | None = None) -> str:
    """Serialize *report* as pretty-printed JSON and return the written path.

    When ``report_path`` is omitted, the file is placed under the directory
    named by the ``ETL_REPORT_ROOT`` environment variable as
    ``<prefix>_<timestamp>.json`` (timestamp in *tz*). Parent directories
    are created as needed.

    Raises:
        KeyError: if ``report_path`` is None and ETL_REPORT_ROOT is unset.
    """
    if report_path is None:
        # ETL_REPORT_ROOT must be configured in the root .env
        # (see .env.template and docs/deployment/EXPORT-PATHS.md).
        env_root = os.environ.get("ETL_REPORT_ROOT")
        if not env_root:
            raise KeyError(
                "环境变量 ETL_REPORT_ROOT 未定义。"
                "请在根 .env 中配置,参考 .env.template 和 docs/deployment/EXPORT-PATHS.md"
            )
        timestamp = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
        report_path = Path(env_root) / f"{prefix}_{timestamp}.json"
    report_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
    report_path.write_text(payload, encoding="utf-8")
    return str(report_path)