162 lines
6.2 KiB
Python
162 lines
6.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
DWS 时间分层清理任务
|
||
|
||
功能说明:
|
||
按配置的时间分层范围,对 DWS 表执行历史数据清理。
|
||
该任务默认不启用,需通过配置显式开启。
|
||
|
||
配置示例(.env / settings):
|
||
DWS_RETENTION_ENABLED=true
|
||
DWS_RETENTION_LAYER=LAST_3_MONTHS
|
||
DWS_RETENTION_TABLES=dws_finance_daily_summary,dws_assistant_daily_detail
|
||
DWS_RETENTION_TABLE_LAYERS={"dws_finance_expense_summary":"ALL"}
|
||
|
||
作者:ETL团队
|
||
创建日期:2026-02-03
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from datetime import date
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from .base_dws_task import BaseDwsTask, TaskContext, TimeLayer
|
||
|
||
|
||
class DwsRetentionCleanupTask(BaseDwsTask):
    """
    DWS time-layered retention cleanup task.

    For every selected table, rows belonging to the current store whose date
    column is older than the start of the configured time layer are deleted.
    The task does nothing unless ``dws.retention.enabled`` is set to a truthy
    value.

    Configuration keys read by this class:
        dws.retention.enabled       on/off switch (default: off)
        dws.retention.layer         default ``TimeLayer`` name (default: "ALL")
        dws.retention.tables        optional subset of ``DEFAULT_TABLES``
                                    (comma-separated string or iterable)
        dws.retention.table_layers  optional per-table layer overrides
                                    (dict or JSON object string)
    """

    # Tables eligible for cleanup, with the column each one is aged by.
    DEFAULT_TABLES = [
        {"table": "dws_assistant_daily_detail", "date_col": "stat_date"},
        {"table": "dws_assistant_monthly_summary", "date_col": "stat_month"},
        {"table": "dws_assistant_customer_stats", "date_col": "stat_date"},
        {"table": "dws_assistant_salary_calc", "date_col": "salary_month"},
        {"table": "dws_assistant_recharge_commission", "date_col": "commission_month"},
        {"table": "dws_assistant_finance_analysis", "date_col": "stat_date"},
        {"table": "dws_member_consumption_summary", "date_col": "stat_date"},
        {"table": "dws_member_visit_detail", "date_col": "visit_date"},
        {"table": "dws_finance_daily_summary", "date_col": "stat_date"},
        {"table": "dws_finance_income_structure", "date_col": "stat_date"},
        {"table": "dws_finance_discount_detail", "date_col": "stat_date"},
        {"table": "dws_finance_recharge_summary", "date_col": "stat_date"},
        {"table": "dws_finance_expense_summary", "date_col": "expense_month"},
        {"table": "dws_platform_settlement", "date_col": "settlement_date"},
    ]

    # Date columns whose cutoff is snapped to the first day of the month
    # (presumably they hold month buckets -- verify against the table DDL).
    _MONTHLY_DATE_COLS = frozenset(
        {"stat_month", "salary_month", "commission_month", "expense_month"}
    )

    # String spellings accepted as "enabled" when the flag arrives as text.
    _TRUTHY_STRINGS = frozenset({"1", "true", "t", "yes", "y", "on"})

    def get_task_code(self) -> str:
        """Return the identifier of this task."""
        return "DWS_RETENTION_CLEANUP"

    def get_target_table(self) -> str:
        """Return the nominal target table name reported for this task."""
        return "dws_finance_daily_summary"

    def get_primary_keys(self) -> List[str]:
        """Return an empty key list; this task only deletes rows."""
        return []

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Return the store id of the run; no source data is read."""
        return {"site_id": context.store_id}

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """Pass the extracted payload through unchanged."""
        return extracted

    def load(self, transformed: Dict[str, Any], context: TaskContext) -> Dict:
        """
        Run the cleanup.

        Args:
            transformed: Output of ``transform`` (not used here).
            context: Task context; ``window_end`` is the reference date for the
                time layer and ``store_id`` restricts the rows deleted.

        Returns:
            ``{"counts": {"cleaned": <rows deleted>}}``; when at least the
            table loop was reached, also ``"extra": {"details": [...]}`` with
            one entry (table, deleted, cutoff) per cleaned table.
        """
        task_code = self.get_task_code()

        if not self._is_retention_enabled():
            self.logger.info("%s: 未启用清理配置,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        # window_end may be a datetime (has .date()) or already a date.
        window_end = context.window_end
        base_date = window_end.date() if hasattr(window_end, "date") else window_end

        default_layer = self._get_retention_layer(self.config.get("dws.retention.layer", "ALL"))
        if default_layer is None:
            self.logger.warning("%s: 未识别的清理层级,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        target_tables = self._resolve_target_tables()
        if not target_tables:
            self.logger.info("%s: 未配置需要清理的表,跳过", task_code)
            return {"counts": {"cleaned": 0}}

        table_layers = self._resolve_table_layers()

        total_deleted = 0
        details = []
        for item in target_tables:
            table = item["table"]
            date_col = item["date_col"]

            if table in table_layers:
                layer = self._get_retention_layer(table_layers[table])
                if layer is None:
                    # A mistyped override must not go unnoticed: the table is
                    # left untouched, but the operator is told why.
                    self.logger.warning(
                        "%s: 表 %s 的清理层级 %r 未识别,跳过",
                        task_code,
                        table,
                        table_layers[table],
                    )
                    continue
            else:
                # Use the resolved member directly; converting it to .value and
                # looking it up again by name fails when value != name.
                layer = default_layer

            # ALL means "keep everything" for this table.
            if layer == TimeLayer.ALL:
                continue

            time_range = self.get_time_layer_range(layer, base_date)
            cutoff = self._normalize_cutoff(date_col, time_range.start)
            deleted = self._cleanup_table(table, date_col, cutoff, context.store_id)
            total_deleted += deleted
            details.append({"table": table, "deleted": deleted, "cutoff": str(cutoff)})

        self.logger.info("%s: 清理完成,总删除 %d 行", task_code, total_deleted)
        return {"counts": {"cleaned": total_deleted}, "extra": {"details": details}}

    def _is_retention_enabled(self) -> bool:
        """
        Tell whether cleanup has been switched on in the configuration.

        String values are parsed explicitly, because ``bool("false")`` is
        ``True`` and this flag guards a destructive operation. Unrecognised
        strings count as disabled.
        """
        raw = self.config.get("dws.retention.enabled", False)
        if isinstance(raw, str):
            return raw.strip().lower() in self._TRUTHY_STRINGS
        return bool(raw)

    def _get_retention_layer(self, layer_name: Optional[str]) -> Optional[TimeLayer]:
        """
        Look up a ``TimeLayer`` member by its (case-insensitive) name.

        Returns:
            The matching member, or ``None`` when ``layer_name`` is empty or
            does not name a member.
        """
        if not layer_name:
            return None
        try:
            return TimeLayer[str(layer_name).strip().upper()]
        except KeyError:
            return None

    def _resolve_target_tables(self) -> List[Dict[str, str]]:
        """
        Return the table entries to clean.

        Without ``dws.retention.tables`` all of ``DEFAULT_TABLES`` are used.
        Otherwise only the ``DEFAULT_TABLES`` entries named there are kept, in
        ``DEFAULT_TABLES`` order; names that are not in ``DEFAULT_TABLES`` are
        ignored with a warning.
        """
        table_list = self.config.get("dws.retention.tables")
        if not table_list:
            # Copy so callers cannot mutate the class-level list.
            return list(self.DEFAULT_TABLES)

        raw_names = table_list.split(",") if isinstance(table_list, str) else table_list
        names = {str(name).strip() for name in raw_names}
        names.discard("")

        known = {item["table"] for item in self.DEFAULT_TABLES}
        unknown = sorted(names - known)
        if unknown:
            self.logger.warning(
                "%s: 配置的表不在可清理清单中,已忽略: %s",
                self.get_task_code(),
                ", ".join(unknown),
            )

        return [item for item in self.DEFAULT_TABLES if item["table"] in names]

    def _resolve_table_layers(self) -> Dict[str, str]:
        """
        Return the per-table layer overrides as ``{table: layer_name}``.

        Accepts a dict or a JSON object string. Anything else, including
        malformed JSON, yields an empty mapping; malformed or non-object JSON
        is logged because the overrides it was meant to carry are lost.
        """
        raw = self.config.get("dws.retention.table_layers")
        if not raw:
            return {}

        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                self.logger.warning(
                    "%s: dws.retention.table_layers 不是合法的 JSON,已忽略",
                    self.get_task_code(),
                )
                return {}
            if not isinstance(raw, dict):
                self.logger.warning(
                    "%s: dws.retention.table_layers 不是 JSON 对象,已忽略",
                    self.get_task_code(),
                )
                return {}

        if isinstance(raw, dict):
            return {str(key): str(value) for key, value in raw.items()}
        return {}

    def _normalize_cutoff(self, date_col: str, cutoff: date) -> date:
        """
        Snap ``cutoff`` to the first day of its month for monthly columns.

        For any other column the cutoff is returned unchanged.
        """
        if date_col in self._MONTHLY_DATE_COLS:
            return cutoff.replace(day=1)
        return cutoff

    def _cleanup_table(self, table: str, date_col: str, cutoff: date, site_id: int) -> int:
        """
        Delete the rows of ``table`` for ``site_id`` dated before ``cutoff``.

        ``table`` and ``date_col`` are interpolated into the statement; within
        this class they only ever come from ``DEFAULT_TABLES``. The store id
        and cutoff are passed as bound parameters.

        Returns:
            Number of rows deleted as reported by the cursor (never negative).
        """
        full_table = f"{self.DWS_SCHEMA}.{table}"
        sql = f"DELETE FROM {full_table} WHERE site_id = %s AND {date_col} < %s"
        with self.db.conn.cursor() as cur:
            cur.execute(sql, (site_id, cutoff))
            # DB-API allows rowcount to be -1 when the driver cannot tell;
            # clamp so an undetermined count never reduces the running total.
            return max(cur.rowcount or 0, 0)
|
||
|
||
|
||
# Public API: the only name exported by a star-import of this module.
__all__ = ["DwsRetentionCleanupTask"]
|