456 lines
16 KiB
Python
456 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""DWS 汇总层批量校验器
|
||
|
||
校验逻辑:对比 DWD 聚合数据与 DWS 表数据
|
||
- 按日期/门店聚合对比
|
||
- 对比数值一致性
|
||
- 批量重算 UPSERT 补齐
|
||
"""
|
||
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||
|
||
from .base_verifier import BaseVerifier, VerificationFetchError
|
||
|
||
|
||
class DwsVerifier(BaseVerifier):
    """DWS summary-layer verifier.

    Compares DWD-derived aggregates against the DWS summary tables and
    repairs missing/inconsistent rows by recomputing and upserting them.
    """

    def __init__(
        self,
        db_connection: Any,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the DWS verifier.

        Args:
            db_connection: Database connection wrapper; the methods below use
                its ``.conn`` (DB-API connection) and ``.commit()`` attributes.
            logger: Optional logger, forwarded to the base class.
        """
        super().__init__(db_connection, logger)
        # Static per-table verification config (pk columns, agg SQL, compare columns).
        self._table_config = self._load_table_config()
|
||
|
||
    @property
    def layer_name(self) -> str:
        """Name of the warehouse layer this verifier covers."""
        return "DWS"
|
||
|
||
    def _load_table_config(self) -> Dict[str, dict]:
        """Load the static configuration for the DWS summary tables.

        Each entry maps a DWS table name to:
          - pk_columns: primary-key columns identifying a row
          - time_column: DWS-side date column used for windowing
          - source_table / source_time_column: DWD origin of the data
          - agg_sql: aggregation over DWD producing the expected DWS rows,
            parameterized with (window_start, window_end)
          - compare_columns: numeric columns compared for consistency
        """
        # DWS summary tables generally share this structure:
        # - primary key: site_id, stat_date (or a similar combination)
        # - value columns: assorted statistics
        # - source: the corresponding DWD fact table

        return {
            # Daily finance summary - settlement, table-fee, goods and assistant totals.
            # NOTE: the actual DWS table uses columns gross_amount,
            # table_fee_amount, goods_amount, etc.
            "dws_finance_daily_summary": {
                "pk_columns": ["site_id", "stat_date"],
                "time_column": "stat_date",
                "source_table": "billiards_dwd.dwd_settlement_head",
                "source_time_column": "pay_time",
                "agg_sql": """
                    SELECT
                        site_id,
                        tenant_id,
                        DATE(pay_time) as stat_date,
                        COALESCE(SUM(pay_amount), 0) as cash_pay_amount,
                        COALESCE(SUM(table_charge_money), 0) as table_fee_amount,
                        COALESCE(SUM(goods_money), 0) as goods_amount,
                        COALESCE(SUM(table_charge_money) + SUM(goods_money) + COALESCE(SUM(assistant_pd_money), 0) + COALESCE(SUM(assistant_cx_money), 0), 0) as gross_amount
                    FROM billiards_dwd.dwd_settlement_head
                    WHERE pay_time >= %s AND pay_time < %s
                    GROUP BY site_id, tenant_id, DATE(pay_time)
                """,
                "compare_columns": ["cash_pay_amount", "table_fee_amount", "goods_amount", "gross_amount"],
            },
            # Assistant daily detail - per assistant + date: service count, duration, amount.
            # NOTE: the DWD table uses site_assistant_id; the DWS table uses assistant_id.
            "dws_assistant_daily_detail": {
                "pk_columns": ["site_id", "assistant_id", "stat_date"],
                "time_column": "stat_date",
                "source_table": "billiards_dwd.dwd_assistant_service_log",
                "source_time_column": "start_use_time",
                "agg_sql": """
                    SELECT
                        site_id,
                        tenant_id,
                        site_assistant_id as assistant_id,
                        DATE(start_use_time) as stat_date,
                        COUNT(*) as total_service_count,
                        COALESCE(SUM(income_seconds), 0) as total_seconds,
                        COALESCE(SUM(ledger_amount), 0) as total_ledger_amount
                    FROM billiards_dwd.dwd_assistant_service_log
                    WHERE start_use_time >= %s AND start_use_time < %s
                      AND is_delete = 0
                    GROUP BY site_id, tenant_id, site_assistant_id, DATE(start_use_time)
                """,
                "compare_columns": ["total_service_count", "total_seconds", "total_ledger_amount"],
            },
            # Member visit detail - one row per member + settlement order.
            # NOTE: the DWD table's primary key is order_settle_id, not id.
            "dws_member_visit_detail": {
                "pk_columns": ["site_id", "member_id", "order_settle_id"],
                "time_column": "visit_date",
                "source_table": "billiards_dwd.dwd_settlement_head",
                "source_time_column": "pay_time",
                "agg_sql": """
                    SELECT
                        site_id,
                        tenant_id,
                        member_id,
                        order_settle_id,
                        DATE(pay_time) as visit_date,
                        COALESCE(table_charge_money, 0) as table_fee,
                        COALESCE(goods_money, 0) as goods_amount,
                        COALESCE(pay_amount, 0) as actual_pay
                    FROM billiards_dwd.dwd_settlement_head
                    WHERE pay_time >= %s AND pay_time < %s
                      AND member_id > 0
                """,
                "compare_columns": ["table_fee", "goods_amount", "actual_pay"],
            },
        }
|
||
|
||
def get_tables(self) -> List[str]:
|
||
"""获取需要校验的 DWS 汇总表列表"""
|
||
if self._table_config:
|
||
return list(self._table_config.keys())
|
||
|
||
sql = """
|
||
SELECT table_name
|
||
FROM information_schema.tables
|
||
WHERE table_schema = 'billiards_dws'
|
||
AND table_type = 'BASE TABLE'
|
||
AND table_name LIKE 'dws_%'
|
||
AND table_name NOT LIKE 'cfg_%'
|
||
ORDER BY table_name
|
||
"""
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(sql)
|
||
return [row[0] for row in cur.fetchall()]
|
||
except Exception as e:
|
||
self.logger.warning("获取 DWS 表列表失败: %s", e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
return []
|
||
|
||
def get_primary_keys(self, table: str) -> List[str]:
|
||
"""获取表的主键列"""
|
||
if table in self._table_config:
|
||
return self._table_config[table].get("pk_columns", ["site_id", "stat_date"])
|
||
return ["site_id", "stat_date"]
|
||
|
||
def get_time_column(self, table: str) -> Optional[str]:
|
||
"""获取表的时间列"""
|
||
if table in self._table_config:
|
||
return self._table_config[table].get("time_column", "stat_date")
|
||
return "stat_date"
|
||
|
||
def fetch_source_keys(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> Set[Tuple]:
|
||
"""从 DWD 聚合获取源数据主键集合"""
|
||
config = self._table_config.get(table, {})
|
||
agg_sql = config.get("agg_sql")
|
||
|
||
if not agg_sql:
|
||
return set()
|
||
|
||
pk_cols = self.get_primary_keys(table)
|
||
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(agg_sql, (window_start, window_end))
|
||
columns = [desc[0] for desc in cur.description]
|
||
pk_indices = [columns.index(c) for c in pk_cols if c in columns]
|
||
return {tuple(row[i] for i in pk_indices) for row in cur.fetchall()}
|
||
except Exception as e:
|
||
self.logger.warning("获取 DWD 聚合主键失败: %s, error=%s", table, e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
raise VerificationFetchError(f"获取 DWD 聚合主键失败: {table}") from e
|
||
|
||
def fetch_target_keys(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> Set[Tuple]:
|
||
"""从 DWS 表获取目标数据主键集合"""
|
||
pk_cols = self.get_primary_keys(table)
|
||
time_col = self.get_time_column(table)
|
||
|
||
pk_select = ", ".join(pk_cols)
|
||
sql = f"""
|
||
SELECT {pk_select}
|
||
FROM billiards_dws.{table}
|
||
WHERE {time_col} >= %s AND {time_col} < %s
|
||
"""
|
||
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(sql, (window_start.date(), window_end.date()))
|
||
return {tuple(row) for row in cur.fetchall()}
|
||
except Exception as e:
|
||
self.logger.warning("获取 DWS 主键失败: %s, error=%s", table, e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
raise VerificationFetchError(f"获取 DWS 主键失败: {table}") from e
|
||
|
||
def fetch_source_hashes(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> Dict[Tuple, str]:
|
||
"""从 DWD 聚合获取数据,返回主键->聚合值字符串"""
|
||
config = self._table_config.get(table, {})
|
||
agg_sql = config.get("agg_sql")
|
||
compare_cols = config.get("compare_columns", [])
|
||
|
||
if not agg_sql:
|
||
return {}
|
||
|
||
pk_cols = self.get_primary_keys(table)
|
||
|
||
result = {}
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(agg_sql, (window_start, window_end))
|
||
columns = [desc[0] for desc in cur.description]
|
||
pk_indices = [columns.index(c) for c in pk_cols if c in columns]
|
||
value_indices = [columns.index(c) for c in compare_cols if c in columns]
|
||
|
||
for row in cur.fetchall():
|
||
pk = tuple(row[i] for i in pk_indices)
|
||
values = tuple(row[i] for i in value_indices)
|
||
result[pk] = str(values)
|
||
except Exception as e:
|
||
self.logger.warning("获取 DWD 聚合数据失败: %s, error=%s", table, e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
raise VerificationFetchError(f"获取 DWD 聚合数据失败: {table}") from e
|
||
|
||
return result
|
||
|
||
def fetch_target_hashes(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> Dict[Tuple, str]:
|
||
"""从 DWS 表获取数据,返回主键->值字符串"""
|
||
config = self._table_config.get(table, {})
|
||
compare_cols = config.get("compare_columns", [])
|
||
pk_cols = self.get_primary_keys(table)
|
||
time_col = self.get_time_column(table)
|
||
|
||
all_cols = pk_cols + compare_cols
|
||
col_select = ", ".join(all_cols)
|
||
|
||
sql = f"""
|
||
SELECT {col_select}
|
||
FROM billiards_dws.{table}
|
||
WHERE {time_col} >= %s AND {time_col} < %s
|
||
"""
|
||
|
||
result = {}
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(sql, (window_start.date(), window_end.date()))
|
||
|
||
for row in cur.fetchall():
|
||
pk = tuple(row[:len(pk_cols)])
|
||
values = tuple(row[len(pk_cols):])
|
||
result[pk] = str(values)
|
||
except Exception as e:
|
||
self.logger.warning("获取 DWS 数据失败: %s, error=%s", table, e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
raise VerificationFetchError(f"获取 DWS 数据失败: {table}") from e
|
||
|
||
return result
|
||
|
||
def backfill_missing(
|
||
self,
|
||
table: str,
|
||
missing_keys: Set[Tuple],
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> int:
|
||
"""批量补齐缺失数据(重新计算并插入)"""
|
||
if not missing_keys:
|
||
return 0
|
||
|
||
self.logger.info(
|
||
"DWS 补齐缺失: 表=%s, 数量=%d",
|
||
table, len(missing_keys)
|
||
)
|
||
|
||
# 在执行之前确保事务状态干净
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
|
||
# 重新计算汇总数据
|
||
return self._recalculate_and_upsert(table, window_start, window_end, missing_keys)
|
||
|
||
def backfill_mismatch(
|
||
self,
|
||
table: str,
|
||
mismatch_keys: Set[Tuple],
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> int:
|
||
"""批量更新不一致数据(重新计算并更新)"""
|
||
if not mismatch_keys:
|
||
return 0
|
||
|
||
self.logger.info(
|
||
"DWS 更新不一致: 表=%s, 数量=%d",
|
||
table, len(mismatch_keys)
|
||
)
|
||
|
||
# 在执行之前确保事务状态干净
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
|
||
# 重新计算汇总数据
|
||
return self._recalculate_and_upsert(table, window_start, window_end, mismatch_keys)
|
||
|
||
def _recalculate_and_upsert(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
target_keys: Optional[Set[Tuple]] = None,
|
||
) -> int:
|
||
"""重新计算汇总数据并 UPSERT"""
|
||
config = self._table_config.get(table, {})
|
||
agg_sql = config.get("agg_sql")
|
||
|
||
if not agg_sql:
|
||
return 0
|
||
|
||
pk_cols = self.get_primary_keys(table)
|
||
|
||
# 执行聚合查询
|
||
try:
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(agg_sql, (window_start, window_end))
|
||
columns = [desc[0] for desc in cur.description]
|
||
records = [dict(zip(columns, row)) for row in cur.fetchall()]
|
||
except Exception as e:
|
||
self.logger.error("聚合查询失败: %s", e)
|
||
try:
|
||
self.db.conn.rollback()
|
||
except Exception:
|
||
pass
|
||
return 0
|
||
|
||
if not records:
|
||
return 0
|
||
|
||
# 如果指定了目标主键,只处理这些记录
|
||
if target_keys:
|
||
records = [
|
||
r for r in records
|
||
if tuple(r.get(c) for c in pk_cols) in target_keys
|
||
]
|
||
|
||
if not records:
|
||
return 0
|
||
|
||
# 构建 UPSERT SQL
|
||
col_list = ", ".join(columns)
|
||
placeholders = ", ".join(["%s"] * len(columns))
|
||
pk_list = ", ".join(pk_cols)
|
||
|
||
update_cols = [c for c in columns if c not in pk_cols]
|
||
update_set = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_cols)
|
||
|
||
upsert_sql = f"""
|
||
INSERT INTO billiards_dws.{table} ({col_list})
|
||
VALUES ({placeholders})
|
||
ON CONFLICT ({pk_list}) DO UPDATE SET {update_set}
|
||
"""
|
||
|
||
count = 0
|
||
with self.db.conn.cursor() as cur:
|
||
for record in records:
|
||
values = [record.get(c) for c in columns]
|
||
try:
|
||
cur.execute(upsert_sql, values)
|
||
count += 1
|
||
except Exception as e:
|
||
self.logger.warning("UPSERT 失败: %s", e)
|
||
|
||
self.db.commit()
|
||
return count
|
||
|
||
def verify_aggregation(
|
||
self,
|
||
table: str,
|
||
window_start: datetime,
|
||
window_end: datetime,
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
详细校验聚合数据
|
||
|
||
返回源和目标的详细对比
|
||
"""
|
||
config = self._table_config.get(table, {})
|
||
compare_cols = config.get("compare_columns", [])
|
||
|
||
source_hashes = self.fetch_source_hashes(table, window_start, window_end)
|
||
target_hashes = self.fetch_target_hashes(table, window_start, window_end)
|
||
|
||
source_keys = set(source_hashes.keys())
|
||
target_keys = set(target_hashes.keys())
|
||
|
||
missing = source_keys - target_keys
|
||
extra = target_keys - source_keys
|
||
|
||
# 对比数值
|
||
mismatch_details = []
|
||
for key in source_keys & target_keys:
|
||
if source_hashes[key] != target_hashes[key]:
|
||
mismatch_details.append({
|
||
"key": key,
|
||
"source": source_hashes[key],
|
||
"target": target_hashes[key],
|
||
})
|
||
|
||
return {
|
||
"table": table,
|
||
"window": f"{window_start.date()} ~ {window_end.date()}",
|
||
"source_count": len(source_hashes),
|
||
"target_count": len(target_hashes),
|
||
"missing_count": len(missing),
|
||
"extra_count": len(extra),
|
||
"mismatch_count": len(mismatch_details),
|
||
"is_consistent": len(missing) == 0 and len(mismatch_details) == 0,
|
||
"missing_keys": list(missing)[:10], # 只返回前10个
|
||
"mismatch_details": mismatch_details[:10],
|
||
}
|