Files
ZQYY.FQ-ETL/orchestration/task_registry.py

194 lines
9.7 KiB
Python

# -*- coding: utf-8 -*-
"""任务注册表"""
from dataclasses import dataclass
# ODS 层任务
from tasks.ods.orders_task import OrdersTask
from tasks.ods.payments_task import PaymentsTask
from tasks.ods.members_task import MembersTask
from tasks.ods.products_task import ProductsTask
from tasks.ods.tables_task import TablesTask
from tasks.ods.assistants_task import AssistantsTask
from tasks.ods.packages_task import PackagesDefTask
from tasks.ods.refunds_task import RefundsTask
from tasks.ods.coupon_usage_task import CouponUsageTask
from tasks.ods.inventory_change_task import InventoryChangeTask
from tasks.ods.topups_task import TopupsTask
from tasks.ods.table_discount_task import TableDiscountTask
from tasks.ods.assistant_abolish_task import AssistantAbolishTask
from tasks.ods.ledger_task import LedgerTask
from tasks.ods.ods_tasks import ODS_TASK_CLASSES
from tasks.ods.ods_json_archive_task import OdsJsonArchiveTask
# DWD 层任务
from tasks.dwd.payments_dwd_task import PaymentsDwdTask
from tasks.dwd.members_dwd_task import MembersDwdTask
from tasks.dwd.dwd_load_task import DwdLoadTask
from tasks.dwd.ticket_dwd_task import TicketDwdTask
from tasks.dwd.dwd_quality_task import DwdQualityTask
# 工具类任务
from tasks.utility.manual_ingest_task import ManualIngestTask
from tasks.utility.init_schema_task import InitOdsSchemaTask
from tasks.utility.init_dwd_schema_task import InitDwdSchemaTask
from tasks.utility.init_dws_schema_task import InitDwsSchemaTask
from tasks.utility.check_cutoff_task import CheckCutoffTask
from tasks.utility.dws_build_order_summary_task import DwsBuildOrderSummaryTask
from tasks.utility.data_integrity_task import DataIntegrityTask
from tasks.utility.seed_dws_config_task import SeedDwsConfigTask
# DWS 层任务导入
from tasks.dws import (
AssistantDailyTask,
AssistantMonthlyTask,
AssistantCustomerTask,
AssistantSalaryTask,
AssistantFinanceTask,
MemberConsumptionTask,
MemberVisitTask,
FinanceDailyTask,
FinanceRechargeTask,
FinanceIncomeStructureTask,
FinanceDiscountDetailTask,
DwsRetentionCleanupTask,
DwsMvRefreshFinanceDailyTask,
DwsMvRefreshAssistantDailyTask,
# 指数算法任务
RecallIndexTask,
IntimacyIndexTask,
WinbackIndexTask,
NewconvIndexTask,
MlManualImportTask,
RelationIndexTask,
)
@dataclass
class TaskMeta:
"""任务元数据"""
task_class: type
requires_db_config: bool = True
layer: str | None = None # "ODS" / "DWD" / "DWS" / "INDEX" / None
task_type: str = "etl" # "etl" / "utility" / "verification"
class TaskRegistry:
"""任务注册和工厂"""
def __init__(self):
self._tasks: dict[str, TaskMeta] = {}
def register(
self,
task_code: str,
task_class: type,
requires_db_config: bool = True,
layer: str | None = None,
task_type: str = "etl",
):
"""注册任务类及其元数据。向后兼容:仅传 task_code 和 task_class 时使用默认值。"""
self._tasks[task_code.upper()] = TaskMeta(
task_class=task_class,
requires_db_config=requires_db_config,
layer=layer,
task_type=task_type,
)
def create_task(self, task_code: str, config, db_connection, api_client, logger):
"""创建任务实例"""
task_code = task_code.upper()
if task_code not in self._tasks:
raise ValueError(f"未知的任务类型: {task_code}")
task_class = self._tasks[task_code].task_class
return task_class(config, db_connection, api_client, logger)
def get_metadata(self, task_code: str) -> TaskMeta | None:
"""查询任务元数据。"""
return self._tasks.get(task_code.upper())
def get_tasks_by_layer(self, layer: str) -> list[str]:
"""获取指定层的所有任务代码。"""
return [
code for code, meta in self._tasks.items()
if meta.layer and meta.layer.upper() == layer.upper()
]
def is_utility_task(self, task_code: str) -> bool:
"""判断是否为工具类任务(不需要游标/运行记录)。"""
meta = self.get_metadata(task_code)
return meta is not None and not meta.requires_db_config
def get_all_task_codes(self) -> list[str]:
"""获取所有已注册的任务代码"""
return list(self._tasks.keys())
# Default registry shared by the rest of the application.
default_registry = TaskRegistry()

# Built-in tasks as (task code, task class, register() keyword options).
# The rows are registered top to bottom, so their order is the registration
# order reported by the registry.
_BUILTIN_TASKS = (
    # ── ODS layer: basic fetch tasks ─────────────────────────────
    ("PRODUCTS", ProductsTask, {"layer": "ODS"}),
    ("TABLES", TablesTask, {"layer": "ODS"}),
    ("MEMBERS", MembersTask, {"layer": "ODS"}),
    ("ASSISTANTS", AssistantsTask, {"layer": "ODS"}),
    ("PACKAGES_DEF", PackagesDefTask, {"layer": "ODS"}),
    ("ORDERS", OrdersTask, {"layer": "ODS"}),
    ("PAYMENTS", PaymentsTask, {"layer": "ODS"}),
    ("REFUNDS", RefundsTask, {"layer": "ODS"}),
    ("COUPON_USAGE", CouponUsageTask, {"layer": "ODS"}),
    ("INVENTORY_CHANGE", InventoryChangeTask, {"layer": "ODS"}),
    ("TOPUPS", TopupsTask, {"layer": "ODS"}),
    ("TABLE_DISCOUNT", TableDiscountTask, {"layer": "ODS"}),
    ("ASSISTANT_ABOLISH", AssistantAbolishTask, {"layer": "ODS"}),
    ("LEDGER", LedgerTask, {"layer": "ODS"}),
    # ── DWD layer tasks ──────────────────────────────────────────
    ("TICKET_DWD", TicketDwdTask, {"layer": "DWD"}),
    ("PAYMENTS_DWD", PaymentsDwdTask, {"layer": "DWD"}),
    ("MEMBERS_DWD", MembersDwdTask, {"layer": "DWD"}),
    ("DWD_LOAD_FROM_ODS", DwdLoadTask, {"layer": "DWD"}),
    (
        "DWD_QUALITY_CHECK",
        DwdQualityTask,
        {"requires_db_config": False, "layer": "DWD", "task_type": "verification"},
    ),
    # ── Utility tasks ────────────────────────────────────────────
    ("MANUAL_INGEST", ManualIngestTask, {"requires_db_config": False, "task_type": "utility"}),
    ("INIT_ODS_SCHEMA", InitOdsSchemaTask, {"requires_db_config": False, "task_type": "utility"}),
    ("INIT_DWD_SCHEMA", InitDwdSchemaTask, {"requires_db_config": False, "task_type": "utility"}),
    ("INIT_DWS_SCHEMA", InitDwsSchemaTask, {"requires_db_config": False, "task_type": "utility"}),
    ("ODS_JSON_ARCHIVE", OdsJsonArchiveTask, {"requires_db_config": False, "task_type": "utility"}),
    ("CHECK_CUTOFF", CheckCutoffTask, {"requires_db_config": False, "task_type": "utility"}),
    # Registered without requires_db_config, so it keeps register()'s default.
    ("SEED_DWS_CONFIG", SeedDwsConfigTask, {"task_type": "utility"}),
    # ── Verification tasks ───────────────────────────────────────
    (
        "DATA_INTEGRITY_CHECK",
        DataIntegrityTask,
        {"requires_db_config": False, "task_type": "verification"},
    ),
    # ── DWS layer business tasks ─────────────────────────────────
    ("DWS_BUILD_ORDER_SUMMARY", DwsBuildOrderSummaryTask, {"requires_db_config": False, "layer": "DWS"}),
    ("DWS_ASSISTANT_DAILY", AssistantDailyTask, {"layer": "DWS"}),
    ("DWS_ASSISTANT_MONTHLY", AssistantMonthlyTask, {"layer": "DWS"}),
    ("DWS_ASSISTANT_CUSTOMER", AssistantCustomerTask, {"layer": "DWS"}),
    ("DWS_ASSISTANT_SALARY", AssistantSalaryTask, {"layer": "DWS"}),
    ("DWS_ASSISTANT_FINANCE", AssistantFinanceTask, {"layer": "DWS"}),
    ("DWS_MEMBER_CONSUMPTION", MemberConsumptionTask, {"layer": "DWS"}),
    ("DWS_MEMBER_VISIT", MemberVisitTask, {"layer": "DWS"}),
    ("DWS_FINANCE_DAILY", FinanceDailyTask, {"layer": "DWS"}),
    ("DWS_FINANCE_RECHARGE", FinanceRechargeTask, {"layer": "DWS"}),
    ("DWS_FINANCE_INCOME_STRUCTURE", FinanceIncomeStructureTask, {"layer": "DWS"}),
    ("DWS_FINANCE_DISCOUNT_DETAIL", FinanceDiscountDetailTask, {"layer": "DWS"}),
    ("DWS_RETENTION_CLEANUP", DwsRetentionCleanupTask, {"layer": "DWS"}),
    ("DWS_MV_REFRESH_FINANCE_DAILY", DwsMvRefreshFinanceDailyTask, {"layer": "DWS"}),
    ("DWS_MV_REFRESH_ASSISTANT_DAILY", DwsMvRefreshAssistantDailyTask, {"layer": "DWS"}),
    # ── INDEX layer: index algorithm tasks ───────────────────────
    ("DWS_RECALL_INDEX", RecallIndexTask, {"layer": "INDEX"}),
    ("DWS_WINBACK_INDEX", WinbackIndexTask, {"requires_db_config": False, "layer": "INDEX"}),
    ("DWS_NEWCONV_INDEX", NewconvIndexTask, {"requires_db_config": False, "layer": "INDEX"}),
    ("DWS_INTIMACY_INDEX", IntimacyIndexTask, {"requires_db_config": False, "layer": "INDEX"}),
    ("DWS_ML_MANUAL_IMPORT", MlManualImportTask, {"requires_db_config": False, "layer": "INDEX"}),
    ("DWS_RELATION_INDEX", RelationIndexTask, {"requires_db_config": False, "layer": "INDEX"}),
)

for _task_code, _task_class, _options in _BUILTIN_TASKS:
    default_registry.register(_task_code, _task_class, **_options)

# ── ODS layer: generic ODS tasks (generated from ODS_TASK_CLASSES) ──
# Runs last, so a code that is already registered above gets its metadata
# replaced by the ODS_TASK_CLASSES entry.
for code, task_cls in ODS_TASK_CLASSES.items():
    default_registry.register(code, task_cls, layer="ODS")