# -*- coding: utf-8 -*-
"""
Unified DWS maintenance task.

Merges the former BaseMvRefreshTask (materialized-view refresh) and
DwsRetentionCleanupTask (historical data cleanup) into a single maintenance
task, DWS_MAINTENANCE, to simplify scheduling configuration.

Execution order: refresh materialized views first, then clean up historical
data. Each sub-feature can be disabled independently through configuration.
"""

from __future__ import annotations

import json
from datetime import date
from typing import Any, Dict, List, Optional

from .base_dws_task import BaseDwsTask, TaskContext, TimeLayer


class DwsMaintenanceTask(BaseDwsTask):
    """Run materialized-view refresh and historical-data cleanup as one task.

    Configuration keys read by this task:
        dws.mv.enabled              -- enable MV refresh (default False)
        dws.mv.tables               -- base tables to refresh (comma-separated or
                                       list); falls back to dws.retention.tables,
                                       and to every MV_BASE_TABLES entry when
                                       neither is set
        dws.mv.layers               -- explicit time layers to refresh
                                       (comma-separated or list)
        dws.mv.table_layers         -- per-table layer override for MV refresh
                                       (JSON dict); falls back to
                                       dws.retention.table_layers
        dws.mv.refresh_concurrently -- use REFRESH ... CONCURRENTLY (default False)
        dws.retention.enabled       -- enable historical cleanup (default False)
        dws.retention.tables        -- tables to clean (comma-separated or list);
                                       only names present in
                                       DEFAULT_RETENTION_TABLES are used
        dws.retention.layer         -- default cleanup layer (default "ALL";
                                       a layer of ALL means the table is skipped)
        dws.retention.table_layers  -- per-table cleanup layer override (JSON dict)

    Boolean flags accept booleans/numbers as well as strings; strings listed in
    _FALSY_FLAG_STRINGS (e.g. "false", "0", "off") are treated as False.
    """

    # ── MV refresh constants ─────────────────────────────────
    VIEW_PREFIX = "mv_"
    # Layers ordered from the shortest to the longest time window.
    LAYER_ORDER = [
        TimeLayer.LAST_2_DAYS,
        TimeLayer.LAST_1_MONTH,
        TimeLayer.LAST_3_MONTHS,
        TimeLayer.LAST_6_MONTHS,
    ]
    # View-name suffix per layer: mv_<base_table>_<suffix>.
    LAYER_SUFFIX = {
        TimeLayer.LAST_2_DAYS: "l1",
        TimeLayer.LAST_1_MONTH: "l2",
        TimeLayer.LAST_3_MONTHS: "l3",
        TimeLayer.LAST_6_MONTHS: "l4",
    }
    # Base tables that have materialized views to refresh.
    MV_BASE_TABLES = [
        "dws_finance_daily_summary",
        "dws_assistant_daily_detail",
    ]

    # ── Retention cleanup constants ──────────────────────────
    # Tables eligible for cleanup and the date column each one is filtered on.
    # Only these (hard-coded) identifiers are ever interpolated into DELETE SQL.
    DEFAULT_RETENTION_TABLES = [
        {"table": "dws_assistant_daily_detail", "date_col": "stat_date"},
        {"table": "dws_assistant_monthly_summary", "date_col": "stat_month"},
        {"table": "dws_assistant_customer_stats", "date_col": "stat_date"},
        {"table": "dws_assistant_salary_calc", "date_col": "salary_month"},
        {"table": "dws_assistant_recharge_commission", "date_col": "commission_month"},
        {"table": "dws_assistant_finance_analysis", "date_col": "stat_date"},
        {"table": "dws_member_consumption_summary", "date_col": "stat_date"},
        {"table": "dws_member_visit_detail", "date_col": "visit_date"},
        {"table": "dws_finance_daily_summary", "date_col": "stat_date"},
        {"table": "dws_finance_income_structure", "date_col": "stat_date"},
        {"table": "dws_finance_discount_detail", "date_col": "stat_date"},
        {"table": "dws_finance_recharge_summary", "date_col": "stat_date"},
        {"table": "dws_finance_expense_summary", "date_col": "expense_month"},
        {"table": "dws_platform_settlement", "date_col": "settlement_date"},
    ]
    # Date columns whose cleanup cutoff is normalized to the first day of the month.
    MONTHLY_DATE_COLS = frozenset(
        {"stat_month", "salary_month", "commission_month", "expense_month"}
    )

    # String flag values treated as False (compared after strip + lower-case).
    _FALSY_FLAG_STRINGS = frozenset(
        {"", "0", "false", "f", "no", "n", "off", "none", "null"}
    )

    def get_task_code(self) -> str:
        return "DWS_MAINTENANCE"

    def get_target_table(self) -> str:
        # The maintenance task does not write to a single target table.
        return "dws_maintenance"

    def get_primary_keys(self) -> List[str]:
        return []

    # ── E/T/L implementation ─────────────────────────────────

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """No business data to extract; only pass the store id through."""
        return {"site_id": context.store_id}

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """No transformation needed; return the extracted payload unchanged."""
        return extracted

    def load(self, transformed: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """Run (1) materialized-view refresh, then (2) historical data cleanup.

        Each sub-task can be disabled independently through configuration.

        Returns:
            ``{"counts": {"refreshed": <views refreshed>, "cleaned": <rows deleted>}}``.
        """
        stats: Dict[str, Any] = {"refreshed": 0, "cleaned": 0}

        # (1) MV refresh runs first (requirement 4.2).
        if self._is_mv_enabled():
            stats["refreshed"] = self._refresh_all_views()
        else:
            self.logger.info("DWS_MAINTENANCE: 物化视图刷新未启用,跳过")

        # (2) Historical data cleanup.
        if self._is_retention_enabled():
            stats["cleaned"] = self._cleanup_all_tables(context)
        else:
            self.logger.info("DWS_MAINTENANCE: 历史数据清理未启用,跳过")

        return {"counts": stats}

    # ══════════════════════════════════════════════════════════
    # Materialized-view refresh (from BaseMvRefreshTask)
    # ══════════════════════════════════════════════════════════

    def _is_mv_enabled(self) -> bool:
        """Return whether materialized-view refresh is enabled (``dws.mv.enabled``)."""
        return self._config_flag("dws.mv.enabled")

    def _refresh_all_views(self) -> int:
        """Refresh the views of every in-scope base table; return how many were refreshed.

        Views that do not exist in the DWS schema are skipped with a warning.
        """
        refreshed = 0
        for base_table in self.MV_BASE_TABLES:
            if not self._is_table_in_mv_scope(base_table):
                continue
            for layer in self._resolve_mv_layers(base_table):
                view_name = self._get_view_name(base_table, layer)
                if not view_name:
                    continue
                if not self._view_exists(view_name):
                    self.logger.warning(
                        "DWS_MAINTENANCE: 物化视图不存在,跳过 %s", view_name
                    )
                    continue
                self._refresh_view(view_name)
                refreshed += 1
        self.logger.info("DWS_MAINTENANCE: 物化视图刷新完成,数量=%d", refreshed)
        return refreshed

    def _is_table_in_mv_scope(self, base_table: str) -> bool:
        """Return whether *base_table* is in scope for MV refresh.

        Uses ``dws.mv.tables``, falling back to ``dws.retention.tables``; when
        neither lists any table, every base table is in scope.
        """
        tables = self._parse_list(self.config.get("dws.mv.tables"))
        if not tables:
            tables = self._parse_list(self.config.get("dws.retention.tables"))
        # No table list configured: refresh everything by default.
        if not tables:
            return True
        return base_table in tables

    def _resolve_mv_layers(self, base_table: str) -> List[TimeLayer]:
        """Return the time layers whose views should be refreshed for *base_table*.

        Precedence: explicit ``dws.mv.layers`` > per-table override
        (``dws.mv.table_layers`` or ``dws.retention.table_layers``) >
        ``dws.retention.layer`` > all layers in LAYER_ORDER.
        """
        # Explicit configuration wins.
        configured = self._parse_time_layers(self.config.get("dws.mv.layers"))
        if configured:
            return configured

        # Per-table override.
        table_layers = self._resolve_layer_map(
            self.config.get("dws.mv.table_layers")
            or self.config.get("dws.retention.table_layers")
        )
        layer_name = table_layers.get(base_table)
        if layer_name:
            layer = self._get_layer(layer_name)
            if layer and layer != TimeLayer.ALL:
                return self._layers_up_to(layer)

        # Fall back to the retention default layer.
        retention_layer = self._get_layer(self.config.get("dws.retention.layer"))
        if retention_layer and retention_layer != TimeLayer.ALL:
            return self._layers_up_to(retention_layer)

        return list(self.LAYER_ORDER)

    def _layers_up_to(self, target: TimeLayer) -> List[TimeLayer]:
        """Return layers from L1 up to and including *target* (all layers if absent)."""
        layers = []
        for layer in self.LAYER_ORDER:
            layers.append(layer)
            if layer == target:
                break
        return layers

    def _get_view_name(self, base_table: str, layer: TimeLayer) -> Optional[str]:
        """Build ``mv_<base_table>_<suffix>``; None if the layer has no suffix."""
        suffix = self.LAYER_SUFFIX.get(layer)
        if not suffix or not base_table:
            return None
        return f"{self.VIEW_PREFIX}{base_table}_{suffix}"

    def _view_exists(self, view_name: str) -> bool:
        """Return whether ``<DWS_SCHEMA>.<view_name>`` resolves via ``to_regclass``."""
        sql = "SELECT to_regclass(%s) AS reg"
        rows = self.db.query(sql, (f"{self.DWS_SCHEMA}.{view_name}",))
        return bool(rows and rows[0].get("reg"))

    def _refresh_view(self, view_name: str) -> None:
        """Issue REFRESH MATERIALIZED VIEW, optionally CONCURRENTLY.

        *view_name* is always built from class constants, never from user input.
        """
        concurrently = self._config_flag("dws.mv.refresh_concurrently")
        keyword = "CONCURRENTLY " if concurrently else ""
        sql = f"REFRESH MATERIALIZED VIEW {keyword}{self.DWS_SCHEMA}.{view_name}"
        self.db.execute(sql)

    # ══════════════════════════════════════════════════════════
    # Historical data cleanup (from DwsRetentionCleanupTask)
    # ══════════════════════════════════════════════════════════

    def _is_retention_enabled(self) -> bool:
        """Return whether historical cleanup is enabled (``dws.retention.enabled``)."""
        return self._config_flag("dws.retention.enabled")

    def _cleanup_all_tables(self, context: TaskContext) -> int:
        """Delete expired rows from every configured table; return total rows deleted.

        For each table the cutoff is the start of its retention layer's range
        (from ``get_time_layer_range``) relative to the window end. Tables whose
        layer is ALL or unrecognized are skipped.
        """
        base_date = (
            context.window_end.date()
            if hasattr(context.window_end, "date")
            else context.window_end
        )
        default_layer = self._get_layer(
            self.config.get("dws.retention.layer", "ALL")
        )
        if default_layer is None:
            self.logger.warning("DWS_MAINTENANCE: 未识别的清理层级,跳过清理")
            return 0

        target_tables = self._resolve_retention_tables()
        if not target_tables:
            self.logger.info("DWS_MAINTENANCE: 未配置需要清理的表,跳过清理")
            return 0

        table_layers = self._resolve_table_layers()
        total_deleted = 0
        for item in target_tables:
            table = item["table"]
            date_col = item["date_col"]
            # Use the per-table override when present; otherwise use the already
            # resolved default directly. Round-tripping the default through
            # ``default_layer.value`` only worked if each enum value happened to
            # equal its member name, because _get_layer looks members up by name.
            if table in table_layers:
                layer = self._get_layer(table_layers[table])
            else:
                layer = default_layer
            if layer is None or layer == TimeLayer.ALL:
                continue
            time_range = self.get_time_layer_range(layer, base_date)
            cutoff = self._normalize_cutoff(date_col, time_range.start)
            total_deleted += self._cleanup_table(table, date_col, cutoff, context.store_id)

        self.logger.info("DWS_MAINTENANCE: 数据清理完成,总删除 %d 行", total_deleted)
        return total_deleted

    def _resolve_retention_tables(self) -> List[Dict[str, str]]:
        """Return the ``{"table", "date_col"}`` entries selected for cleanup.

        When ``dws.retention.tables`` is unset/empty, all DEFAULT_RETENTION_TABLES
        entries are returned. Otherwise only listed names that have a known date
        column are kept (in DEFAULT_RETENTION_TABLES order); unknown names are
        logged and ignored. Copies are returned so the class constant is never
        exposed to mutation.
        """
        raw = self.config.get("dws.retention.tables")
        if not raw:
            return [dict(item) for item in self.DEFAULT_RETENTION_TABLES]

        wanted = set(self._parse_list(raw))
        known = {item["table"] for item in self.DEFAULT_RETENTION_TABLES}
        unknown = sorted(wanted - known)
        if unknown:
            self.logger.warning(
                "DWS_MAINTENANCE: 未知的清理表(无日期列定义),已忽略 %s",
                ", ".join(unknown),
            )
        return [
            dict(item)
            for item in self.DEFAULT_RETENTION_TABLES
            if item["table"] in wanted
        ]

    def _resolve_table_layers(self) -> Dict[str, str]:
        """Return the per-table cleanup layer overrides (``dws.retention.table_layers``)."""
        return self._resolve_layer_map(self.config.get("dws.retention.table_layers"))

    def _normalize_cutoff(self, date_col: str, cutoff: date) -> date:
        """Snap the cutoff to the first of the month for month-valued columns."""
        if date_col in self.MONTHLY_DATE_COLS:
            return cutoff.replace(day=1)
        return cutoff

    def _cleanup_table(self, table: str, date_col: str, cutoff: date, site_id: int) -> int:
        """Delete rows of *site_id* in *table* whose *date_col* is before *cutoff*.

        *table* and *date_col* come from DEFAULT_RETENTION_TABLES only; the
        values are passed as bound parameters. Returns the deleted row count,
        clamped to 0 when the driver reports an unknown count (-1/None).
        """
        full_table = f"{self.DWS_SCHEMA}.{table}"
        sql = f"DELETE FROM {full_table} WHERE site_id = %s AND {date_col} < %s"
        with self.db.conn.cursor() as cur:
            cur.execute(sql, (site_id, cutoff))
            rowcount = cur.rowcount
        return rowcount if rowcount and rowcount > 0 else 0

    # ══════════════════════════════════════════════════════════
    # Shared helpers
    # ══════════════════════════════════════════════════════════

    def _config_flag(self, key: str, default: bool = False) -> bool:
        """Read a boolean config flag, tolerating string values.

        ``bool("false")`` is True, so strings listed in _FALSY_FLAG_STRINGS
        ("false", "0", "off", ...) are mapped to False explicitly. Any other
        non-empty string is True, and non-string values use ``bool()``, matching
        the previous behavior.
        """
        value = self.config.get(key, default)
        if isinstance(value, str):
            return value.strip().lower() not in self._FALSY_FLAG_STRINGS
        return bool(value)

    def _get_layer(self, layer_name: Any) -> Optional[TimeLayer]:
        """Resolve a layer name (case-insensitive) or TimeLayer member; None if unknown."""
        if isinstance(layer_name, TimeLayer):
            return layer_name
        if not layer_name:
            return None
        name = str(layer_name).strip().upper()
        try:
            return TimeLayer[name]
        except KeyError:
            return None

    def _resolve_layer_map(self, raw: Any) -> Dict[str, str]:
        """Parse a ``{table: layer_name}`` mapping from a dict or a JSON string.

        Returns {} for empty input. Unparseable JSON, or JSON that is not an
        object, is logged and ignored.
        """
        if not raw:
            return {}
        if isinstance(raw, dict):
            return {str(k): str(v) for k, v in raw.items()}
        if isinstance(raw, str):
            try:
                parsed = json.loads(raw)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return {str(k): str(v) for k, v in parsed.items()}
            self.logger.warning(
                "DWS_MAINTENANCE: 表级层级配置无法解析为 JSON 对象,已忽略"
            )
        return {}

    def _parse_time_layers(self, raw: Any) -> List[TimeLayer]:
        """Parse layer names (comma-separated string or sequence) into unique TimeLayers, order kept."""
        layers: List[TimeLayer] = []
        for item in self._parse_list(raw):
            layer = self._get_layer(item)
            if layer and layer not in layers:
                layers.append(layer)
        return layers

    def _parse_list(self, raw: Any) -> List[str]:
        """Parse a comma-separated string or list/tuple/set into stripped non-empty strings."""
        if not raw:
            return []
        if isinstance(raw, str):
            return [v.strip() for v in raw.split(",") if v.strip()]
        if isinstance(raw, (list, tuple, set)):
            return [str(v).strip() for v in raw if str(v).strip()]
        return []
# Public API of this module: only the maintenance task class is exported.
__all__ = ["DwsMaintenanceTask"]