# -*- coding: utf-8 -*-
"""Batch verifier for the INDEX layer (coverage check + recompute backfill)."""

import importlib
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_verifier import BaseVerifier, VerificationFetchError


class IndexVerifier(BaseVerifier):
    """INDEX-layer verifier.

    Compares the set of entities that *should* be indexed (derived from DWD
    source tables over a lookback window) against the entities actually
    present in the DWS index tables, and triggers the producing tasks to
    backfill any gaps.
    """

    # task_code -> (module path, task class name).  Modules are imported
    # lazily at backfill time, mirroring the original inline imports, so a
    # missing task module only fails when that task is actually needed.
    _TASK_REGISTRY: Dict[str, Tuple[str, str]] = {
        "DWS_RECALL_INDEX": ("tasks.dws.index.recall_index_task", "RecallIndexTask"),
        "DWS_WINBACK_INDEX": ("tasks.dws.index.winback_index_task", "WinbackIndexTask"),
        "DWS_NEWCONV_INDEX": ("tasks.dws.index.newconv_index_task", "NewconvIndexTask"),
        "DWS_INTIMACY_INDEX": ("tasks.dws.index.intimacy_index_task", "IntimacyIndexTask"),
        "DWS_RELATION_INDEX": ("tasks.dws.index.relation_index_task", "RelationIndexTask"),
    }

    def __init__(
        self,
        db_connection: Any,
        logger: Optional[logging.Logger] = None,
        lookback_days: int = 60,
        config: Any = None,
    ):
        """Initialize the verifier.

        Args:
            db_connection: DB wrapper exposing ``.conn`` (a DB-API connection).
            logger: Optional logger; ``BaseVerifier`` supplies a default.
            lookback_days: Size of the source-entity lookback window.
            config: Optional app config forwarded to backfill tasks; loaded
                lazily from ``config.settings.AppConfig`` when absent.
        """
        super().__init__(db_connection, logger)
        self.lookback_days = lookback_days
        self.config = config
        self._table_config = self._load_table_config()

    @property
    def layer_name(self) -> str:
        return "INDEX"

    def _load_table_config(self) -> Dict[str, dict]:
        """Build the per-table verification configuration.

        Each entry defines the primary-key columns, the time column, the SQL
        producing the expected entity set (parameterized by start/end
        timestamps), and the task code(s) that can regenerate the table.
        """
        return {
            "v_member_recall_priority": {
                "pk_columns": ["site_id", "member_id"],
                "time_column": "calc_time",
                "entity_sql": """
                    WITH params AS (
                        SELECT %s::timestamp AS start_time,
                               %s::timestamp AS end_time
                    ),
                    visit_members AS (
                        SELECT DISTINCT s.site_id, s.member_id
                        FROM billiards_dwd.dwd_settlement_head s
                        CROSS JOIN params p
                        WHERE s.pay_time >= p.start_time
                          AND s.pay_time < p.end_time
                          AND s.member_id > 0
                          AND (
                              s.settle_type = 1
                              OR (
                                  s.settle_type = 3
                                  AND EXISTS (
                                      SELECT 1
                                      FROM billiards_dwd.dwd_assistant_service_log asl
                                      JOIN billiards_dws.cfg_skill_type st
                                        ON asl.skill_id = st.skill_id
                                       AND st.course_type_code = 'BONUS'
                                       AND st.is_active = TRUE
                                      WHERE asl.order_settle_id = s.order_settle_id
                                        AND asl.site_id = s.site_id
                                        AND asl.tenant_member_id = s.member_id
                                        AND asl.is_delete = 0
                                  )
                              )
                          )
                    ),
                    recharge_members AS (
                        SELECT DISTINCT r.site_id, r.member_id
                        FROM billiards_dwd.dwd_recharge_order r
                        CROSS JOIN params p
                        WHERE r.pay_time >= p.start_time
                          AND r.pay_time < p.end_time
                          AND r.member_id > 0
                          AND r.settle_type = 5
                    )
                    SELECT site_id, member_id FROM visit_members
                    UNION
                    SELECT site_id, member_id FROM recharge_members
                """,
                # This view is jointly produced by WBI + NCI; a gap must
                # trigger both recomputations.
                "task_codes": ["DWS_WINBACK_INDEX", "DWS_NEWCONV_INDEX"],
                "description": "客户召回/转化优先级视图",
            },
            "dws_member_assistant_relation_index": {
                "pk_columns": ["site_id", "member_id", "assistant_id"],
                "time_column": "calc_time",
                "entity_sql": """
                    WITH params AS (
                        SELECT %s::timestamp AS start_time,
                               %s::timestamp AS end_time
                    ),
                    service_pairs AS (
                        SELECT DISTINCT s.site_id, s.tenant_member_id AS member_id, d.assistant_id
                        FROM billiards_dwd.dwd_assistant_service_log s
                        JOIN billiards_dwd.dim_assistant d
                          ON s.user_id = d.user_id
                         AND d.scd2_is_current = 1
                         AND COALESCE(d.is_delete, 0) = 0
                        CROSS JOIN params p
                        WHERE s.last_use_time >= p.start_time
                          AND s.last_use_time < p.end_time
                          AND s.tenant_member_id > 0
                          AND s.user_id > 0
                          AND s.is_delete = 0
                    ),
                    manual_pairs AS (
                        SELECT DISTINCT m.site_id, m.member_id, m.assistant_id
                        FROM billiards_dws.dws_ml_manual_order_alloc m
                        CROSS JOIN params p
                        WHERE m.pay_time >= p.start_time
                          AND m.pay_time < p.end_time
                          AND m.member_id > 0
                          AND m.assistant_id > 0
                    )
                    SELECT site_id, member_id, assistant_id FROM service_pairs
                    UNION
                    SELECT site_id, member_id, assistant_id FROM manual_pairs
                """,
                "task_code": "DWS_RELATION_INDEX",
                "description": "客户-助教关系指数",
            },
        }

    def get_tables(self) -> List[str]:
        """Return the list of INDEX tables under verification."""
        return list(self._table_config.keys())

    def get_primary_keys(self, table: str) -> List[str]:
        """Return the configured PK columns for *table* ([] if unknown)."""
        if table in self._table_config:
            return self._table_config[table].get("pk_columns", [])
        self.logger.warning("表 %s 未在 INDEX 校验配置中定义,跳过", table)
        return []

    def get_time_column(self, table: str) -> Optional[str]:
        """Return the time column for *table*; defaults to ``calc_time``."""
        if table in self._table_config:
            return self._table_config[table].get("time_column", "calc_time")
        return "calc_time"

    def fetch_source_keys(
        self,
        table: str,
        window_start: datetime,
        window_end: datetime,
    ) -> Set[Tuple]:
        """Return the expected entity-key set from the DWD sources.

        Note: ``window_start`` is intentionally ignored — the effective start
        is always ``window_end - lookback_days`` so coverage is always judged
        over the full lookback window.

        Raises:
            VerificationFetchError: if the source query fails (the underlying
                cause is chained; the transaction is rolled back best-effort).
        """
        config = self._table_config.get(table, {})
        entity_sql = config.get("entity_sql")
        if not entity_sql:
            return set()
        actual_start = window_end - timedelta(days=self.lookback_days)
        try:
            with self.db.conn.cursor() as cur:
                cur.execute(entity_sql, (actual_start, window_end))
                return {tuple(row) for row in cur.fetchall()}
        except Exception as exc:
            self.logger.warning("获取源实体失败: table=%s error=%s", table, exc)
            try:
                self.db.conn.rollback()
            except Exception:
                pass
            raise VerificationFetchError(f"获取源实体失败: {table}") from exc

    def fetch_target_keys(
        self,
        table: str,
        window_start: datetime,
        window_end: datetime,
    ) -> Set[Tuple]:
        """Return the entity-key set actually present in the INDEX table.

        The whole table is scanned (no time filter): index tables hold the
        current snapshot, so presence alone decides coverage.

        Raises:
            VerificationFetchError: if the target query fails.
        """
        pk_cols = self.get_primary_keys(table)
        if not pk_cols:
            self.logger.debug("表 %s 没有主键配置,跳过目标读取", table)
            return set()
        pk_select = ", ".join(pk_cols)
        sql = f"""
            SELECT DISTINCT {pk_select}
            FROM billiards_dws.{table}
        """
        try:
            with self.db.conn.cursor() as cur:
                cur.execute(sql)
                return {tuple(row) for row in cur.fetchall()}
        except Exception as exc:
            self.logger.warning("获取目标实体失败: table=%s error=%s", table, exc)
            try:
                self.db.conn.rollback()
            except Exception:
                pass
            raise VerificationFetchError(f"获取目标实体失败: {table}") from exc

    def fetch_source_hashes(
        self,
        table: str,
        window_start: datetime,
        window_end: datetime,
    ) -> Dict[Tuple, str]:
        """Presence-only hashing: every source key maps to the constant "1"."""
        keys = self.fetch_source_keys(table, window_start, window_end)
        return {k: "1" for k in keys}

    def fetch_target_hashes(
        self,
        table: str,
        window_start: datetime,
        window_end: datetime,
    ) -> Dict[Tuple, str]:
        """Presence-only hashing: every target key maps to the constant "1"."""
        keys = self.fetch_target_keys(table, window_start, window_end)
        return {k: "1" for k in keys}

    def backfill_missing(
        self,
        table: str,
        missing_keys: Set[Tuple],
        window_start: datetime,
        window_end: datetime,
    ) -> int:
        """Re-run the producing task(s) for *table* to fill missing entities.

        Returns the total number of records inserted + updated across all
        triggered tasks, or 0 when nothing was (or could be) done.  Never
        raises: failures are logged and reported as 0.
        """
        if not missing_keys:
            return 0
        config = self._table_config.get(table, {})
        # A table is produced either by several tasks ("task_codes") or by a
        # single one ("task_code"); normalize to a list.
        task_codes = config.get("task_codes")
        if not task_codes:
            task_code = config.get("task_code")
            task_codes = [task_code] if task_code else []
        if not task_codes:
            self.logger.warning("未找到补齐任务配置: table=%s", table)
            return 0
        self.logger.info(
            "INDEX 补齐: table=%s missing=%d task_codes=%s",
            table,
            len(missing_keys),
            ",".join(task_codes),
        )
        # Clear any aborted transaction before handing the connection to tasks.
        try:
            self.db.conn.rollback()
        except Exception:
            pass
        try:
            task_config = self.config
            if task_config is None:
                from config.settings import AppConfig
                task_config = AppConfig.load()
            inserted_total = 0
            for task_code in task_codes:
                spec = self._TASK_REGISTRY.get(task_code)
                if spec is None:
                    self.logger.warning("未知 INDEX 任务代码,跳过: %s", task_code)
                    continue
                module_path, class_name = spec
                task_cls = getattr(importlib.import_module(module_path), class_name)
                task = task_cls(task_config, self.db, None, self.logger)
                self.logger.info("执行 INDEX 补齐任务: %s", task_code)
                # Guard against tasks that return None instead of a stats dict.
                result = task.execute(None) or {}
                inserted_total += result.get("records_inserted", 0) + result.get("records_updated", 0)
            return inserted_total
        except Exception as exc:
            self.logger.error("INDEX 补齐失败: %s", exc)
            try:
                self.db.conn.rollback()
            except Exception:
                pass
            return 0

    def backfill_mismatch(
        self,
        table: str,
        mismatch_keys: Set[Tuple],
        window_start: datetime,
        window_end: datetime,
    ) -> int:
        """No-op: hashes are presence-only constants, so mismatches cannot occur."""
        return 0

    def verify_coverage(
        self,
        table: str,
        window_end: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Compute the coverage report for one table over the lookback window.

        Returns a dict with source/indexed entity counts, missing/extra
        counts, a coverage percentage, a completeness flag, and a sample of
        up to 10 missing keys.
        """
        if window_end is None:
            window_end = datetime.now()
        window_start = window_end - timedelta(days=self.lookback_days)
        config = self._table_config.get(table, {})
        description = config.get("description", table)
        source_keys = self.fetch_source_keys(table, window_start, window_end)
        target_keys = self.fetch_target_keys(table, window_start, window_end)
        missing = source_keys - target_keys
        extra = target_keys - source_keys
        # Empty source set counts as fully covered (avoids division by zero).
        coverage_rate = len(target_keys & source_keys) / len(source_keys) * 100 if source_keys else 100.0
        return {
            "table": table,
            "description": description,
            "lookback_days": self.lookback_days,
            "window": f"{window_start.date()} ~ {window_end.date()}",
            "source_entities": len(source_keys),
            "indexed_entities": len(target_keys),
            "missing_count": len(missing),
            "extra_count": len(extra),
            "coverage_rate": round(coverage_rate, 2),
            "is_complete": len(missing) == 0,
            "missing_sample": list(missing)[:10],
        }

    def verify_all_indices(
        self,
        window_end: Optional[datetime] = None,
    ) -> Dict[str, dict]:
        """Run :meth:`verify_coverage` for every configured table."""
        results = {}
        for table in self.get_tables():
            results[table] = self.verify_coverage(table, window_end)
        return results

    def get_missing_entities(
        self,
        table: str,
        limit: int = 100,
        window_end: Optional[datetime] = None,
    ) -> List[Tuple]:
        """Return up to *limit* entity keys present in source but not indexed."""
        if window_end is None:
            window_end = datetime.now()
        window_start = window_end - timedelta(days=self.lookback_days)
        source_keys = self.fetch_source_keys(table, window_start, window_end)
        target_keys = self.fetch_target_keys(table, window_start, window_end)
        missing = source_keys - target_keys
        return list(missing)[:limit]