Files

284 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""校验结果数据模型"""
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict, Any
class VerificationStatus(Enum):
    """Outcome of verifying a table (or an aggregate of tables).

    Each member's value equals its name, so ``status.value`` is the string
    written out by the ``to_dict`` serializers.
    """

    # Source and target agree.
    OK = "OK"
    # Some source records are absent from the target.
    MISSING = "MISSING"
    # Some records exist on both sides but differ.
    MISMATCH = "MISMATCH"
    # Differences were found and have been backfilled.
    BACKFILLED = "BACKFILLED"
    # Verification itself failed.
    ERROR = "ERROR"
@dataclass
class VerificationResult:
    """Verification result for a single table.

    The counters are filled in by the caller. This class stores them, derives
    ``is_consistent`` / ``needs_backfill``, and renders them either as a
    plain dict (``to_dict``) or as a human-readable block (``format_summary``).
    """

    layer: str  # data layer: "ODS" / "DWD" / "DWS" / "INDEX"
    table: str  # table name
    window_start: datetime  # window start; to_dict/format_summary tolerate None
    window_end: datetime  # window end; to_dict/format_summary tolerate None
    source_count: int = 0  # number of source rows
    target_count: int = 0  # number of target rows
    missing_count: int = 0  # records missing from the target
    mismatch_count: int = 0  # records whose content differs
    backfilled_count: int = 0  # backfilled records (missing + mismatch)
    backfilled_missing_count: int = 0  # backfilled missing records
    backfilled_mismatch_count: int = 0  # backfilled mismatched records
    status: VerificationStatus = VerificationStatus.OK
    elapsed_seconds: float = 0.0  # elapsed time in seconds
    error_message: Optional[str] = None  # error message, if any
    details: Dict[str, Any] = field(default_factory=dict)  # extra details

    @property
    def is_consistent(self) -> bool:
        """True when the table's status is ``VerificationStatus.OK``."""
        return self.status == VerificationStatus.OK

    @property
    def needs_backfill(self) -> bool:
        """True when any missing or mismatched records were counted."""
        return self.missing_count > 0 or self.mismatch_count > 0

    @staticmethod
    def _format_window_bound(value: Optional[datetime]) -> str:
        """Render one window bound for ``format_summary``; ``None`` becomes "-"."""
        return value.strftime("%Y-%m-%d %H:%M") if value is not None else "-"

    def to_dict(self) -> dict:
        """Return a dict view of the result.

        Datetimes are ISO-8601 strings (or None) and ``status`` is its string
        value. ``details`` is returned as-is, not copied.
        """
        return {
            "layer": self.layer,
            "table": self.table,
            "window_start": self.window_start.isoformat() if self.window_start else None,
            "window_end": self.window_end.isoformat() if self.window_end else None,
            "source_count": self.source_count,
            "target_count": self.target_count,
            "missing_count": self.missing_count,
            "mismatch_count": self.mismatch_count,
            "backfilled_count": self.backfilled_count,
            "backfilled_missing_count": self.backfilled_missing_count,
            "backfilled_mismatch_count": self.backfilled_mismatch_count,
            "status": self.status.value,
            "elapsed_seconds": self.elapsed_seconds,
            "error_message": self.error_message,
            "details": self.details,
        }

    def format_summary(self) -> str:
        """Return a multi-line, human-readable summary of this result.

        Missing window bounds are shown as "-" instead of raising, matching
        the None handling in ``to_dict``. The error line is appended only when
        ``error_message`` is set.
        """
        window_start = self._format_window_bound(self.window_start)
        window_end = self._format_window_bound(self.window_end)
        lines = [
            f"表: {self.table}",
            f"层: {self.layer}",
            f"窗口: {window_start} ~ {window_end}",
            f"源数据量: {self.source_count:,}",
            f"目标数据量: {self.target_count:,}",
            f"缺失: {self.missing_count:,}",
            f"不一致: {self.mismatch_count:,}",
            f"缺失补齐: {self.backfilled_missing_count:,}",
            f"不一致补齐: {self.backfilled_mismatch_count:,}",
            f"已补齐: {self.backfilled_count:,}",
            f"状态: {self.status.value}",
            f"耗时: {self.elapsed_seconds:.2f}s",
        ]
        if self.error_message:
            lines.append(f"错误: {self.error_message}")
        return "\n".join(lines)
@dataclass
class VerificationSummary:
    """Aggregated verification result for one data layer.

    All totals start at zero and are accumulated by ``add_result``.
    ``results`` keeps every per-table ``VerificationResult`` in insertion
    order.
    """

    layer: str  # data layer
    window_start: datetime  # window start; to_dict/format_summary tolerate None
    window_end: datetime  # window end; to_dict/format_summary tolerate None
    total_tables: int = 0  # number of results added
    consistent_tables: int = 0  # results whose status is OK
    inconsistent_tables: int = 0  # results with any other status (ERROR included)
    total_source_count: int = 0  # summed source row counts
    total_target_count: int = 0  # summed target row counts
    total_missing: int = 0  # summed missing counts
    total_mismatch: int = 0  # summed mismatch counts
    total_backfilled: int = 0  # summed backfilled counts
    total_backfilled_missing: int = 0  # summed backfilled-missing counts
    total_backfilled_mismatch: int = 0  # summed backfilled-mismatch counts
    error_tables: int = 0  # results whose status is ERROR
    # Sum of per-table elapsed_seconds; not wall-clock time if tables were
    # verified concurrently.
    elapsed_seconds: float = 0.0
    results: List[VerificationResult] = field(default_factory=list)  # per-table results
    status: VerificationStatus = VerificationStatus.OK

    def add_result(self, result: VerificationResult):
        """Append one table result and fold it into the running totals.

        Status precedence: ERROR always overrides. Otherwise the first non-OK
        table status is adopted, and later non-OK statuses do not replace it.
        """
        self.results.append(result)
        self.total_tables += 1
        self.total_source_count += result.source_count
        self.total_target_count += result.target_count
        self.total_missing += result.missing_count
        self.total_mismatch += result.mismatch_count
        self.total_backfilled += result.backfilled_count
        self.total_backfilled_missing += result.backfilled_missing_count
        self.total_backfilled_mismatch += result.backfilled_mismatch_count
        self.elapsed_seconds += result.elapsed_seconds
        if result.status == VerificationStatus.ERROR:
            self.error_tables += 1
            self.inconsistent_tables += 1
            # Errors have the highest priority and overwrite the summary status.
            self.status = VerificationStatus.ERROR
        elif result.is_consistent:
            self.consistent_tables += 1
        else:
            self.inconsistent_tables += 1
            if self.status == VerificationStatus.OK:
                self.status = result.status

    @property
    def is_all_consistent(self) -> bool:
        """True when no added result was inconsistent or errored."""
        return self.inconsistent_tables == 0

    @staticmethod
    def _format_window_bound(value: Optional[datetime]) -> str:
        """Render one window bound for ``format_summary``; ``None`` becomes "-"."""
        return value.strftime("%Y-%m-%d %H:%M") if value is not None else "-"

    def to_dict(self) -> dict:
        """Return a dict view of the summary, including each result's ``to_dict``."""
        return {
            "layer": self.layer,
            "window_start": self.window_start.isoformat() if self.window_start else None,
            "window_end": self.window_end.isoformat() if self.window_end else None,
            "total_tables": self.total_tables,
            "consistent_tables": self.consistent_tables,
            "inconsistent_tables": self.inconsistent_tables,
            "total_source_count": self.total_source_count,
            "total_target_count": self.total_target_count,
            "total_missing": self.total_missing,
            "total_mismatch": self.total_mismatch,
            "total_backfilled": self.total_backfilled,
            "total_backfilled_missing": self.total_backfilled_missing,
            "total_backfilled_mismatch": self.total_backfilled_mismatch,
            "error_tables": self.error_tables,
            "elapsed_seconds": self.elapsed_seconds,
            "status": self.status.value,
            "results": [r.to_dict() for r in self.results],
        }

    def format_summary(self) -> str:
        """Return a framed, multi-line, human-readable summary.

        Missing window bounds are shown as "-" instead of raising, matching
        the None handling in ``to_dict``.
        """
        window_start = self._format_window_bound(self.window_start)
        window_end = self._format_window_bound(self.window_end)
        lines = [
            f"{'=' * 60}",
            f"校验汇总 - {self.layer}",
            f"{'=' * 60}",
            f"窗口: {window_start} ~ {window_end}",
            f"表数: {self.total_tables} (一致: {self.consistent_tables}, 不一致: {self.inconsistent_tables})",
            f"源数据量: {self.total_source_count:,}",
            f"目标数据量: {self.total_target_count:,}",
            f"总缺失: {self.total_missing:,}",
            f"总不一致: {self.total_mismatch:,}",
            f"总补齐: {self.total_backfilled:,} (缺失: {self.total_backfilled_missing:,}, 不一致: {self.total_backfilled_mismatch:,})",
            f"错误表数: {self.error_tables}",
            f"总耗时: {self.elapsed_seconds:.2f}s",
            f"状态: {self.status.value}",
            f"{'=' * 60}",
        ]
        return "\n".join(lines)
@dataclass
class WindowSegment:
    """One slice of a verification time window, covering [start, end)."""

    start: datetime
    end: datetime
    index: int = 0  # zero-based position of this slice
    total: int = 1  # number of slices the window was split into

    @property
    def label(self) -> str:
        """Date-only label for the slice, e.g. "2024-01-01 ~ 2024-02-01"."""
        return f"{self.start.strftime('%Y-%m-%d')} ~ {self.end.strftime('%Y-%m-%d')}"


def _next_split_boundary(current: datetime, unit: str, window_end: datetime) -> datetime:
    """Return the first ``unit`` boundary strictly after ``current``.

    For an unrecognized unit, ``window_end`` is returned, so the remaining
    window is not split.
    """
    midnight = current.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "day":
        return midnight + timedelta(days=1)
    if unit == "week":
        # Weeks start on Monday (weekday() == 0). From any weekday this is the
        # next Monday; from a Monday it is the following Monday (7 days).
        return midnight + timedelta(days=7 - current.weekday())
    if unit == "month":
        if current.month == 12:
            return midnight.replace(year=current.year + 1, month=1, day=1)
        return midnight.replace(month=current.month + 1, day=1)
    return window_end


def build_window_segments(
    window_start: datetime,
    window_end: datetime,
    split_unit: str = "month",
) -> List[WindowSegment]:
    """Split [window_start, window_end) into consecutive calendar-aligned slices.

    Args:
        window_start: Start of the window (inclusive).
        window_end: End of the window (exclusive).
        split_unit: "none", "day", "week" (Monday-based) or "month". Matching
            ignores case and surrounding whitespace. An empty/None value or
            "none" yields exactly one segment, even if the window is empty. An
            unrecognized unit also leaves the window unsplit.

    Returns:
        Segments in chronological order, with ``index``/``total`` filled in.
        The first segment starts at ``window_start`` and the last ends at
        ``window_end``. For any unit other than "none", an empty window
        (start >= end) yields an empty list.
    """
    # Normalize so config values like "Month" or " day " still split instead
    # of silently degrading to a single, unsplit window.
    unit = (split_unit or "").strip().lower()
    if unit in ("", "none"):
        return [WindowSegment(start=window_start, end=window_end, index=0, total=1)]

    bounds = []
    current = window_start
    while current < window_end:
        # Every boundary is strictly after `current`, so the loop always advances.
        segment_end = min(_next_split_boundary(current, unit, window_end), window_end)
        bounds.append((current, segment_end))
        current = segment_end

    total = len(bounds)
    return [
        WindowSegment(start=start, end=end, index=i, total=total)
        for i, (start, end) in enumerate(bounds)
    ]
def filter_verify_tables(layer: str, tables: list[str] | None) -> list[str] | None:
"""按层过滤校验表名,避免非目标层全量校验。
Args:
layer: 数据层名称("ODS" / "DWD" / "DWS" / "INDEX"
tables: 待过滤的表名列表,为 None 或空时直接返回 None
Returns:
过滤后的表名列表,或 None
"""
if not tables:
return None
layer_upper = layer.upper()
normalized = [t.strip().lower() for t in tables if t and t.strip()]
if layer_upper == "DWD":
return [t for t in normalized if t.startswith(("dwd_", "dim_", "fact_"))]
if layer_upper == "DWS":
return [t for t in normalized if t.startswith("dws_")]
if layer_upper == "INDEX":
return [t for t in normalized if t.startswith("v_") or t.endswith("_index")]
if layer_upper == "ODS":
return [t for t in normalized if t.startswith("ods_")]
return normalized
# 需要导入 timedelta
from datetime import timedelta