Files
Neo-ZQYY/apps/etl/connectors/feiqiu/tasks/dws/maintenance_task.py

357 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
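Unified DWS maintenance task.
Merges the former BaseMvRefreshTask (materialized-view refresh) and DwsRetentionCleanupTask (historical-data cleanup)
into a single maintenance task, DWS_MAINTENANCE, to simplify scheduling configuration.
Execution order: refresh the materialized views first, then clean up historical data.
Either sub-function can be disabled independently through configuration.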
"""
from __future__ import annotations
import json
from datetime import date
from typing import Any, Dict, List, Optional
from .base_dws_task import BaseDwsTask, TaskContext, TimeLayer
class DwsMaintenanceTask(BaseDwsTask):
"""Single maintenance task that combines MV refresh and data cleanup.

Configuration keys read by this class:
dws.mv.enabled - enable materialized-view refresh (default False)
dws.retention.enabled - enable historical-data cleanup (default False)
dws.mv.tables - tables whose views are refreshed (comma-separated)
dws.mv.layers - explicit list of time layers to refresh
dws.mv.table_layers - per-table layer overrides for refresh
dws.mv.refresh_concurrently - add CONCURRENTLY to the refresh statement
dws.retention.tables - tables to clean up (comma-separated)
dws.retention.layer - default cleanup layer
dws.retention.table_layers - per-table cleanup layer overrides (JSON dict)
"""
# ── Constants for materialized-view refresh ──────────────
# Prefix prepended to the base-table name when building a view name.
VIEW_PREFIX = "mv_"
# Time layers in the order they are walked when building "layers up to X".
LAYER_ORDER = [
TimeLayer.LAST_2_DAYS,
TimeLayer.LAST_1_MONTH,
TimeLayer.LAST_3_MONTHS,
TimeLayer.LAST_6_MONTHS,
]
# Suffix appended to the view name for each time layer.
LAYER_SUFFIX = {
TimeLayer.LAST_2_DAYS: "l1",
TimeLayer.LAST_1_MONTH: "l2",
TimeLayer.LAST_3_MONTHS: "l3",
TimeLayer.LAST_6_MONTHS: "l4",
}
# Base tables whose materialized views need refreshing.
MV_BASE_TABLES = [
"dws_finance_daily_summary",
"dws_assistant_daily_detail",
]
# ── Constants for historical-data cleanup ────────────────
# Tables cleaned by default, each with the date column used for the cutoff.
DEFAULT_RETENTION_TABLES = [
{"table": "dws_assistant_daily_detail", "date_col": "stat_date"},
{"table": "dws_assistant_monthly_summary", "date_col": "stat_month"},
{"table": "dws_assistant_customer_stats", "date_col": "stat_date"},
{"table": "dws_assistant_salary_calc", "date_col": "salary_month"},
{"table": "dws_assistant_recharge_commission", "date_col": "commission_month"},
{"table": "dws_assistant_finance_analysis", "date_col": "stat_date"},
{"table": "dws_member_consumption_summary", "date_col": "stat_date"},
{"table": "dws_member_visit_detail", "date_col": "visit_date"},
{"table": "dws_finance_daily_summary", "date_col": "stat_date"},
{"table": "dws_finance_income_structure", "date_col": "stat_date"},
{"table": "dws_finance_discount_detail", "date_col": "stat_date"},
{"table": "dws_finance_recharge_summary", "date_col": "stat_date"},
{"table": "dws_finance_expense_summary", "date_col": "expense_month"},
{"table": "dws_platform_settlement", "date_col": "settlement_date"},
# CHANGE [2026-03-07] intent: include the project-tag tables in historical-data cleanup
# assumptions: computed_at is the cleanup date column, with the same semantics as stat_date in the other tables
{"table": "dws_assistant_project_tag", "date_col": "computed_at"},
{"table": "dws_member_project_tag", "date_col": "computed_at"},
]
def get_task_code(self) -> str:
    """Return the task code of this maintenance task."""
    task_code = "DWS_MAINTENANCE"
    return task_code
def get_target_table(self) -> str:
    """Return the fixed target-table name reported by this task.

    Per the original author's note, the maintenance task does not write
    into one single target table; a constant name is returned regardless.
    """
    target_table = "dws_maintenance"
    return target_table
def get_primary_keys(self) -> List[str]:
    """Return the primary-key column names; this task declares none."""
    no_keys: List[str] = list()
    return no_keys
# ── E/T/L implementation ─────────────────────────────────
def extract(self, context: TaskContext) -> Dict[str, Any]:
    """Build the extract payload; maintenance pulls no business data.

    Args:
        context: Task context; only its ``store_id`` attribute is read.

    Returns:
        A dict with the single key ``site_id`` set to ``context.store_id``.
    """
    payload = dict(site_id=context.store_id)
    return payload
def transform(self, extracted: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
    """Pass the extracted payload through untouched.

    Args:
        extracted: Payload produced by ``extract``.
        context: Task context (not used).

    Returns:
        The very same ``extracted`` object.
    """
    passthrough = extracted
    return passthrough
def load(self, transformed: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
    """Run the two maintenance steps in order: view refresh, then cleanup.

    Each step runs only when its own enable switch is on; a disabled step
    is logged and contributes 0 to the result.

    Args:
        transformed: Payload from ``transform`` (not used here).
        context: Task context, forwarded to the cleanup step.

    Returns:
        ``{"counts": {"refreshed": <views refreshed>, "cleaned": <rows deleted>}}``.
    """
    refreshed_count = 0
    cleaned_count = 0

    # Step 1: materialized-view refresh goes first (original note cites
    # requirement 4.2 for this ordering).
    if not self._is_mv_enabled():
        self.logger.info("DWS_MAINTENANCE: 物化视图刷新未启用,跳过")
    else:
        refreshed_count = self._refresh_all_views()

    # Step 2: historical-data cleanup.
    if not self._is_retention_enabled():
        self.logger.info("DWS_MAINTENANCE: 历史数据清理未启用,跳过")
    else:
        cleaned_count = self._cleanup_all_tables(context)

    return {"counts": {"refreshed": refreshed_count, "cleaned": cleaned_count}}
# ══════════════════════════════════════════════════════════
# Materialized-view refresh logic (ported from BaseMvRefreshTask)
# ══════════════════════════════════════════════════════════
def _is_mv_enabled(self) -> bool:
"""检查物化视图刷新是否启用。"""
return bool(self.config.get("dws.mv.enabled", False))
def _refresh_all_views(self) -> int:
    """Refresh every existing materialized view of the in-scope base tables.

    For each table in ``MV_BASE_TABLES`` that is in scope, the view of every
    resolved time layer is refreshed; a view that does not exist is logged
    and skipped.

    Returns:
        The number of views that were refreshed.
    """
    view_count = 0
    for table_name in self.MV_BASE_TABLES:
        if self._is_table_in_mv_scope(table_name):
            for time_layer in self._resolve_mv_layers(table_name):
                mv_name = self._get_view_name(table_name, time_layer)
                if not mv_name:
                    # No suffix is defined for this layer: nothing to refresh.
                    continue
                if self._view_exists(mv_name):
                    self._refresh_view(mv_name)
                    view_count += 1
                else:
                    self.logger.warning(
                        "DWS_MAINTENANCE: 物化视图不存在,跳过 %s", mv_name
                    )
    self.logger.info("DWS_MAINTENANCE: 物化视图刷新完成,数量=%d", view_count)
    return view_count
def _is_table_in_mv_scope(self, base_table: str) -> bool:
    """Decide whether a base table takes part in the view refresh.

    The table list comes from ``dws.mv.tables``; when that is empty,
    ``dws.retention.tables`` is used instead.

    Args:
        base_table: Name of the base table to check.

    Returns:
        True when no table list is configured at all, otherwise whether
        ``base_table`` appears in the configured list.
    """
    scoped_tables = self._parse_list(
        self.config.get("dws.mv.tables")
    ) or self._parse_list(self.config.get("dws.retention.tables"))
    # An empty list means "no restriction": every base table is refreshed.
    return (not scoped_tables) or (base_table in scoped_tables)
def _resolve_mv_layers(self, base_table: str) -> List[TimeLayer]:
    """Work out which time layers of a base table should be refreshed.

    Resolution order:
      1. ``dws.mv.layers`` when it yields at least one layer;
      2. the per-table entry in ``dws.mv.table_layers`` (or, when that key
         is empty, ``dws.retention.table_layers``);
      3. ``dws.retention.layer``;
      4. every layer in ``LAYER_ORDER``.
    In steps 2 and 3 an unrecognised layer or ``TimeLayer.ALL`` is ignored
    and resolution moves on to the next step.

    Args:
        base_table: Name of the base table.

    Returns:
        The list of layers to refresh.
    """
    explicit_layers = self._parse_time_layers(self.config.get("dws.mv.layers"))
    if explicit_layers:
        return explicit_layers

    raw_overrides = (
        self.config.get("dws.mv.table_layers")
        or self.config.get("dws.retention.table_layers")
    )
    override_name = self._resolve_layer_map(raw_overrides).get(base_table)
    override_layer = self._get_layer(override_name) if override_name else None
    if override_layer and override_layer != TimeLayer.ALL:
        return self._layers_up_to(override_layer)

    fallback_layer = self._get_layer(self.config.get("dws.retention.layer"))
    if not fallback_layer or fallback_layer == TimeLayer.ALL:
        return list(self.LAYER_ORDER)
    return self._layers_up_to(fallback_layer)
def _layers_up_to(self, target: TimeLayer) -> List[TimeLayer]:
"""返回从 L1 到 target的分层列表。"""
layers = []
for layer in self.LAYER_ORDER:
layers.append(layer)
if layer == target:
break
return layers
def _get_view_name(self, base_table: str, layer: TimeLayer) -> Optional[str]:
suffix = self.LAYER_SUFFIX.get(layer)
if not suffix or not base_table:
return None
return f"{self.VIEW_PREFIX}{base_table}_{suffix}"
def _view_exists(self, view_name: str) -> bool:
sql = "SELECT to_regclass(%s) AS reg"
rows = self.db.query(sql, (f"{self.DWS_SCHEMA}.{view_name}",))
return bool(rows and rows[0].get("reg"))
def _refresh_view(self, view_name: str) -> None:
concurrently = bool(self.config.get("dws.mv.refresh_concurrently", False))
keyword = "CONCURRENTLY " if concurrently else ""
sql = f"REFRESH MATERIALIZED VIEW {keyword}{self.DWS_SCHEMA}.{view_name}"
self.db.execute(sql)
# ══════════════════════════════════════════════════════════
# Historical-data cleanup logic (ported from DwsRetentionCleanupTask)
# ══════════════════════════════════════════════════════════
def _is_retention_enabled(self) -> bool:
"""检查历史数据清理是否启用。"""
return bool(self.config.get("dws.retention.enabled", False))
def _cleanup_all_tables(self, context: TaskContext) -> int:
    """Delete out-of-window rows from every configured retention table.

    The retention window of each table is defined by a time layer: the
    per-table override from ``dws.retention.table_layers`` when present,
    otherwise the default from ``dws.retention.layer`` (``"ALL"`` when the
    key is absent). Tables whose layer is ``TimeLayer.ALL`` are left alone.

    Args:
        context: Task context; ``window_end`` anchors the time window and
            ``store_id`` restricts the delete to one site.

    Returns:
        Total number of rows deleted across all tables; 0 when the default
        layer is unrecognised or no table is configured.
    """
    window_end = context.window_end
    # window_end may expose .date() (e.g. a datetime) or already be a date.
    base_date = window_end.date() if hasattr(window_end, "date") else window_end

    default_layer = self._get_layer(
        self.config.get("dws.retention.layer", "ALL")
    )
    if default_layer is None:
        self.logger.warning("DWS_MAINTENANCE: 未识别的清理层级,跳过清理")
        return 0

    target_tables = self._resolve_retention_tables()
    if not target_tables:
        self.logger.info("DWS_MAINTENANCE: 未配置需要清理的表,跳过清理")
        return 0

    table_layers = self._resolve_table_layers()
    total_deleted = 0
    for item in target_tables:
        table = item["table"]
        date_col = item["date_col"]

        if table in table_layers:
            layer = self._get_layer(table_layers[table])
            if layer is None:
                # A misspelt override would otherwise disable cleanup for
                # this table without leaving any trace in the logs.
                self.logger.warning(
                    "DWS_MAINTENANCE: 未识别的表级清理层级,跳过 %s", table
                )
                continue
        else:
            # Use the already-resolved enum member directly instead of
            # round-tripping through ``.value`` and a by-name lookup, which
            # only works while every enum value mirrors its member name.
            layer = default_layer

        if layer == TimeLayer.ALL:
            # ALL means "keep everything": nothing to delete for this table.
            continue

        time_range = self.get_time_layer_range(layer, base_date)
        cutoff = self._normalize_cutoff(date_col, time_range.start)
        total_deleted += self._cleanup_table(
            table, date_col, cutoff, context.store_id
        )

    self.logger.info("DWS_MAINTENANCE: 数据清理完成,总删除 %d", total_deleted)
    return total_deleted
def _resolve_retention_tables(self) -> List[Dict[str, str]]:
"""解析需要清理的目标表列表。"""
table_list = self.config.get("dws.retention.tables")
if not table_list:
return self.DEFAULT_RETENTION_TABLES
if isinstance(table_list, str):
names = [t.strip() for t in table_list.split(",") if t.strip()]
else:
names = list(table_list)
return [
item for item in self.DEFAULT_RETENTION_TABLES
if item["table"] in names
]
def _resolve_table_layers(self) -> Dict[str, str]:
    """Resolve the per-table cleanup layer overrides.

    Reads config key ``dws.retention.table_layers`` (a dict or a JSON
    object string) and hands it to ``_resolve_layer_map``, which holds the
    single implementation of this parsing; the logic used to be duplicated
    here line for line.

    Returns:
        Mapping of table name to layer name, both as strings; empty when
        the key is missing, malformed, or not a mapping.
    """
    return self._resolve_layer_map(self.config.get("dws.retention.table_layers"))
def _normalize_cutoff(self, date_col: str, cutoff: date) -> date:
"""月度列的截止日期归一化到月初。"""
monthly_cols = {"stat_month", "salary_month", "commission_month", "expense_month"}
if date_col in monthly_cols:
return cutoff.replace(day=1)
return cutoff
def _cleanup_table(self, table: str, date_col: str, cutoff: date, site_id: int) -> int:
"""删除指定表中早于截止日期的历史数据。"""
full_table = f"{self.DWS_SCHEMA}.{table}"
sql = f"DELETE FROM {full_table} WHERE site_id = %s AND {date_col} < %s"
with self.db.conn.cursor() as cur:
cur.execute(sql, (site_id, cutoff))
return cur.rowcount
# ══════════════════════════════════════════════════════════
# Shared helper methods
# ══════════════════════════════════════════════════════════
def _get_layer(self, layer_name: Optional[str]) -> Optional[TimeLayer]:
if not layer_name:
return None
name = str(layer_name).upper()
try:
return TimeLayer[name]
except KeyError:
return None
def _resolve_layer_map(self, raw: Any) -> Dict[str, str]:
if not raw:
return {}
if isinstance(raw, dict):
return {str(k): str(v) for k, v in raw.items()}
if isinstance(raw, str):
try:
parsed = json.loads(raw)
if isinstance(parsed, dict):
return {str(k): str(v) for k, v in parsed.items()}
except json.JSONDecodeError:
return {}
return {}
def _parse_time_layers(self, raw: Any) -> List[TimeLayer]:
"""解析逗号分隔的时间分层配置。"""
if not raw:
return []
if isinstance(raw, str):
items = [v.strip() for v in raw.split(",") if v.strip()]
elif isinstance(raw, (list, tuple, set)):
items = [str(v).strip() for v in raw if str(v).strip()]
else:
return []
layers = []
for item in items:
layer = self._get_layer(item)
if layer and layer not in layers:
layers.append(layer)
return layers
def _parse_list(self, raw: Any) -> List[str]:
"""解析逗号分隔的字符串列表。"""
if not raw:
return []
if isinstance(raw, str):
return [v.strip() for v in raw.split(",") if v.strip()]
if isinstance(raw, (list, tuple, set)):
return [str(v).strip() for v in raw if str(v).strip()]
return []
__all__ = ["DwsMaintenanceTask"]