# -*- coding: utf-8 -*- """校验结果数据模型""" from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import List, Optional, Dict, Any class VerificationStatus(Enum): """校验状态""" OK = "OK" # 数据一致 MISSING = "MISSING" # 有缺失数据 MISMATCH = "MISMATCH" # 有不一致数据 BACKFILLED = "BACKFILLED" # 已补齐 ERROR = "ERROR" # 校验出错 @dataclass class VerificationResult: """单表校验结果""" layer: str # 数据层: "ODS" / "DWD" / "DWS" / "INDEX" table: str # 表名 window_start: datetime # 校验窗口开始 window_end: datetime # 校验窗口结束 source_count: int = 0 # 源数据量 target_count: int = 0 # 目标数据量 missing_count: int = 0 # 缺失记录数 mismatch_count: int = 0 # 不一致记录数 backfilled_count: int = 0 # 已补齐记录数(缺失 + 不一致) backfilled_missing_count: int = 0 # 缺失补齐数 backfilled_mismatch_count: int = 0 # 不一致补齐数 status: VerificationStatus = VerificationStatus.OK elapsed_seconds: float = 0.0 # 耗时(秒) error_message: Optional[str] = None # 错误信息 details: Dict[str, Any] = field(default_factory=dict) # 额外详情 @property def is_consistent(self) -> bool: """数据是否一致""" return self.status == VerificationStatus.OK @property def needs_backfill(self) -> bool: """是否需要补齐""" return self.missing_count > 0 or self.mismatch_count > 0 def to_dict(self) -> dict: """转换为字典""" return { "layer": self.layer, "table": self.table, "window_start": self.window_start.isoformat() if self.window_start else None, "window_end": self.window_end.isoformat() if self.window_end else None, "source_count": self.source_count, "target_count": self.target_count, "missing_count": self.missing_count, "mismatch_count": self.mismatch_count, "backfilled_count": self.backfilled_count, "backfilled_missing_count": self.backfilled_missing_count, "backfilled_mismatch_count": self.backfilled_mismatch_count, "status": self.status.value, "elapsed_seconds": self.elapsed_seconds, "error_message": self.error_message, "details": self.details, } def format_summary(self) -> str: """格式化摘要""" lines = [ f"表: {self.table}", f"层: {self.layer}", f"窗口: {self.window_start.strftime('%Y-%m-%d %H:%M')} ~ {self.window_end.strftime('%Y-%m-%d %H:%M')}", f"源数据量: {self.source_count:,}", f"目标数据量: {self.target_count:,}", f"缺失: {self.missing_count:,}", f"不一致: {self.mismatch_count:,}", f"缺失补齐: {self.backfilled_missing_count:,}", f"不一致补齐: {self.backfilled_mismatch_count:,}", f"已补齐: {self.backfilled_count:,}", f"状态: {self.status.value}", f"耗时: {self.elapsed_seconds:.2f}s", ] if self.error_message: lines.append(f"错误: {self.error_message}") return "\n".join(lines) @dataclass class VerificationSummary: """校验汇总结果""" layer: str # 数据层 window_start: datetime # 校验窗口开始 window_end: datetime # 校验窗口结束 total_tables: int = 0 # 总表数 consistent_tables: int = 0 # 一致的表数 inconsistent_tables: int = 0 # 不一致的表数 total_source_count: int = 0 # 总源数据量 total_target_count: int = 0 # 总目标数据量 total_missing: int = 0 # 总缺失数 total_mismatch: int = 0 # 总不一致数 total_backfilled: int = 0 # 总补齐数 total_backfilled_missing: int = 0 # 总缺失补齐数 total_backfilled_mismatch: int = 0 # 总不一致补齐数 error_tables: int = 0 # 发生错误的表数 elapsed_seconds: float = 0.0 # 总耗时 results: List[VerificationResult] = field(default_factory=list) # 各表结果 status: VerificationStatus = VerificationStatus.OK def add_result(self, result: VerificationResult): """添加单表结果""" self.results.append(result) self.total_tables += 1 self.total_source_count += result.source_count self.total_target_count += result.target_count self.total_missing += result.missing_count self.total_mismatch += result.mismatch_count self.total_backfilled += result.backfilled_count self.total_backfilled_missing += result.backfilled_missing_count self.total_backfilled_mismatch += result.backfilled_mismatch_count self.elapsed_seconds += result.elapsed_seconds if result.status == VerificationStatus.ERROR: self.error_tables += 1 self.inconsistent_tables += 1 # 错误优先级最高,直接覆盖汇总状态 self.status = VerificationStatus.ERROR elif result.is_consistent: self.consistent_tables += 1 else: self.inconsistent_tables += 1 if self.status == VerificationStatus.OK: self.status = result.status @property def is_all_consistent(self) -> bool: """是否全部一致""" return self.inconsistent_tables == 0 def to_dict(self) -> dict: """转换为字典""" return { "layer": self.layer, "window_start": self.window_start.isoformat() if self.window_start else None, "window_end": self.window_end.isoformat() if self.window_end else None, "total_tables": self.total_tables, "consistent_tables": self.consistent_tables, "inconsistent_tables": self.inconsistent_tables, "total_source_count": self.total_source_count, "total_target_count": self.total_target_count, "total_missing": self.total_missing, "total_mismatch": self.total_mismatch, "total_backfilled": self.total_backfilled, "total_backfilled_missing": self.total_backfilled_missing, "total_backfilled_mismatch": self.total_backfilled_mismatch, "error_tables": self.error_tables, "elapsed_seconds": self.elapsed_seconds, "status": self.status.value, "results": [r.to_dict() for r in self.results], } def format_summary(self) -> str: """格式化汇总摘要""" lines = [ f"{'=' * 60}", f"校验汇总 - {self.layer}", f"{'=' * 60}", f"窗口: {self.window_start.strftime('%Y-%m-%d %H:%M')} ~ {self.window_end.strftime('%Y-%m-%d %H:%M')}", f"表数: {self.total_tables} (一致: {self.consistent_tables}, 不一致: {self.inconsistent_tables})", f"源数据量: {self.total_source_count:,}", f"目标数据量: {self.total_target_count:,}", f"总缺失: {self.total_missing:,}", f"总不一致: {self.total_mismatch:,}", f"总补齐: {self.total_backfilled:,} (缺失: {self.total_backfilled_missing:,}, 不一致: {self.total_backfilled_mismatch:,})", f"错误表数: {self.error_tables}", f"总耗时: {self.elapsed_seconds:.2f}s", f"状态: {self.status.value}", f"{'=' * 60}", ] return "\n".join(lines) @dataclass class WindowSegment: """时间窗口片段""" start: datetime end: datetime index: int = 0 total: int = 1 @property def label(self) -> str: """片段标签""" return f"{self.start.strftime('%Y-%m-%d')} ~ {self.end.strftime('%Y-%m-%d')}" def build_window_segments( window_start: datetime, window_end: datetime, split_unit: str = "month", ) -> List[WindowSegment]: """ 按指定单位切分时间窗口 Args: window_start: 开始时间 window_end: 结束时间 split_unit: 切分单位 ("none", "day", "week", "month") Returns: 时间窗口片段列表 """ if split_unit == "none" or not split_unit: return [WindowSegment(start=window_start, end=window_end, index=0, total=1)] segments = [] current = window_start while current < window_end: if split_unit == "day": # 按天切分 next_boundary = current.replace(hour=0, minute=0, second=0, microsecond=0) next_boundary = next_boundary + timedelta(days=1) elif split_unit == "week": # 按周切分(周一为起点) days_until_monday = (7 - current.weekday()) % 7 if days_until_monday == 0: days_until_monday = 7 next_boundary = current.replace(hour=0, minute=0, second=0, microsecond=0) next_boundary = next_boundary + timedelta(days=days_until_monday) elif split_unit == "month": # 按月切分 if current.month == 12: next_boundary = current.replace(year=current.year + 1, month=1, day=1, hour=0, minute=0, second=0, microsecond=0) else: next_boundary = current.replace(month=current.month + 1, day=1, hour=0, minute=0, second=0, microsecond=0) else: # 默认不切分 next_boundary = window_end segment_end = min(next_boundary, window_end) segments.append(WindowSegment(start=current, end=segment_end)) current = segment_end # 更新索引 total = len(segments) for i, seg in enumerate(segments): seg.index = i seg.total = total return segments def filter_verify_tables(layer: str, tables: list[str] | None) -> list[str] | None: """按层过滤校验表名,避免非目标层全量校验。 Args: layer: 数据层名称("ODS" / "DWD" / "DWS" / "INDEX") tables: 待过滤的表名列表,为 None 或空时直接返回 None Returns: 过滤后的表名列表,或 None """ if not tables: return None layer_upper = layer.upper() normalized = [t.strip().lower() for t in tables if t and t.strip()] if layer_upper == "DWD": return [t for t in normalized if t.startswith(("dwd_", "dim_", "fact_"))] if layer_upper == "DWS": return [t for t in normalized if t.startswith("dws_")] if layer_upper == "INDEX": return [t for t in normalized if t.startswith("v_") or t.endswith("_index")] if layer_upper == "ODS": return [t for t in normalized if t.startswith("ods_")] return normalized # 需要导入 timedelta from datetime import timedelta