Files
ZQYY.FQ-ETL/tasks/dws/mv_refresh_task.py

197 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
DWS 物化视图刷新任务
说明:
- 按 L1/L2/L3/L4 时间分层刷新物化视图
- 默认受 dws.mv.enabled 与 dws.retention.* 配置联动控制
"""
from __future__ import annotations
import json
from typing import Any, Dict, List, Optional
from .base_dws_task import BaseDwsTask, TaskContext, TimeLayer
class BaseMvRefreshTask(BaseDwsTask):
    """Base class for tasks that refresh layered DWS materialized views.

    For each selected time layer a view named
    ``{VIEW_PREFIX}{BASE_TABLE}_{suffix}`` (suffix ``l1``..``l4``) is looked up
    in ``DWS_SCHEMA`` and, if it exists, refreshed with
    ``REFRESH MATERIALIZED VIEW``.

    Configuration keys read by this class:

    - ``dws.mv.enabled``: master switch; nothing is refreshed when false.
    - ``dws.mv.tables`` / ``dws.retention.tables``: optional allow-list of base
      tables (the first non-empty one is used).
    - ``dws.mv.layers``: explicit list of layer names to refresh.
    - ``dws.mv.table_layers`` / ``dws.retention.table_layers``: per-table
      maximum layer (dict or JSON object string).
    - ``dws.retention.layer``: default maximum layer.
    - ``dws.mv.refresh_concurrently``: add ``CONCURRENTLY`` to the refresh.

    Subclasses set ``BASE_TABLE`` and implement ``get_task_code``.
    """

    # Name of the DWS table the materialized views are built on.
    BASE_TABLE: str = ""
    # Date column declared by subclasses; not read by the code in this class.
    DATE_COL: str = ""
    VIEW_PREFIX = "mv_"
    # Layers from the narrowest to the widest time window.
    LAYER_ORDER = [
        TimeLayer.LAST_2_DAYS,
        TimeLayer.LAST_1_MONTH,
        TimeLayer.LAST_3_MONTHS,
        TimeLayer.LAST_6_MONTHS,
    ]
    # View-name suffix for every layer that has a materialized view.
    LAYER_SUFFIX = {
        TimeLayer.LAST_2_DAYS: "l1",
        TimeLayer.LAST_1_MONTH: "l2",
        TimeLayer.LAST_3_MONTHS: "l3",
        TimeLayer.LAST_6_MONTHS: "l4",
    }
    # Textual config values that must be read as "false".
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})

    def get_target_table(self) -> str:
        """Return the base table name (``BASE_TABLE``)."""
        return self.BASE_TABLE

    def get_primary_keys(self) -> List[str]:
        """Return an empty list; this task declares no primary keys."""
        return []

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Return a dict carrying only the store id from ``context``."""
        return {"site_id": context.store_id}

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """Pass ``extracted`` through unchanged."""
        return extracted

    def load(self, transformed: Dict[str, Any], context: TaskContext) -> Dict[str, Any]:
        """Refresh every existing materialized view of the selected layers.

        Layers without a view name and views that do not exist are skipped.
        An exception raised by a refresh is not caught here, so it propagates
        and the remaining layers are not processed.

        Returns:
            ``{"counts": {"refreshed": 0}}`` when refreshing is disabled,
            otherwise ``{"counts": {"refreshed": n}, "extra": {"details":
            [...]}}`` where each detail holds the view name and layer value.
        """
        if not self._is_enabled():
            self.logger.info("%s: 未启用物化刷新,跳过", self.get_task_code())
            return {"counts": {"refreshed": 0}}

        refreshed = 0
        details: List[Dict[str, Any]] = []
        for layer in self._resolve_layers():
            view_name = self._get_view_name(layer)
            if not view_name:
                continue
            if not self._view_exists(view_name):
                self.logger.warning("%s: 物化视图不存在,跳过 %s", self.get_task_code(), view_name)
                continue
            self._refresh_view(view_name)
            refreshed += 1
            details.append({"view": view_name, "layer": layer.value})

        self.logger.info("%s: 刷新完成,物化视图数=%d", self.get_task_code(), refreshed)
        return {"counts": {"refreshed": refreshed}, "extra": {"details": details}}

    def _is_enabled(self) -> bool:
        """Tell whether this task's base table should be refreshed.

        Requires ``dws.mv.enabled`` to be true. If a table allow-list is
        configured (``dws.mv.tables``, falling back to
        ``dws.retention.tables``), ``BASE_TABLE`` must be in it.
        """
        if not self._parse_bool(self.config.get("dws.mv.enabled", False)):
            return False
        tables = self._parse_list(self.config.get("dws.mv.tables"))
        if not tables:
            tables = self._parse_list(self.config.get("dws.retention.tables"))
        # An empty allow-list means "no restriction".
        return not tables or self.BASE_TABLE in tables

    def _resolve_layers(self) -> List[TimeLayer]:
        """Pick the layers to refresh.

        Precedence: explicit ``dws.mv.layers``; then the per-table maximum
        layer; then ``dws.retention.layer``; otherwise every layer in
        ``LAYER_ORDER``. A maximum layer of ``TimeLayer.ALL`` is ignored.
        """
        # Explicit configuration wins.
        configured = self._parse_layers(self.config.get("dws.mv.layers"))
        if configured:
            return configured

        # Per-table override: mv.table_layers first, then retention.table_layers.
        table_layers = self._resolve_layer_map(
            self.config.get("dws.mv.table_layers") or self.config.get("dws.retention.table_layers")
        )
        layer_name = table_layers.get(self.BASE_TABLE)
        if layer_name:
            layer = self._get_layer(layer_name)
            if layer and layer != TimeLayer.ALL:
                return self._layers_up_to(layer)

        # Fall back to the global retention layer.
        retention_layer = self._get_layer(self.config.get("dws.retention.layer"))
        if retention_layer and retention_layer != TimeLayer.ALL:
            return self._layers_up_to(retention_layer)
        return list(self.LAYER_ORDER)

    def _layers_up_to(self, target: TimeLayer) -> List[TimeLayer]:
        """Return ``LAYER_ORDER`` from its start through ``target`` inclusive.

        If ``target`` is not in ``LAYER_ORDER`` the whole order is returned.
        """
        layers = []
        for layer in self.LAYER_ORDER:
            layers.append(layer)
            if layer == target:
                break
        return layers

    def _get_view_name(self, layer: TimeLayer) -> Optional[str]:
        """Build the view name for ``layer``.

        Returns ``None`` when the layer has no suffix or ``BASE_TABLE`` is
        empty.
        """
        suffix = self.LAYER_SUFFIX.get(layer)
        if not suffix or not self.BASE_TABLE:
            return None
        return f"{self.VIEW_PREFIX}{self.BASE_TABLE}_{suffix}"

    def _view_exists(self, view_name: str) -> bool:
        """Check via ``to_regclass`` that ``DWS_SCHEMA.view_name`` resolves."""
        sql = "SELECT to_regclass(%s) AS reg"
        rows = self.db.query(sql, (f"{self.DWS_SCHEMA}.{view_name}",))
        return bool(rows and rows[0].get("reg"))

    def _refresh_view(self, view_name: str) -> None:
        """Run ``REFRESH MATERIALIZED VIEW`` on ``DWS_SCHEMA.view_name``.

        ``CONCURRENTLY`` is added when ``dws.mv.refresh_concurrently`` is true.
        """
        concurrently = self._parse_bool(self.config.get("dws.mv.refresh_concurrently", False))
        keyword = "CONCURRENTLY " if concurrently else ""
        # Identifiers cannot be bound as query parameters, so they are
        # interpolated; callers in this class pass names from _get_view_name,
        # which builds them from class constants.
        sql = f"REFRESH MATERIALIZED VIEW {keyword}{self.DWS_SCHEMA}.{view_name}"
        self.db.execute(sql)

    def _get_layer(self, layer_name: Optional[str]) -> Optional[TimeLayer]:
        """Look up a ``TimeLayer`` member by name.

        The lookup ignores case and surrounding whitespace. Returns ``None``
        for an empty or unknown name.
        """
        if not layer_name:
            return None
        name = str(layer_name).strip().upper()
        try:
            return TimeLayer[name]
        except KeyError:
            return None

    def _resolve_layer_map(self, raw: Any) -> Dict[str, str]:
        """Normalise a table-to-layer mapping to ``Dict[str, str]``.

        Accepts a dict or a JSON object string. Anything else, including
        invalid JSON or JSON that is not an object, yields an empty dict.
        """
        if not raw:
            return {}
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                return {}
        if isinstance(raw, dict):
            return {str(k): str(v) for k, v in raw.items()}
        return {}

    def _parse_layers(self, raw: Any) -> List[TimeLayer]:
        """Parse layer names into unique ``TimeLayer`` members, keeping order.

        ``raw`` may be a comma-separated string or a list/tuple/set. Unknown
        names are dropped.
        """
        layers: List[TimeLayer] = []
        for item in self._parse_list(raw):
            layer = self._get_layer(item)
            if layer and layer not in layers:
                layers.append(layer)
        return layers

    def _parse_list(self, raw: Any) -> List[str]:
        """Parse a comma-separated string or list/tuple/set into trimmed strings.

        Blank items are dropped. Any other input type yields an empty list.
        """
        if not raw:
            return []
        if isinstance(raw, str):
            return [v.strip() for v in raw.split(",") if v.strip()]
        if isinstance(raw, (list, tuple, set)):
            return [str(v).strip() for v in raw if str(v).strip()]
        return []

    @classmethod
    def _parse_bool(cls, raw: Any) -> bool:
        """Coerce a config value to ``bool``.

        Plain ``bool()`` treats every non-empty string as true, so a textual
        ``"false"`` or ``"0"`` would switch a flag on. Strings listed in
        ``_FALSE_STRINGS`` (case-insensitive, trimmed) are read as false;
        every other value keeps its ordinary truthiness.
        """
        if isinstance(raw, str):
            return raw.strip().lower() not in cls._FALSE_STRINGS
        return bool(raw)
class DwsMvRefreshFinanceDailyTask(BaseMvRefreshTask):
    """Refresh task for the materialized views of ``dws_finance_daily_summary``."""

    DATE_COL: str = "stat_date"
    BASE_TABLE: str = "dws_finance_daily_summary"

    def get_task_code(self) -> str:
        """Return the task code used to identify this task, e.g. in log lines."""
        task_code = "DWS_MV_REFRESH_FINANCE_DAILY"
        return task_code
class DwsMvRefreshAssistantDailyTask(BaseMvRefreshTask):
    """Refresh task for the materialized views of ``dws_assistant_daily_detail``."""

    DATE_COL: str = "stat_date"
    BASE_TABLE: str = "dws_assistant_daily_detail"

    def get_task_code(self) -> str:
        """Return the task code used to identify this task, e.g. in log lines."""
        task_code = "DWS_MV_REFRESH_ASSISTANT_DAILY"
        return task_code
# Public API of this module: only the two concrete refresh tasks are exported;
# BaseMvRefreshTask is not listed.
__all__ = ["DwsMvRefreshFinanceDailyTask", "DwsMvRefreshAssistantDailyTask"]