Files
feiqiu-ETL/etl_billiards/gui/models/task_registry.py
2026-02-01 22:04:15 +08:00

354 lines
12 KiB
Python

# -*- coding: utf-8 -*-
"""任务注册表:定义所有可用任务及其业务域分组。
从后端 ods_tasks 动态获取任务定义,并按业务域分组,供 UI 使用。
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple
# Use the backend's ODS task definitions when the backend package can be
# imported. If the import fails, fall back to empty definitions and record
# that the backend is unavailable.
try:
    from tasks.ods_tasks import ENABLED_ODS_CODES, ODS_TASK_SPECS
except ImportError:
    _HAS_BACKEND = False
    ENABLED_ODS_CODES = set()
    ODS_TASK_SPECS = ()
else:
    _HAS_BACKEND = True
class BusinessDomain(Enum):
    """Business domain of a task; the UI groups tasks by this value."""

    MEMBER = "member"  # members
    SETTLEMENT = "settlement"  # settlement / payment
    ASSISTANT = "assistant"  # assistants
    GOODS = "goods"  # goods / sales
    TABLE = "table"  # tables
    PROMOTION = "promotion"  # group-buy / coupons
    INVENTORY = "inventory"  # inventory
    SCHEMA = "schema"  # schema initialisation
    DWD = "dwd"  # DWD loading
    QUALITY = "quality"  # quality checks
    OTHER = "other"  # anything else


# Display label (Chinese) for every domain, listed in enum declaration order.
DOMAIN_LABELS: Dict[BusinessDomain, str] = {
    BusinessDomain.MEMBER: "会员",
    BusinessDomain.SETTLEMENT: "结算/支付",
    BusinessDomain.ASSISTANT: "助教",
    BusinessDomain.GOODS: "商品/销售",
    BusinessDomain.TABLE: "台桌",
    BusinessDomain.PROMOTION: "团购/优惠券",
    BusinessDomain.INVENTORY: "库存",
    BusinessDomain.SCHEMA: "Schema 初始化",
    BusinessDomain.DWD: "DWD 装载",
    BusinessDomain.QUALITY: "质量检查",
    BusinessDomain.OTHER: "其他",
}
@dataclass
class TaskDefinition:
    """Static description of one selectable task.

    Positional field order is: code, name, description, domain, followed by
    the optional boolean flags.
    """

    code: str  # unique task code, e.g. "ODS_MEMBER"
    name: str  # display name
    description: str  # short description
    domain: BusinessDomain  # business domain used for grouping
    requires_window: bool = field(default=True)  # task needs a time window
    is_ods: bool = field(default=False)  # task is an ODS extraction task
    is_dimension: bool = field(default=False)  # dimension-type task (kept apart during validation)
    default_enabled: bool = field(default=True)  # selected by default
# ODS task code -> business domain. Codes are listed one domain per row and
# flattened by the comprehension, so dict order follows the rows below.
ODS_DOMAIN_MAP: Dict[str, BusinessDomain] = {
    code: domain
    for domain, codes in (
        # Members
        (BusinessDomain.MEMBER, (
            "ODS_MEMBER",
            "ODS_MEMBER_CARD",
            "ODS_MEMBER_BALANCE",
        )),
        # Settlement / payment
        (BusinessDomain.SETTLEMENT, (
            "ODS_PAYMENT",
            "ODS_REFUND",
            "ODS_SETTLEMENT_RECORDS",
            "ODS_RECHARGE_SETTLE",
            "ODS_SETTLEMENT_TICKET",
        )),
        # Assistants
        (BusinessDomain.ASSISTANT, (
            "ODS_ASSISTANT_ACCOUNT",
            "ODS_ASSISTANT_LEDGER",
            "ODS_ASSISTANT_ABOLISH",
        )),
        # Goods / sales
        (BusinessDomain.GOODS, (
            "ODS_TENANT_GOODS",
            "ODS_STORE_GOODS",
            "ODS_STORE_GOODS_SALES",
            "ODS_GOODS_CATEGORY",
        )),
        # Tables
        (BusinessDomain.TABLE, (
            "ODS_TABLES",
            "ODS_TABLE_USE",
            "ODS_TABLE_FEE_DISCOUNT",
        )),
        # Group-buy / coupons
        (BusinessDomain.PROMOTION, (
            "ODS_GROUP_PACKAGE",
            "ODS_GROUP_BUY_REDEMPTION",
            "ODS_PLATFORM_COUPON",
        )),
        # Inventory
        (BusinessDomain.INVENTORY, (
            "ODS_INVENTORY_STOCK",
            "ODS_INVENTORY_CHANGE",
        )),
    )
    for code in codes
}
# Chinese display names for the ODS tasks, keyed by task code.
ODS_DISPLAY_NAMES: Dict[str, str] = dict(
    # Members
    ODS_MEMBER="会员档案",
    ODS_MEMBER_CARD="会员储值卡",
    ODS_MEMBER_BALANCE="会员余额变动",
    # Settlement / payment
    ODS_PAYMENT="支付流水",
    ODS_REFUND="退款流水",
    ODS_SETTLEMENT_RECORDS="结账记录",
    ODS_RECHARGE_SETTLE="充值结算",
    ODS_SETTLEMENT_TICKET="结账小票",
    # Assistants
    ODS_ASSISTANT_ACCOUNT="助教账号",
    ODS_ASSISTANT_LEDGER="助教流水",
    ODS_ASSISTANT_ABOLISH="助教作废",
    # Goods / sales
    ODS_TENANT_GOODS="租户商品",
    ODS_STORE_GOODS="门店商品",
    ODS_STORE_GOODS_SALES="商品销售流水",
    ODS_GOODS_CATEGORY="商品分类",
    # Tables
    ODS_TABLES="台桌维表",
    ODS_TABLE_USE="台费计费流水",
    ODS_TABLE_FEE_DISCOUNT="台费折扣调账",
    # Group-buy / coupons
    ODS_GROUP_PACKAGE="团购套餐",
    ODS_GROUP_BUY_REDEMPTION="团购核销",
    ODS_PLATFORM_COUPON="平台券核销",
    # Inventory
    ODS_INVENTORY_STOCK="库存汇总",
    ODS_INVENTORY_CHANGE="库存变化",
)
# Dimension-type ODS tasks; validation usually handles these separately.
DIMENSION_ODS_CODES = {
    "ODS_MEMBER", "ODS_MEMBER_CARD",                              # members
    "ODS_ASSISTANT_ACCOUNT",                                      # assistants
    "ODS_TENANT_GOODS", "ODS_STORE_GOODS", "ODS_GOODS_CATEGORY",  # goods
    "ODS_TABLES",                                                 # tables
    "ODS_GROUP_PACKAGE",                                          # group-buy
}
# Fact-type ODS tasks (these need a time window).
# NOTE(review): ODS_INVENTORY_STOCK is in neither this set nor
# DIMENSION_ODS_CODES. Nothing in this module reads FACT_ODS_CODES. Confirm
# whether external callers rely on it and which set the stock task belongs to.
FACT_ODS_CODES = {
    "ODS_MEMBER_BALANCE",                                # members
    "ODS_PAYMENT", "ODS_REFUND",                         # settlement / payment
    "ODS_SETTLEMENT_RECORDS", "ODS_RECHARGE_SETTLE",
    "ODS_SETTLEMENT_TICKET",
    "ODS_ASSISTANT_LEDGER", "ODS_ASSISTANT_ABOLISH",     # assistants
    "ODS_STORE_GOODS_SALES",                             # goods / sales
    "ODS_TABLE_USE", "ODS_TABLE_FEE_DISCOUNT",           # tables
    "ODS_GROUP_BUY_REDEMPTION", "ODS_PLATFORM_COUPON",   # group-buy / coupons
    "ODS_INVENTORY_CHANGE",                              # inventory
}
# Tasks that are not ODS extractions: DWD/DWS loading, schema
# initialisation, quality checks and miscellaneous tools.
# Each row is (code, name, description, domain, requires_window,
# default_enabled). is_ods and is_dimension keep their False defaults.
NON_ODS_TASKS: List[TaskDefinition] = [
    TaskDefinition(
        code=code,
        name=name,
        description=description,
        domain=domain,
        requires_window=requires_window,
        default_enabled=default_enabled,
    )
    for code, name, description, domain, requires_window, default_enabled in (
        # DWD / DWS loading
        ("DWD_LOAD_FROM_ODS", "ODS→DWD 装载", "从 ODS 增量装载到 DWD",
         BusinessDomain.DWD, True, True),
        ("DWD_QUALITY_CHECK", "DWD 质量检查", "执行 DWD 数据质量检查",
         BusinessDomain.QUALITY, False, True),
        ("DWS_BUILD_ORDER_SUMMARY", "构建订单汇总", "重算 DWS 订单汇总表",
         BusinessDomain.DWD, False, True),
        # Schema initialisation (not selected by default)
        ("INIT_ODS_SCHEMA", "初始化 ODS Schema", "创建/重建 ODS 表结构",
         BusinessDomain.SCHEMA, False, False),
        ("INIT_DWD_SCHEMA", "初始化 DWD Schema", "创建/重建 DWD 表结构",
         BusinessDomain.SCHEMA, False, False),
        ("INIT_DWS_SCHEMA", "初始化 DWS Schema", "创建/重建 DWS 表结构",
         BusinessDomain.SCHEMA, False, False),
        # Other
        ("MANUAL_INGEST", "手工数据灌入", "从本地 JSON 回放入库",
         BusinessDomain.OTHER, False, False),
        ("CHECK_CUTOFF", "检查 Cutoff", "查看各表数据截止时间",
         BusinessDomain.QUALITY, False, True),
        ("DATA_INTEGRITY_CHECK", "数据完整性检查", "检查 ODS/DWD 数据完整性",
         BusinessDomain.QUALITY, True, True),
    )
]
def _build_ods_task_definition(code: str) -> TaskDefinition:
    """Build the TaskDefinition for a single ODS task code.

    The domain comes from ODS_DOMAIN_MAP and defaults to BusinessDomain.OTHER.
    The display name comes from ODS_DISPLAY_NAMES and defaults to the raw code.
    Codes listed in DIMENSION_ODS_CODES are marked as dimension tasks and need
    no time window. All other ODS tasks require one.

    Args:
        code: ODS task code, e.g. "ODS_MEMBER".

    Returns:
        A TaskDefinition with ``is_ods=True``.
    """
    name = ODS_DISPLAY_NAMES.get(code, code)
    is_dimension = code in DIMENSION_ODS_CODES
    # The description is always generated locally. The previous lookup of the
    # backend's ODS_TASK_SPECS description assigned this same string on every
    # path, so the backend text was never used. Its comment noted that text may
    # be garbled, so the dead scan was dropped rather than activated.
    return TaskDefinition(
        code=code,
        name=name,
        description=f"抓取{name}到 ODS",
        domain=ODS_DOMAIN_MAP.get(code, BusinessDomain.OTHER),
        requires_window=not is_dimension,
        is_ods=True,
        is_dimension=is_dimension,
    )
class TaskRegistry:
    """Registry of every task the GUI can offer (process-wide singleton).

    ``TaskRegistry()`` always returns the same instance, and the task table is
    loaded only on the first construction. Tasks are keyed by code. ODS tasks
    are inserted first, in a deterministic order, followed by NON_ODS_TASKS.
    """

    _instance: Optional["TaskRegistry"] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every TaskRegistry() call; this flag makes it load once.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._initialized = True
        self._tasks: Dict[str, "TaskDefinition"] = {}
        self._load_tasks()

    @staticmethod
    def _ordered_ods_codes(codes: Iterable[str], preferred: Iterable[str]) -> List[str]:
        """Return *codes* in a deterministic order.

        Codes that appear in *preferred* come first, in *preferred*'s order.
        The remaining codes follow in alphabetical order.
        """
        rank = {code: index for index, code in enumerate(preferred)}
        unranked = len(rank)
        return sorted(codes, key=lambda code: (rank.get(code, unranked), code))

    def _load_tasks(self) -> None:
        """Fill ``self._tasks`` with ODS tasks first, then NON_ODS_TASKS.

        With the backend available, only ENABLED_ODS_CODES are registered.
        Otherwise every code in ODS_DOMAIN_MAP is registered.
        """
        ods_codes = ENABLED_ODS_CODES if _HAS_BACKEND else ODS_DOMAIN_MAP.keys()
        # These codes used to be iterated straight from a set (the fallback was
        # set(...), and ENABLED_ODS_CODES is presumably a set, given its
        # set() fallback). Set iteration order for str changes between runs
        # because of hash randomization, which made the UI task order unstable.
        # Order by ODS_DOMAIN_MAP instead; unknown codes are appended last.
        for code in self._ordered_ods_codes(ods_codes, ODS_DOMAIN_MAP):
            self._tasks[code] = _build_ods_task_definition(code)
        # A non-ODS task sharing a code with an ODS task replaces it.
        for task_def in NON_ODS_TASKS:
            self._tasks[task_def.code] = task_def

    def get_task(self, code: str) -> Optional["TaskDefinition"]:
        """Return the task with *code*, or None if it is not registered."""
        return self._tasks.get(code)

    def get_all_tasks(self) -> List["TaskDefinition"]:
        """Return every registered task in registration order."""
        return list(self._tasks.values())

    def get_ods_tasks(self) -> List["TaskDefinition"]:
        """Return all ODS tasks."""
        return [t for t in self._tasks.values() if t.is_ods]

    def get_fact_ods_tasks(self) -> List["TaskDefinition"]:
        """Return fact-type ODS tasks, i.e. ODS tasks not marked as dimensions."""
        return [t for t in self._tasks.values() if t.is_ods and not t.is_dimension]

    def get_dimension_ods_tasks(self) -> List["TaskDefinition"]:
        """Return dimension-type ODS tasks."""
        return [t for t in self._tasks.values() if t.is_ods and t.is_dimension]

    def get_tasks_by_domain(self, domain: "BusinessDomain") -> List["TaskDefinition"]:
        """Return all tasks (ODS or not) belonging to *domain*."""
        return [t for t in self._tasks.values() if t.domain == domain]

    def get_ods_tasks_grouped(self) -> Dict["BusinessDomain", List["TaskDefinition"]]:
        """Return ODS tasks grouped by business domain.

        Domains appear in the order their first task was registered, and tasks
        keep registry order within each group.
        """
        grouped: Dict["BusinessDomain", List["TaskDefinition"]] = {}
        for task in self.get_ods_tasks():
            grouped.setdefault(task.domain, []).append(task)
        return grouped

    def get_non_ods_tasks(self) -> List["TaskDefinition"]:
        """Return all tasks that are not ODS tasks."""
        return [t for t in self._tasks.values() if not t.is_ods]
# Shared module-level registry instance.
task_registry: TaskRegistry = TaskRegistry()
# Convenience wrappers over the shared ``task_registry`` instance.
def get_ods_task_codes() -> List[str]:
    """Return the code of every registered ODS task, in registry order."""
    ods_tasks = task_registry.get_ods_tasks()
    return [task.code for task in ods_tasks]
def get_fact_ods_task_codes() -> List[str]:
    """Return the codes of the fact-type ODS tasks, in registry order."""
    fact_tasks = task_registry.get_fact_ods_tasks()
    return [task.code for task in fact_tasks]
def get_dimension_ods_task_codes() -> List[str]:
    """Return the codes of the dimension-type ODS tasks, in registry order."""
    dimension_tasks = task_registry.get_dimension_ods_tasks()
    return [task.code for task in dimension_tasks]
def get_all_task_tuples() -> List[Tuple[str, str, str]]:
    """Return a ``(code, name, description)`` tuple for every registered task."""
    every_task = task_registry.get_all_tasks()
    return [(task.code, task.name, task.description) for task in every_task]
def get_ods_tasks_for_ui() -> List[Tuple[str, str, BusinessDomain]]:
    """Return ``(code, display_name, domain)`` for each ODS task, for the UI."""
    ods_tasks = task_registry.get_ods_tasks()
    return [(task.code, task.name, task.domain) for task in ods_tasks]