Files
ZQYY.FQ-ETL/gui/models/task_registry.py

685 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""任务注册表:定义所有可用任务及其业务域分组。
从后端 ods_tasks 动态获取任务定义,并按业务域分组,供 UI 使用。
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Sequence, Tuple
# 尝试从后端导入 ODS 任务定义
try:
from tasks.ods.ods_tasks import ENABLED_ODS_CODES, ODS_TASK_SPECS
_HAS_BACKEND = True
except ImportError:
_HAS_BACKEND = False
ENABLED_ODS_CODES = set()
ODS_TASK_SPECS = ()
class BusinessDomain(Enum):
"""业务域枚举"""
MEMBER = "member" # 会员
SETTLEMENT = "settlement" # 结算/支付
ASSISTANT = "assistant" # 助教
GOODS = "goods" # 商品/销售
TABLE = "table" # 台桌
PROMOTION = "promotion" # 团购/优惠券
INVENTORY = "inventory" # 库存
SCHEMA = "schema" # Schema 初始化
DWD = "dwd" # DWD 装载
DWS = "dws" # DWS 汇总
INDEX = "index" # 指数计算
QUALITY = "quality" # 质量检查
OTHER = "other" # 其他
# 业务域显示名称
DOMAIN_LABELS: Dict[BusinessDomain, str] = {
BusinessDomain.MEMBER: "会员",
BusinessDomain.SETTLEMENT: "结算/支付",
BusinessDomain.ASSISTANT: "助教",
BusinessDomain.GOODS: "商品/销售",
BusinessDomain.TABLE: "台桌",
BusinessDomain.PROMOTION: "团购/优惠券",
BusinessDomain.INVENTORY: "库存",
BusinessDomain.SCHEMA: "Schema 初始化",
BusinessDomain.DWD: "DWD 装载",
BusinessDomain.DWS: "DWS 汇总",
BusinessDomain.INDEX: "指数计算",
BusinessDomain.QUALITY: "质量检查",
BusinessDomain.OTHER: "其他",
}
@dataclass
class TaskDefinition:
"""任务定义"""
code: str # 任务编码
name: str # 显示名称
description: str # 描述
domain: BusinessDomain # 业务域
requires_window: bool = True # 是否需要时间窗口
is_ods: bool = False # 是否为 ODS 任务
is_dimension: bool = False # 是否为维度类任务(校验时区分)
default_enabled: bool = True # 默认是否选中
# ODS 任务到业务域的映射
ODS_DOMAIN_MAP: Dict[str, BusinessDomain] = {
# 会员相关
"ODS_MEMBER": BusinessDomain.MEMBER,
"ODS_MEMBER_CARD": BusinessDomain.MEMBER,
"ODS_MEMBER_BALANCE": BusinessDomain.MEMBER,
# 结算/支付相关
"ODS_PAYMENT": BusinessDomain.SETTLEMENT,
"ODS_REFUND": BusinessDomain.SETTLEMENT,
"ODS_SETTLEMENT_RECORDS": BusinessDomain.SETTLEMENT,
"ODS_RECHARGE_SETTLE": BusinessDomain.SETTLEMENT,
"ODS_SETTLEMENT_TICKET": BusinessDomain.SETTLEMENT,
# 助教相关
"ODS_ASSISTANT_ACCOUNT": BusinessDomain.ASSISTANT,
"ODS_ASSISTANT_LEDGER": BusinessDomain.ASSISTANT,
"ODS_ASSISTANT_ABOLISH": BusinessDomain.ASSISTANT,
# 商品/销售相关
"ODS_TENANT_GOODS": BusinessDomain.GOODS,
"ODS_STORE_GOODS": BusinessDomain.GOODS,
"ODS_STORE_GOODS_SALES": BusinessDomain.GOODS,
"ODS_GOODS_CATEGORY": BusinessDomain.GOODS,
# 台桌相关
"ODS_TABLES": BusinessDomain.TABLE,
"ODS_TABLE_USE": BusinessDomain.TABLE,
"ODS_TABLE_FEE_DISCOUNT": BusinessDomain.TABLE,
# 团购/优惠券相关
"ODS_GROUP_PACKAGE": BusinessDomain.PROMOTION,
"ODS_GROUP_BUY_REDEMPTION": BusinessDomain.PROMOTION,
"ODS_PLATFORM_COUPON": BusinessDomain.PROMOTION,
# 库存相关
"ODS_INVENTORY_STOCK": BusinessDomain.INVENTORY,
"ODS_INVENTORY_CHANGE": BusinessDomain.INVENTORY,
}
# ODS 任务显示名称(中文)
ODS_DISPLAY_NAMES: Dict[str, str] = {
"ODS_MEMBER": "会员档案",
"ODS_MEMBER_CARD": "会员储值卡",
"ODS_MEMBER_BALANCE": "会员余额变动",
"ODS_PAYMENT": "支付流水",
"ODS_REFUND": "退款流水",
"ODS_SETTLEMENT_RECORDS": "结账记录",
"ODS_RECHARGE_SETTLE": "充值结算",
"ODS_SETTLEMENT_TICKET": "结账小票",
"ODS_ASSISTANT_ACCOUNT": "助教账号",
"ODS_ASSISTANT_LEDGER": "助教流水",
"ODS_ASSISTANT_ABOLISH": "助教作废",
"ODS_TENANT_GOODS": "租户商品",
"ODS_STORE_GOODS": "门店商品",
"ODS_STORE_GOODS_SALES": "商品销售流水",
"ODS_GOODS_CATEGORY": "商品分类",
"ODS_TABLES": "台桌维表",
"ODS_TABLE_USE": "台费计费流水",
"ODS_TABLE_FEE_DISCOUNT": "台费折扣调账",
"ODS_GROUP_PACKAGE": "团购套餐",
"ODS_GROUP_BUY_REDEMPTION": "团购核销",
"ODS_PLATFORM_COUPON": "平台券核销",
"ODS_INVENTORY_STOCK": "库存汇总",
"ODS_INVENTORY_CHANGE": "库存变化",
}
# 维度类 ODS 任务(校验时通常单独处理)
DIMENSION_ODS_CODES = {
"ODS_MEMBER",
"ODS_MEMBER_CARD",
"ODS_ASSISTANT_ACCOUNT",
"ODS_TENANT_GOODS",
"ODS_STORE_GOODS",
"ODS_GOODS_CATEGORY",
"ODS_TABLES",
"ODS_GROUP_PACKAGE",
}
# 事实类 ODS 任务(需要时间窗口)
FACT_ODS_CODES = {
"ODS_MEMBER_BALANCE",
"ODS_PAYMENT",
"ODS_REFUND",
"ODS_SETTLEMENT_RECORDS",
"ODS_RECHARGE_SETTLE",
"ODS_SETTLEMENT_TICKET",
"ODS_ASSISTANT_LEDGER",
"ODS_ASSISTANT_ABOLISH",
"ODS_STORE_GOODS_SALES",
"ODS_TABLE_USE",
"ODS_TABLE_FEE_DISCOUNT",
"ODS_GROUP_BUY_REDEMPTION",
"ODS_PLATFORM_COUPON",
"ODS_INVENTORY_CHANGE",
}
# ======================== DWD 表定义 ========================
@dataclass
class DwdTableDefinition:
"""DWD 表定义(用于 GUI 表级选择)"""
code: str # 表编码(不含 schema如 dim_member
name: str # 中文显示名称
description: str # 描述
domain: BusinessDomain # 业务域
is_dimension: bool = False # 是否维度表
tables: List[str] = field(default_factory=list) # 完整表名列表(含 _ex
# DWD 表定义列表(按业务域分组)
DWD_TABLE_DEFINITIONS: List[DwdTableDefinition] = [
# ---- 会员 ----
DwdTableDefinition(
"dim_member", "会员维度", "会员基本信息维度表",
BusinessDomain.MEMBER, True,
["billiards_dwd.dim_member", "billiards_dwd.dim_member_ex"],
),
DwdTableDefinition(
"dim_member_card_account", "会员储值卡", "会员储值卡账户维度表",
BusinessDomain.MEMBER, True,
["billiards_dwd.dim_member_card_account", "billiards_dwd.dim_member_card_account_ex"],
),
DwdTableDefinition(
"dwd_member_balance_change", "余额变动", "会员余额变动事实表",
BusinessDomain.MEMBER, False,
["billiards_dwd.dwd_member_balance_change", "billiards_dwd.dwd_member_balance_change_ex"],
),
# ---- 结算/支付 ----
DwdTableDefinition(
"dwd_settlement_head", "结账记录", "结账/结算事实表",
BusinessDomain.SETTLEMENT, False,
["billiards_dwd.dwd_settlement_head", "billiards_dwd.dwd_settlement_head_ex"],
),
DwdTableDefinition(
"dwd_payment", "支付流水", "支付明细事实表",
BusinessDomain.SETTLEMENT, False,
["billiards_dwd.dwd_payment"],
),
DwdTableDefinition(
"dwd_refund", "退款流水", "退款明细事实表",
BusinessDomain.SETTLEMENT, False,
["billiards_dwd.dwd_refund", "billiards_dwd.dwd_refund_ex"],
),
DwdTableDefinition(
"dwd_recharge_order", "充值订单", "充值结算事实表",
BusinessDomain.SETTLEMENT, False,
["billiards_dwd.dwd_recharge_order", "billiards_dwd.dwd_recharge_order_ex"],
),
# ---- 助教 ----
DwdTableDefinition(
"dim_assistant", "助教维度", "助教基本信息维度表",
BusinessDomain.ASSISTANT, True,
["billiards_dwd.dim_assistant", "billiards_dwd.dim_assistant_ex"],
),
DwdTableDefinition(
"dwd_assistant_service_log", "助教服务流水", "助教服务计费事实表",
BusinessDomain.ASSISTANT, False,
["billiards_dwd.dwd_assistant_service_log", "billiards_dwd.dwd_assistant_service_log_ex"],
),
DwdTableDefinition(
"dwd_assistant_trash_event", "助教作废", "助教作废事件事实表",
BusinessDomain.ASSISTANT, False,
["billiards_dwd.dwd_assistant_trash_event", "billiards_dwd.dwd_assistant_trash_event_ex"],
),
# ---- 商品/销售 ----
DwdTableDefinition(
"dim_tenant_goods", "租户商品", "租户商品维度表",
BusinessDomain.GOODS, True,
["billiards_dwd.dim_tenant_goods", "billiards_dwd.dim_tenant_goods_ex"],
),
DwdTableDefinition(
"dim_store_goods", "门店商品", "门店商品维度表",
BusinessDomain.GOODS, True,
["billiards_dwd.dim_store_goods", "billiards_dwd.dim_store_goods_ex"],
),
DwdTableDefinition(
"dim_goods_category", "商品分类", "商品分类维度表",
BusinessDomain.GOODS, True,
["billiards_dwd.dim_goods_category"],
),
DwdTableDefinition(
"dwd_store_goods_sale", "商品销售", "商品销售事实表",
BusinessDomain.GOODS, False,
["billiards_dwd.dwd_store_goods_sale", "billiards_dwd.dwd_store_goods_sale_ex"],
),
# ---- 台桌 ----
DwdTableDefinition(
"dim_site", "门店维度", "门店基本信息维度表",
BusinessDomain.TABLE, True,
["billiards_dwd.dim_site", "billiards_dwd.dim_site_ex"],
),
DwdTableDefinition(
"dim_table", "台桌维度", "台桌基本信息维度表",
BusinessDomain.TABLE, True,
["billiards_dwd.dim_table", "billiards_dwd.dim_table_ex"],
),
DwdTableDefinition(
"dwd_table_fee_log", "台费流水", "台费计费事实表",
BusinessDomain.TABLE, False,
["billiards_dwd.dwd_table_fee_log", "billiards_dwd.dwd_table_fee_log_ex"],
),
DwdTableDefinition(
"dwd_table_fee_adjust", "台费折扣调账", "台费折扣调账事实表",
BusinessDomain.TABLE, False,
["billiards_dwd.dwd_table_fee_adjust", "billiards_dwd.dwd_table_fee_adjust_ex"],
),
# ---- 团购/优惠券 ----
DwdTableDefinition(
"dim_groupbuy_package", "团购套餐", "团购套餐维度表",
BusinessDomain.PROMOTION, True,
["billiards_dwd.dim_groupbuy_package", "billiards_dwd.dim_groupbuy_package_ex"],
),
DwdTableDefinition(
"dwd_groupbuy_redemption", "团购核销", "团购核销事实表",
BusinessDomain.PROMOTION, False,
["billiards_dwd.dwd_groupbuy_redemption", "billiards_dwd.dwd_groupbuy_redemption_ex"],
),
DwdTableDefinition(
"dwd_platform_coupon_redemption", "平台券核销", "平台券核销事实表",
BusinessDomain.PROMOTION, False,
["billiards_dwd.dwd_platform_coupon_redemption", "billiards_dwd.dwd_platform_coupon_redemption_ex"],
),
]
# DWD 表按业务域显示顺序
DWD_TABLE_DOMAIN_ORDER: List[BusinessDomain] = [
BusinessDomain.MEMBER,
BusinessDomain.SETTLEMENT,
BusinessDomain.ASSISTANT,
BusinessDomain.GOODS,
BusinessDomain.TABLE,
BusinessDomain.PROMOTION,
]
def get_dwd_tables_grouped() -> Dict[BusinessDomain, List[DwdTableDefinition]]:
"""获取按业务域分组的 DWD 表定义"""
grouped: Dict[BusinessDomain, List[DwdTableDefinition]] = {}
for tbl in DWD_TABLE_DEFINITIONS:
grouped.setdefault(tbl.domain, []).append(tbl)
return grouped
def get_all_dwd_table_codes() -> List[str]:
"""获取所有 DWD 表编码"""
return [t.code for t in DWD_TABLE_DEFINITIONS]
def resolve_dwd_table_names(codes: Sequence[str]) -> List[str]:
"""将 DWD 表编码解析为完整表名列表(含 _ex"""
code_set = {c.lower() for c in codes}
result: List[str] = []
for tbl in DWD_TABLE_DEFINITIONS:
if tbl.code.lower() in code_set:
result.extend(tbl.tables)
return result
# 非 ODS 任务定义
NON_ODS_TASKS: List[TaskDefinition] = [
# DWD 装载(保留为单一调度任务,表级选择通过 DWD_ONLY_TABLES 环境变量控制)
TaskDefinition(
code="DWD_LOAD_FROM_ODS",
name="ODS→DWD 装载",
description="从 ODS 增量装载到 DWD",
domain=BusinessDomain.DWD,
requires_window=True,
),
TaskDefinition(
code="DWD_QUALITY_CHECK",
name="DWD 质量检查",
description="执行 DWD 数据质量检查",
domain=BusinessDomain.QUALITY,
requires_window=False,
),
TaskDefinition(
code="DWS_BUILD_ORDER_SUMMARY",
name="构建订单汇总",
description="重算 DWS 订单汇总表",
domain=BusinessDomain.DWS,
requires_window=False,
),
# DWS 汇总任务
TaskDefinition(
code="DWS_ASSISTANT_DAILY",
name="助教日度明细",
description="汇总助教日度服务、时长与收入指标",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_ASSISTANT_MONTHLY",
name="助教月度汇总",
description="汇总助教月度绩效与服务指标",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_ASSISTANT_CUSTOMER",
name="助教客户统计",
description="统计助教与客户的服务关系与滚动窗口指标",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_ASSISTANT_SALARY",
name="助教工资计算",
description="计算助教月度工资与奖金明细",
domain=BusinessDomain.DWS,
requires_window=True,
default_enabled=False,
),
TaskDefinition(
code="DWS_ASSISTANT_FINANCE",
name="助教财务分析",
description="汇总助教日度财务分析指标",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_MEMBER_CONSUMPTION",
name="会员消费汇总",
description="汇总会员消费行为与滚动窗口指标",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_MEMBER_VISIT",
name="会员来店明细",
description="记录会员来店消费明细与服务列表",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_FINANCE_DAILY",
name="财务日度汇总",
description="汇总当日财务发生额、优惠与现金流",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_FINANCE_RECHARGE",
name="财务充值统计",
description="统计充值笔数、金额与卡余额",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_FINANCE_INCOME_STRUCTURE",
name="财务收入结构",
description="统计收入结构分布",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_FINANCE_DISCOUNT_DETAIL",
name="优惠明细分析",
description="拆分优惠构成与占比",
domain=BusinessDomain.DWS,
requires_window=True,
),
TaskDefinition(
code="DWS_MV_REFRESH_FINANCE_DAILY",
name="物化刷新-财务日汇总",
description="刷新财务日汇总物化视图L1-L4",
domain=BusinessDomain.DWS,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="DWS_MV_REFRESH_ASSISTANT_DAILY",
name="物化刷新-助教日明细",
description="刷新助教日明细物化视图L1-L4",
domain=BusinessDomain.DWS,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="DWS_RETENTION_CLEANUP",
name="时间分层清理",
description="按配置清理历史 DWS 数据",
domain=BusinessDomain.DWS,
requires_window=True,
default_enabled=False,
),
# DWS 指数计算
TaskDefinition(
code="DWS_WINBACK_INDEX",
name="老客挽回指数WBI",
description="计算老客挽回优先级,基于个人周期超期、降频、价值与充值压力",
domain=BusinessDomain.INDEX,
requires_window=False,
),
TaskDefinition(
code="DWS_NEWCONV_INDEX",
name="新客转化指数NCI",
description="计算新客二访/三访转化紧迫度与价值",
domain=BusinessDomain.INDEX,
requires_window=False,
),
TaskDefinition(
code="DWS_RECALL_INDEX",
name="客户召回指数(旧版)",
description="旧版召回指数,建议迁移到 WBI/NCI",
domain=BusinessDomain.INDEX,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="DWS_INTIMACY_INDEX",
name="客户-助教亲密指数",
description="旧版关系指数(兼容保留,默认不启用)",
domain=BusinessDomain.INDEX,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="DWS_RELATION_INDEX",
name="关系指数RS/OS/MS/ML",
description="单任务计算关系强度、归属份额、升温动量、付费关联",
domain=BusinessDomain.INDEX,
requires_window=False,
),
TaskDefinition(
code="DWS_ML_MANUAL_IMPORT",
name="ML人工台账导入",
description="导入人工台账并按日/30天批次覆盖写入 ML 归因明细",
domain=BusinessDomain.INDEX,
requires_window=False,
default_enabled=False,
),
# Schema 初始化
TaskDefinition(
code="INIT_ODS_SCHEMA",
name="初始化 ODS Schema",
description="创建/重建 ODS 表结构",
domain=BusinessDomain.SCHEMA,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="INIT_DWD_SCHEMA",
name="初始化 DWD Schema",
description="创建/重建 DWD 表结构",
domain=BusinessDomain.SCHEMA,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="INIT_DWS_SCHEMA",
name="初始化 DWS Schema",
description="创建/重建 DWS 表结构",
domain=BusinessDomain.SCHEMA,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="SEED_DWS_CONFIG",
name="初始化 DWS 配置",
description="写入 DWS 配置表基础数据",
domain=BusinessDomain.SCHEMA,
requires_window=False,
default_enabled=False,
),
# 其他
TaskDefinition(
code="MANUAL_INGEST",
name="手工数据灌入",
description="从本地 JSON 回放入库",
domain=BusinessDomain.OTHER,
requires_window=False,
default_enabled=False,
),
TaskDefinition(
code="ODS_JSON_ARCHIVE",
name="ODS JSON 归档",
description="在线抓取 ODS 接口数据并落盘 JSON",
domain=BusinessDomain.OTHER,
requires_window=True,
default_enabled=False,
),
TaskDefinition(
code="CHECK_CUTOFF",
name="检查 Cutoff",
description="查看各表数据截止时间",
domain=BusinessDomain.QUALITY,
requires_window=False,
),
TaskDefinition(
code="DATA_INTEGRITY_CHECK",
name="数据完整性检查",
description="检查 ODS/DWD 数据完整性",
domain=BusinessDomain.QUALITY,
requires_window=True,
),
]
def _build_ods_task_definition(code: str) -> TaskDefinition:
"""根据 ODS 任务编码构建任务定义"""
domain = ODS_DOMAIN_MAP.get(code, BusinessDomain.OTHER)
name = ODS_DISPLAY_NAMES.get(code, code)
is_dimension = code in DIMENSION_ODS_CODES
# 从后端获取描述(如果可用)
description = f"抓取{name}到 ODS"
if _HAS_BACKEND:
for spec in ODS_TASK_SPECS:
if spec.code == code:
# 尝试解码描述(可能是乱码)
desc = spec.description
if desc and not any(ord(c) > 0x4e00 for c in desc[:10] if desc):
description = f"抓取{name}到 ODS"
break
return TaskDefinition(
code=code,
name=name,
description=description,
domain=domain,
requires_window=code not in DIMENSION_ODS_CODES,
is_ods=True,
is_dimension=is_dimension,
)
class TaskRegistry:
"""任务注册表:管理所有可用任务"""
_instance: Optional["TaskRegistry"] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._tasks: Dict[str, TaskDefinition] = {}
self._load_tasks()
def _load_tasks(self):
"""加载所有任务定义"""
# 加载 ODS 任务
ods_codes = ENABLED_ODS_CODES if _HAS_BACKEND else set(ODS_DOMAIN_MAP.keys())
for code in ods_codes:
self._tasks[code] = _build_ods_task_definition(code)
# 加载非 ODS 任务
for task_def in NON_ODS_TASKS:
self._tasks[task_def.code] = task_def
def get_task(self, code: str) -> Optional[TaskDefinition]:
"""获取任务定义"""
return self._tasks.get(code)
def get_all_tasks(self) -> List[TaskDefinition]:
"""获取所有任务"""
return list(self._tasks.values())
def get_ods_tasks(self) -> List[TaskDefinition]:
"""获取所有 ODS 任务"""
return [t for t in self._tasks.values() if t.is_ods]
def get_fact_ods_tasks(self) -> List[TaskDefinition]:
"""获取事实类 ODS 任务(需要时间窗口)"""
return [t for t in self._tasks.values() if t.is_ods and not t.is_dimension]
def get_dimension_ods_tasks(self) -> List[TaskDefinition]:
"""获取维度类 ODS 任务"""
return [t for t in self._tasks.values() if t.is_ods and t.is_dimension]
def get_tasks_by_domain(self, domain: BusinessDomain) -> List[TaskDefinition]:
"""按业务域获取任务"""
return [t for t in self._tasks.values() if t.domain == domain]
def get_ods_tasks_grouped(self) -> Dict[BusinessDomain, List[TaskDefinition]]:
"""获取按业务域分组的 ODS 任务"""
grouped: Dict[BusinessDomain, List[TaskDefinition]] = {}
for task in self.get_ods_tasks():
if task.domain not in grouped:
grouped[task.domain] = []
grouped[task.domain].append(task)
return grouped
def get_non_ods_tasks(self) -> List[TaskDefinition]:
"""获取非 ODS 任务"""
return [t for t in self._tasks.values() if not t.is_ods]
# 全局注册表实例
task_registry = TaskRegistry()
# 便捷函数
def get_ods_task_codes() -> List[str]:
"""获取所有 ODS 任务编码"""
return [t.code for t in task_registry.get_ods_tasks()]
def get_fact_ods_task_codes() -> List[str]:
"""获取事实类 ODS 任务编码"""
return [t.code for t in task_registry.get_fact_ods_tasks()]
def get_dimension_ods_task_codes() -> List[str]:
"""获取维度类 ODS 任务编码"""
return [t.code for t in task_registry.get_dimension_ods_tasks()]
def get_all_task_tuples() -> List[Tuple[str, str, str]]:
"""获取所有任务的 (code, name, description) 元组列表"""
return [(t.code, t.name, t.description) for t in task_registry.get_all_tasks()]
def get_ods_tasks_for_ui() -> List[Tuple[str, str, BusinessDomain]]:
"""获取 ODS 任务列表供 UI 使用:(code, display_name, domain)"""
return [(t.code, t.name, t.domain) for t in task_registry.get_ods_tasks()]