284 lines
11 KiB
Python
284 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""校验结果数据模型"""
|
||
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from enum import Enum
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
|
||
class VerificationStatus(Enum):
    """Outcome of a verification run.

    Each member's value is the same upper-case string as its name, which is
    what ``to_dict`` / ``format_summary`` of the result classes emit.
    """

    # Source and target data agree.
    OK = "OK"
    # Some records are missing.
    MISSING = "MISSING"
    # Some records are inconsistent.
    MISMATCH = "MISMATCH"
    # Differences were found and have been backfilled.
    BACKFILLED = "BACKFILLED"
    # The verification itself failed.
    ERROR = "ERROR"
|
||
|
||
|
||
@dataclass
class VerificationResult:
    """Verification result for a single table.

    Stores the record counts collected for one table within one time window
    together with the resulting status, and offers dict / text renderings.
    """

    layer: str  # Data layer: "ODS" / "DWD" / "DWS" / "INDEX"
    table: str  # Table name
    window_start: datetime  # Start of the verification window
    window_end: datetime  # End of the verification window
    source_count: int = 0  # Number of source records
    target_count: int = 0  # Number of target records
    missing_count: int = 0  # Number of missing records
    mismatch_count: int = 0  # Number of inconsistent records
    backfilled_count: int = 0  # Backfilled records (missing + mismatched)
    backfilled_missing_count: int = 0  # Backfilled missing records
    backfilled_mismatch_count: int = 0  # Backfilled mismatched records
    status: VerificationStatus = VerificationStatus.OK
    elapsed_seconds: float = 0.0  # Elapsed time in seconds
    error_message: Optional[str] = None  # Error message, if any
    details: Dict[str, Any] = field(default_factory=dict)  # Extra details

    @staticmethod
    def _format_window_bound(value: Optional[datetime]) -> str:
        """Render a window bound as ``YYYY-MM-DD HH:MM``, or ``N/A`` if unset."""
        return value.strftime('%Y-%m-%d %H:%M') if value else "N/A"

    @property
    def is_consistent(self) -> bool:
        """Whether the status is ``VerificationStatus.OK``."""
        return self.status == VerificationStatus.OK

    @property
    def needs_backfill(self) -> bool:
        """Whether any missing or mismatched records were counted."""
        return self.missing_count > 0 or self.mismatch_count > 0

    def to_dict(self) -> dict:
        """Convert the result to a plain dictionary.

        Window bounds are emitted as ISO-8601 strings (``None`` when unset)
        and the status as its string value.
        """
        return {
            "layer": self.layer,
            "table": self.table,
            "window_start": self.window_start.isoformat() if self.window_start else None,
            "window_end": self.window_end.isoformat() if self.window_end else None,
            "source_count": self.source_count,
            "target_count": self.target_count,
            "missing_count": self.missing_count,
            "mismatch_count": self.mismatch_count,
            "backfilled_count": self.backfilled_count,
            "backfilled_missing_count": self.backfilled_missing_count,
            "backfilled_mismatch_count": self.backfilled_mismatch_count,
            "status": self.status.value,
            "elapsed_seconds": self.elapsed_seconds,
            "error_message": self.error_message,
            "details": self.details,
        }

    def format_summary(self) -> str:
        """Return a multi-line, human-readable summary of this result.

        An unset window bound is rendered as ``N/A`` instead of raising, so
        this method accepts the same inputs as ``to_dict``. The error line is
        appended only when ``error_message`` is non-empty.
        """
        window_from = self._format_window_bound(self.window_start)
        window_to = self._format_window_bound(self.window_end)
        lines = [
            f"表: {self.table}",
            f"层: {self.layer}",
            f"窗口: {window_from} ~ {window_to}",
            f"源数据量: {self.source_count:,}",
            f"目标数据量: {self.target_count:,}",
            f"缺失: {self.missing_count:,}",
            f"不一致: {self.mismatch_count:,}",
            f"缺失补齐: {self.backfilled_missing_count:,}",
            f"不一致补齐: {self.backfilled_mismatch_count:,}",
            f"已补齐: {self.backfilled_count:,}",
            f"状态: {self.status.value}",
            f"耗时: {self.elapsed_seconds:.2f}s",
        ]
        if self.error_message:
            lines.append(f"错误: {self.error_message}")
        return "\n".join(lines)
|
||
|
||
|
||
@dataclass
class VerificationSummary:
    """Aggregated verification result across several tables.

    Totals and the overall ``status`` are maintained incrementally by
    ``add_result``; the per-table results are kept in ``results``.
    """

    layer: str  # Data layer
    window_start: datetime  # Start of the verification window
    window_end: datetime  # End of the verification window
    total_tables: int = 0  # Number of tables added
    consistent_tables: int = 0  # Tables whose result is consistent
    inconsistent_tables: int = 0  # Tables whose result is not consistent
    total_source_count: int = 0  # Sum of source record counts
    total_target_count: int = 0  # Sum of target record counts
    total_missing: int = 0  # Sum of missing records
    total_mismatch: int = 0  # Sum of inconsistent records
    total_backfilled: int = 0  # Sum of backfilled records
    total_backfilled_missing: int = 0  # Sum of backfilled missing records
    total_backfilled_mismatch: int = 0  # Sum of backfilled mismatched records
    error_tables: int = 0  # Tables whose verification errored
    elapsed_seconds: float = 0.0  # Sum of per-table elapsed seconds
    results: List[VerificationResult] = field(default_factory=list)  # Per-table results
    status: VerificationStatus = VerificationStatus.OK

    @staticmethod
    def _format_window_bound(value: Optional[datetime]) -> str:
        """Render a window bound as ``YYYY-MM-DD HH:MM``, or ``N/A`` if unset."""
        return value.strftime('%Y-%m-%d %H:%M') if value else "N/A"

    def add_result(self, result: VerificationResult):
        """Append a per-table result and fold it into the totals.

        The overall status is updated independently of insertion order:

        * ``ERROR`` has the highest priority and always overrides.
        * ``MISSING`` / ``MISMATCH`` replace ``OK`` or ``BACKFILLED``; the
          first such status seen is kept.
        * ``BACKFILLED`` only replaces ``OK``.

        Args:
            result: The single-table result to add.
        """
        self.results.append(result)
        self.total_tables += 1
        self.total_source_count += result.source_count
        self.total_target_count += result.target_count
        self.total_missing += result.missing_count
        self.total_mismatch += result.mismatch_count
        self.total_backfilled += result.backfilled_count
        self.total_backfilled_missing += result.backfilled_missing_count
        self.total_backfilled_mismatch += result.backfilled_mismatch_count
        self.elapsed_seconds += result.elapsed_seconds

        if result.status == VerificationStatus.ERROR:
            self.error_tables += 1
            self.inconsistent_tables += 1
            # Errors have the highest priority and override the summary status.
            self.status = VerificationStatus.ERROR
        elif result.is_consistent:
            self.consistent_tables += 1
        else:
            self.inconsistent_tables += 1
            # An already-repaired table must not hide a table that still has
            # unresolved differences, whichever of the two was added first.
            unresolved = result.status != VerificationStatus.BACKFILLED
            if self.status == VerificationStatus.OK or (
                self.status == VerificationStatus.BACKFILLED and unresolved
            ):
                self.status = result.status

    @property
    def is_all_consistent(self) -> bool:
        """Whether no table has been counted as inconsistent."""
        return self.inconsistent_tables == 0

    def to_dict(self) -> dict:
        """Convert the summary, including every per-table result, to a dict.

        Window bounds are emitted as ISO-8601 strings (``None`` when unset)
        and the status as its string value.
        """
        return {
            "layer": self.layer,
            "window_start": self.window_start.isoformat() if self.window_start else None,
            "window_end": self.window_end.isoformat() if self.window_end else None,
            "total_tables": self.total_tables,
            "consistent_tables": self.consistent_tables,
            "inconsistent_tables": self.inconsistent_tables,
            "total_source_count": self.total_source_count,
            "total_target_count": self.total_target_count,
            "total_missing": self.total_missing,
            "total_mismatch": self.total_mismatch,
            "total_backfilled": self.total_backfilled,
            "total_backfilled_missing": self.total_backfilled_missing,
            "total_backfilled_mismatch": self.total_backfilled_mismatch,
            "error_tables": self.error_tables,
            "elapsed_seconds": self.elapsed_seconds,
            "status": self.status.value,
            "results": [r.to_dict() for r in self.results],
        }

    def format_summary(self) -> str:
        """Return a multi-line, human-readable rendering of the totals.

        An unset window bound is rendered as ``N/A`` instead of raising, so
        this method accepts the same inputs as ``to_dict``.
        """
        rule = "=" * 60
        window_from = self._format_window_bound(self.window_start)
        window_to = self._format_window_bound(self.window_end)
        lines = [
            rule,
            f"校验汇总 - {self.layer}",
            rule,
            f"窗口: {window_from} ~ {window_to}",
            f"表数: {self.total_tables} (一致: {self.consistent_tables}, 不一致: {self.inconsistent_tables})",
            f"源数据量: {self.total_source_count:,}",
            f"目标数据量: {self.total_target_count:,}",
            f"总缺失: {self.total_missing:,}",
            f"总不一致: {self.total_mismatch:,}",
            f"总补齐: {self.total_backfilled:,} (缺失: {self.total_backfilled_missing:,}, 不一致: {self.total_backfilled_mismatch:,})",
            f"错误表数: {self.error_tables}",
            f"总耗时: {self.elapsed_seconds:.2f}s",
            f"状态: {self.status.value}",
            rule,
        ]
        return "\n".join(lines)
|
||
|
||
|
||
@dataclass
class WindowSegment:
    """One slice of a time window."""

    start: datetime  # Segment start
    end: datetime  # Segment end
    index: int = 0  # Position of this segment within its list
    total: int = 1  # Number of segments in that list

    @property
    def label(self) -> str:
        """Date-only label of the segment: ``<start date> ~ <end date>``."""
        date_format = '%Y-%m-%d'
        bounds = (self.start, self.end)
        return " ~ ".join(bound.strftime(date_format) for bound in bounds)
|
||
|
||
|
||
def _next_segment_boundary(current: datetime, unit: str, fallback: datetime) -> datetime:
    """Return the first day/week/month boundary strictly after ``current``.

    ``unit`` must already be normalised to lower case. For any unit other
    than "day", "week" or "month", ``fallback`` is returned so the caller
    ends up with a single, unsplit segment.
    """
    midnight = current.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "day":
        return midnight + timedelta(days=1)
    if unit == "week":
        # Weeks start on Monday (weekday() == 0); a Monday start therefore
        # advances a full 7 days, any other day advances to the next Monday.
        return midnight + timedelta(days=7 - current.weekday())
    if unit == "month":
        if current.month == 12:
            return midnight.replace(year=current.year + 1, month=1, day=1)
        return midnight.replace(month=current.month + 1, day=1)
    return fallback


def build_window_segments(
    window_start: datetime,
    window_end: datetime,
    split_unit: str = "month",
) -> List[WindowSegment]:
    """Split a time window into consecutive segments of the given unit.

    Segments are cut at calendar boundaries (midnight, Monday midnight, or the
    first of the month at midnight), so the first and last segments may be
    shorter than a full unit.

    Args:
        window_start: Start of the window.
        window_end: End of the window.
        split_unit: One of "none", "day", "week", "month". Matching ignores
            case and surrounding whitespace. A falsy value or "none" returns
            the window as one segment; an unrecognised unit is not split.

    Returns:
        The list of segments, each carrying its zero-based ``index`` and the
        ``total`` segment count. When splitting is requested and
        ``window_start`` is not before ``window_end``, the list is empty.
    """
    # Normalise so that e.g. "Month" or " day " do not silently fall back to
    # the "unknown unit" branch.
    unit = str(split_unit).strip().lower() if split_unit else "none"
    if unit == "none":
        return [WindowSegment(start=window_start, end=window_end, index=0, total=1)]

    segments: List[WindowSegment] = []
    current = window_start
    while current < window_end:
        boundary = _next_segment_boundary(current, unit, window_end)
        # Never run past the end of the requested window.
        segment_end = min(boundary, window_end)
        segments.append(
            WindowSegment(start=current, end=segment_end, index=len(segments))
        )
        current = segment_end

    # The total is only known once every segment has been produced.
    total = len(segments)
    for segment in segments:
        segment.total = total

    return segments
|
||
|
||
def filter_verify_tables(layer: str, tables: list[str] | None) -> list[str] | None:
|
||
"""按层过滤校验表名,避免非目标层全量校验。
|
||
|
||
Args:
|
||
layer: 数据层名称("ODS" / "DWD" / "DWS" / "INDEX")
|
||
tables: 待过滤的表名列表,为 None 或空时直接返回 None
|
||
|
||
Returns:
|
||
过滤后的表名列表,或 None
|
||
"""
|
||
if not tables:
|
||
return None
|
||
layer_upper = layer.upper()
|
||
normalized = [t.strip().lower() for t in tables if t and t.strip()]
|
||
if layer_upper == "DWD":
|
||
return [t for t in normalized if t.startswith(("dwd_", "dim_", "fact_"))]
|
||
if layer_upper == "DWS":
|
||
return [t for t in normalized if t.startswith("dws_")]
|
||
if layer_upper == "INDEX":
|
||
return [t for t in normalized if t.startswith("v_") or t.endswith("_index")]
|
||
if layer_upper == "ODS":
|
||
return [t for t in normalized if t.startswith("ods_")]
|
||
return normalized
|
||
|
||
|
||
|
||
|
||
# 需要导入 timedelta
|
||
from datetime import timedelta
|