# -*- coding: utf-8 -*-
"""
DWS time-layered retention cleanup task.

Purpose:
    Delete historical rows from DWS tables according to the configured
    time-layer range. The task is disabled by default and must be enabled
    explicitly through configuration.

Example configuration (.env / settings):
    DWS_RETENTION_ENABLED=true
    DWS_RETENTION_LAYER=LAST_3_MONTHS
    DWS_RETENTION_TABLES=dws_finance_daily_summary,dws_assistant_daily_detail
    DWS_RETENTION_TABLE_LAYERS={"dws_finance_expense_summary":"ALL"}

Author: ETL team
Created: 2026-02-03
"""
from __future__ import annotations

import json
from datetime import date
from typing import Any, Dict, List, Optional

from .base_dws_task import BaseDwsTask, TaskContext, TimeLayer


class DwsRetentionCleanupTask(BaseDwsTask):
    """
    DWS time-layered retention cleanup task.

    For every selected table, rows belonging to the current site whose date
    column is strictly older than the start of the configured time layer are
    deleted. A layer of ``ALL`` means "keep everything" and skips the table.
    """

    # Tables eligible for cleanup and the date column each one is filtered on.
    # This list is also the whitelist of identifiers that may be interpolated
    # into the DELETE statement in ``_cleanup_table``.
    DEFAULT_TABLES = [
        {"table": "dws_assistant_daily_detail", "date_col": "stat_date"},
        {"table": "dws_assistant_monthly_summary", "date_col": "stat_month"},
        {"table": "dws_assistant_customer_stats", "date_col": "stat_date"},
        {"table": "dws_assistant_salary_calc", "date_col": "salary_month"},
        {"table": "dws_assistant_recharge_commission", "date_col": "commission_month"},
        {"table": "dws_assistant_finance_analysis", "date_col": "stat_date"},
        {"table": "dws_member_consumption_summary", "date_col": "stat_date"},
        {"table": "dws_member_visit_detail", "date_col": "visit_date"},
        {"table": "dws_finance_daily_summary", "date_col": "stat_date"},
        {"table": "dws_finance_income_structure", "date_col": "stat_date"},
        {"table": "dws_finance_discount_detail", "date_col": "stat_date"},
        {"table": "dws_finance_recharge_summary", "date_col": "stat_date"},
        {"table": "dws_finance_expense_summary", "date_col": "expense_month"},
        {"table": "dws_platform_settlement", "date_col": "settlement_date"},
    ]

    # Date columns holding month-granularity values; the cutoff for these is
    # snapped to the first day of its month.
    MONTHLY_DATE_COLS = frozenset(
        {"stat_month", "salary_month", "commission_month", "expense_month"}
    )

    # String values (case-insensitive) accepted as "enabled" for the
    # ``dws.retention.enabled`` switch.
    _TRUTHY_STRINGS = frozenset({"1", "true", "yes", "y", "on"})

    def get_task_code(self) -> str:
        """Return the task code used in logs and task registration."""
        return "DWS_RETENTION_CLEANUP"

    def get_target_table(self) -> str:
        """Return the nominal target table (this task actually touches many)."""
        return "dws_finance_daily_summary"

    def get_primary_keys(self) -> List[str]:
        """Return no primary keys; the task only deletes rows, never upserts."""
        return []

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Return the site id of the current run; no source data is read."""
        return {"site_id": context.store_id}

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """Pass the extracted payload through unchanged."""
        return extracted

    def load(self, transformed: Dict[str, Any], context: TaskContext) -> Dict:
        """
        Run the retention cleanup.

        Args:
            transformed: Output of ``transform`` (not used; the site is taken
                from ``context.store_id``).
            context: Task context. ``window_end`` is the reference date for
                the time layers and ``store_id`` limits deletion to one site.

        Returns:
            ``{"counts": {"cleaned": <rows deleted>}}``. After a full run the
            result also carries ``"extra": {"details": [...]}`` with one
            ``{"table", "deleted", "cutoff"}`` entry per cleaned table.
        """
        task_code = self.get_task_code()
        if not self._is_retention_enabled():
            self.logger.info("%s: 未启用清理配置,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        # Objects exposing .date() (e.g. datetime) are reduced to a date;
        # anything else is used as the reference date as-is.
        window_end = context.window_end
        base_date = window_end.date() if hasattr(window_end, "date") else window_end

        default_layer = self._get_retention_layer(
            self.config.get("dws.retention.layer", "ALL")
        )
        if default_layer is None:
            self.logger.warning("%s: 未识别的清理层级,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        target_tables = self._resolve_target_tables()
        if not target_tables:
            self.logger.info("%s: 未配置需要清理的表,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        table_layers = self._resolve_table_layers()
        total_deleted = 0
        details: List[Dict[str, Any]] = []

        for item in target_tables:
            table = item["table"]
            date_col = item["date_col"]

            # Use the already-resolved default TimeLayer directly. Round-tripping
            # it through ``.value`` -> ``TimeLayer[name]`` only works when member
            # values happen to equal member names.
            if table in table_layers:
                layer = self._get_retention_layer(table_layers[table])
                if layer is None:
                    # Skip (rather than fall back to the default) so a typo in
                    # an override never deletes more than intended.
                    self.logger.warning(
                        "%s: 表 %s 的清理层级 %s 无法识别,跳过",
                        task_code,
                        table,
                        table_layers[table],
                    )
                    continue
            else:
                layer = default_layer

            # ALL keeps the full history, so there is nothing to delete.
            if layer == TimeLayer.ALL:
                continue

            time_range = self.get_time_layer_range(layer, base_date)
            cutoff = self._normalize_cutoff(date_col, time_range.start)
            deleted = self._cleanup_table(table, date_col, cutoff, context.store_id)
            total_deleted += deleted
            details.append({"table": table, "deleted": deleted, "cutoff": str(cutoff)})

        self.logger.info("%s: 清理完成,总删除 %d 行", task_code, total_deleted)
        return {"counts": {"cleaned": total_deleted}, "extra": {"details": details}}

    def _is_retention_enabled(self) -> bool:
        """
        Return True when ``dws.retention.enabled`` is switched on.

        String values (e.g. read verbatim from a .env file) are parsed
        explicitly: only "1", "true", "yes", "y" and "on" (case-insensitive,
        surrounding whitespace ignored) enable the task. A plain ``bool()``
        would treat "false" as True, which must never switch on a destructive
        DELETE job.
        """
        value = self.config.get("dws.retention.enabled", False)
        if isinstance(value, str):
            return value.strip().lower() in self._TRUTHY_STRINGS
        return bool(value)

    def _get_retention_layer(self, layer_name: Any) -> Optional[TimeLayer]:
        """
        Resolve a layer name (case-insensitive) to a ``TimeLayer`` member.

        A ``TimeLayer`` instance is returned unchanged. Returns None for an
        empty value or a name that is not a ``TimeLayer`` member.
        """
        if isinstance(layer_name, TimeLayer):
            return layer_name
        if not layer_name:
            return None
        name = str(layer_name).strip().upper()
        try:
            return TimeLayer[name]
        except KeyError:
            return None

    def _resolve_target_tables(self) -> List[Dict[str, str]]:
        """
        Return the ``DEFAULT_TABLES`` entries selected by ``dws.retention.tables``.

        The setting may be a comma-separated string or an iterable of names.
        When it is unset or empty, all default tables are returned. Names that
        are not in ``DEFAULT_TABLES`` are ignored with a warning, because only
        whitelisted tables may be cleaned.
        """
        raw = self.config.get("dws.retention.tables")
        if not raw:
            return list(self.DEFAULT_TABLES)

        candidates = raw.split(",") if isinstance(raw, str) else list(raw)
        requested = {str(name).strip() for name in candidates}
        requested.discard("")

        known = {item["table"] for item in self.DEFAULT_TABLES}
        unknown = sorted(requested - known)
        if unknown:
            self.logger.warning(
                "%s: 忽略未知的清理表: %s", self.get_task_code(), ", ".join(unknown)
            )

        # Preserve DEFAULT_TABLES order regardless of the configured order.
        return [item for item in self.DEFAULT_TABLES if item["table"] in requested]

    def _resolve_table_layers(self) -> Dict[str, str]:
        """
        Return per-table layer overrides from ``dws.retention.table_layers``.

        Accepts a dict or a JSON object string. Invalid JSON or a non-object
        value is ignored with a warning, and an empty mapping is returned.
        """
        raw = self.config.get("dws.retention.table_layers")
        if not raw:
            return {}
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                self.logger.warning(
                    "%s: dws.retention.table_layers 不是合法的 JSON,已忽略",
                    self.get_task_code(),
                )
                return {}
        if not isinstance(raw, dict):
            self.logger.warning(
                "%s: dws.retention.table_layers 应为 JSON 对象,已忽略",
                self.get_task_code(),
            )
            return {}
        return {str(k).strip(): str(v).strip() for k, v in raw.items()}

    def _normalize_cutoff(self, date_col: str, cutoff: date) -> date:
        """Snap the cutoff to the first of its month for month-granularity columns."""
        if date_col in self.MONTHLY_DATE_COLS:
            return cutoff.replace(day=1)
        return cutoff

    def _cleanup_table(self, table: str, date_col: str, cutoff: date, site_id: int) -> int:
        """
        Delete rows of ``site_id`` in ``table`` whose ``date_col`` < ``cutoff``.

        Returns:
            Number of deleted rows (0 if the driver cannot report a count).

        Raises:
            ValueError: If ``(table, date_col)`` is not listed in
                ``DEFAULT_TABLES``. Identifiers cannot be bound as SQL
                parameters, so they must come from the whitelist.
        """
        allowed = {(item["table"], item["date_col"]) for item in self.DEFAULT_TABLES}
        if (table, date_col) not in allowed:
            raise ValueError(
                f"table/column not allowed for retention cleanup: {table}.{date_col}"
            )

        full_table = f"{self.DWS_SCHEMA}.{table}"
        sql = f"DELETE FROM {full_table} WHERE site_id = %s AND {date_col} < %s"
        # NOTE(review): no commit is issued here; presumably the base task or
        # runner commits the connection — confirm.
        with self.db.conn.cursor() as cur:
            cur.execute(sql, (site_id, cutoff))
            affected = cur.rowcount
        # DB-API 2.0 allows rowcount == -1 when the count is not determinable.
        return affected if affected and affected > 0 else 0


__all__ = ["DwsRetentionCleanupTask"]