Files
Neo-ZQYY/apps/etl/connectors/feiqiu/orchestration/task_registry.py

185 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
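# AI_CHANGELOG
# - 2026-02-14 | Removed leftover dead code: dropped the duplicate ODS_TASK_CLASSES registration loop (bottom of file), keeping the single remaining one (top of file)
# Direct cause: the original file had two `for code, task_cls in ODS_TASK_CLASSES.items()` loops, which caused duplicate registration
# Verification: `python -c "from orchestration.task_registry import default_registry; print(len(default_registry.get_all_task_codes()))"` → 52
# CHANGE [2026-02-14] intent: remove the imports and registrations of 14 deprecated standalone ODS tasks + 3 deprecated standalone DWD tasks
# assumptions: these tasks wrote to the non-existent billiards.* schema and have been replaced by the generic ODS tasks (ods.*)
# prompt: "删除废弃的独立 ODS/DWD 任务及其 loader"
# Verification: pytest tests/unit -x to confirm there are no import errors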
"""任务注册表"""
from dataclasses import dataclass, field
# ODS 层任务(仅保留通用 ODS 任务工厂 + JSON 归档)
from tasks.ods.ods_tasks import ODS_TASK_CLASSES
from tasks.ods.ods_json_archive_task import OdsJsonArchiveTask
# DWD 层任务(仅保留核心装载 + 质量检查)
from tasks.dwd.dwd_load_task import DwdLoadTask
from tasks.dwd.dwd_quality_task import DwdQualityTask
# 工具类任务
from tasks.utility.manual_ingest_task import ManualIngestTask
from tasks.utility.init_schema_task import InitOdsSchemaTask
from tasks.utility.init_dwd_schema_task import InitDwdSchemaTask
from tasks.utility.init_dws_schema_task import InitDwsSchemaTask
from tasks.utility.check_cutoff_task import CheckCutoffTask
from tasks.utility.dws_build_order_summary_task import DwsBuildOrderSummaryTask
from tasks.utility.data_integrity_task import DataIntegrityTask
from tasks.utility.seed_dws_config_task import SeedDwsConfigTask
# DWS 层任务导入
from tasks.dws import (
AssistantDailyTask,
AssistantMonthlyTask,
AssistantCustomerTask,
AssistantSalaryTask,
AssistantFinanceTask,
MemberConsumptionTask,
MemberVisitTask,
FinanceDailyTask,
FinanceRechargeTask,
FinanceIncomeStructureTask,
FinanceDiscountDetailTask,
# 库存汇总任务
GoodsStockDailyTask,
GoodsStockWeeklyTask,
GoodsStockMonthlyTask,
# 指数算法任务
WinbackIndexTask,
NewconvIndexTask,
MlManualImportTask,
RelationIndexTask,
SpendingPowerIndexTask,
)
# CHANGE [2026-07-14] intent: 合并 MV 刷新 + 数据清理为 DWS_MAINTENANCE
from tasks.dws.maintenance_task import DwsMaintenanceTask
@dataclass
class TaskMeta:
"""任务元数据"""
task_class: type
requires_db_config: bool = True
layer: str | None = None # "ODS" / "DWD" / "DWS" / "INDEX" / None
task_type: str = "etl" # "etl" / "utility" / "verification"
depends_on: list[str] = field(default_factory=list) # 依赖的任务代码列表
class TaskRegistry:
"""任务注册和工厂"""
def __init__(self):
self._tasks: dict[str, TaskMeta] = {}
def register(
self,
task_code: str,
task_class: type,
requires_db_config: bool = True,
layer: str | None = None,
task_type: str = "etl",
depends_on: list[str] | None = None,
):
"""注册任务类及其元数据。向后兼容:仅传 task_code 和 task_class 时使用默认值。"""
self._tasks[task_code.upper()] = TaskMeta(
task_class=task_class,
requires_db_config=requires_db_config,
layer=layer,
task_type=task_type,
depends_on=depends_on or [],
)
def create_task(self, task_code: str, config, db_connection, api_client, logger):
"""创建任务实例"""
task_code = task_code.upper()
if task_code not in self._tasks:
raise ValueError(f"未知的任务类型: {task_code}")
task_class = self._tasks[task_code].task_class
return task_class(config, db_connection, api_client, logger)
def get_metadata(self, task_code: str) -> TaskMeta | None:
"""查询任务元数据。"""
return self._tasks.get(task_code.upper())
def get_tasks_by_layer(self, layer: str) -> list[str]:
"""获取指定层的所有任务代码。"""
return [
code for code, meta in self._tasks.items()
if meta.layer and meta.layer.upper() == layer.upper()
]
def is_utility_task(self, task_code: str) -> bool:
"""判断是否为工具类任务(不需要游标/运行记录)。"""
meta = self.get_metadata(task_code)
return meta is not None and not meta.requires_db_config
def get_all_task_codes(self) -> list[str]:
"""获取所有已注册的任务代码"""
return list(self._tasks.keys())
# Default registry
default_registry = TaskRegistry()

# ── ODS layer: generic ODS tasks (generated dynamically from ODS_TASK_CLASSES) ──
for code, task_cls in ODS_TASK_CLASSES.items():
    default_registry.register(code, task_cls, layer="ODS")

# ── DWD layer tasks ──────────────────────────────────────────
default_registry.register("DWD_LOAD_FROM_ODS", DwdLoadTask, layer="DWD")
default_registry.register(
    "DWD_QUALITY_CHECK",
    DwdQualityTask,
    requires_db_config=False,
    layer="DWD",
    task_type="verification",
)

# ── Utility tasks ────────────────────────────────────────────
# Every entry in this table is registered with requires_db_config=False.
for _utility_code, _utility_cls in (
    ("MANUAL_INGEST", ManualIngestTask),
    ("INIT_ODS_SCHEMA", InitOdsSchemaTask),
    ("INIT_DWD_SCHEMA", InitDwdSchemaTask),
    ("INIT_DWS_SCHEMA", InitDwsSchemaTask),
    ("ODS_JSON_ARCHIVE", OdsJsonArchiveTask),
    ("CHECK_CUTOFF", CheckCutoffTask),
):
    default_registry.register(
        _utility_code,
        _utility_cls,
        requires_db_config=False,
        task_type="utility",
    )
# SEED_DWS_CONFIG keeps the default requires_db_config=True.
default_registry.register("SEED_DWS_CONFIG", SeedDwsConfigTask, task_type="utility")

# ── Verification tasks ───────────────────────────────────────
default_registry.register(
    "DATA_INTEGRITY_CHECK",
    DataIntegrityTask,
    requires_db_config=False,
    task_type="verification",
)

# ── DWS layer business tasks ─────────────────────────────────
default_registry.register(
    "DWS_BUILD_ORDER_SUMMARY",
    DwsBuildOrderSummaryTask,
    requires_db_config=False,
    layer="DWS",
)
# Each row is (task code, task class, codes it depends on); an empty tuple
# means no dependencies. A fresh list is built per registration.
for _dws_code, _dws_cls, _dws_deps in (
    ("DWS_ASSISTANT_DAILY", AssistantDailyTask, ()),
    # CHANGE [2026-07-17] intent: declare depends_on for known dependency
    # relationships (requirements 8.1, 8.2)
    ("DWS_ASSISTANT_MONTHLY", AssistantMonthlyTask, ("DWS_ASSISTANT_DAILY",)),
    ("DWS_ASSISTANT_CUSTOMER", AssistantCustomerTask, ()),
    ("DWS_ASSISTANT_SALARY", AssistantSalaryTask, ()),
    ("DWS_ASSISTANT_FINANCE", AssistantFinanceTask, ("DWS_ASSISTANT_SALARY",)),
    ("DWS_MEMBER_CONSUMPTION", MemberConsumptionTask, ()),
    ("DWS_MEMBER_VISIT", MemberVisitTask, ()),
    ("DWS_FINANCE_DAILY", FinanceDailyTask, ()),
    ("DWS_FINANCE_RECHARGE", FinanceRechargeTask, ()),
    ("DWS_FINANCE_INCOME_STRUCTURE", FinanceIncomeStructureTask, ()),
    ("DWS_FINANCE_DISCOUNT_DETAIL", FinanceDiscountDetailTask, ()),
    # CHANGE [2026-07-20] intent: register the DWS goods-stock summary tasks
    # (daily/weekly/monthly), which depend on the DWD load having completed
    # (requirement 12.9)
    ("DWS_GOODS_STOCK_DAILY", GoodsStockDailyTask, ("DWD_LOAD_FROM_ODS",)),
    ("DWS_GOODS_STOCK_WEEKLY", GoodsStockWeeklyTask, ("DWD_LOAD_FROM_ODS",)),
    ("DWS_GOODS_STOCK_MONTHLY", GoodsStockMonthlyTask, ("DWD_LOAD_FROM_ODS",)),
    # CHANGE [2026-07-14] intent: remove DWS_RETENTION_CLEANUP /
    # DWS_MV_REFRESH_FINANCE_DAILY / DWS_MV_REFRESH_ASSISTANT_DAILY and replace
    # them with the unified maintenance task DWS_MAINTENANCE (requirement 4.5)
    # depends_on: all other DWS tasks — MV refresh and cleanup should run after
    # the data has been written
    (
        "DWS_MAINTENANCE",
        DwsMaintenanceTask,
        (
            "DWS_ASSISTANT_DAILY", "DWS_ASSISTANT_MONTHLY", "DWS_ASSISTANT_CUSTOMER",
            "DWS_ASSISTANT_SALARY", "DWS_ASSISTANT_FINANCE",
            "DWS_MEMBER_CONSUMPTION", "DWS_MEMBER_VISIT",
            "DWS_FINANCE_DAILY", "DWS_FINANCE_RECHARGE",
            "DWS_FINANCE_INCOME_STRUCTURE", "DWS_FINANCE_DISCOUNT_DETAIL",
            "DWS_BUILD_ORDER_SUMMARY",
            "DWS_GOODS_STOCK_DAILY", "DWS_GOODS_STOCK_WEEKLY", "DWS_GOODS_STOCK_MONTHLY",
        ),
    ),
):
    default_registry.register(
        _dws_code,
        _dws_cls,
        layer="DWS",
        depends_on=list(_dws_deps),
    )

# ── INDEX layer: index algorithm tasks ───────────────────────
# CHANGE [2026-07-17] intent: declare depends_on for the index tasks
# (requirements 8.1, 8.2)
for _index_code, _index_cls, _index_deps in (
    ("DWS_WINBACK_INDEX", WinbackIndexTask, ("DWS_MEMBER_VISIT", "DWS_MEMBER_CONSUMPTION")),
    ("DWS_NEWCONV_INDEX", NewconvIndexTask, ("DWS_MEMBER_VISIT", "DWS_MEMBER_CONSUMPTION")),
    ("DWS_ML_MANUAL_IMPORT", MlManualImportTask, ()),
    ("DWS_RELATION_INDEX", RelationIndexTask, ("DWS_ASSISTANT_DAILY",)),
    ("DWS_SPENDING_POWER_INDEX", SpendingPowerIndexTask, ("DWS_MEMBER_CONSUMPTION",)),
):
    default_registry.register(
        _index_code,
        _index_cls,
        requires_db_config=False,
        layer="INDEX",
        depends_on=list(_index_deps),
    )