# -*- coding: utf-8 -*- # AI_CHANGELOG # - 2026-02-14 | 删除废弃代码残留:移除重复的 ODS_TASK_CLASSES 注册循环(底部),保留唯一一处(顶部) # 直接原因: 原文件有两处 `for code, task_cls in ODS_TASK_CLASSES.items()` 循环,导致重复注册 # 验证: `python -c "from orchestration.task_registry import default_registry; print(len(default_registry.get_all_task_codes()))"` → 52 # CHANGE [2026-02-14] intent: 移除 14 个废弃独立 ODS 任务 + 3 个废弃独立 DWD 任务的导入与注册 # assumptions: 这些任务写入不存在的 billiards.* schema,已被通用 ODS 任务(ods.*)替代 # prompt: "删除废弃的独立 ODS/DWD 任务及其 loader" # 验证: pytest tests/unit -x 确认无 import 错误 """任务注册表""" from dataclasses import dataclass, field # ODS 层任务(仅保留通用 ODS 任务工厂 + JSON 归档) from tasks.ods.ods_tasks import ODS_TASK_CLASSES from tasks.ods.ods_json_archive_task import OdsJsonArchiveTask # DWD 层任务(仅保留核心装载 + 质量检查) from tasks.dwd.dwd_load_task import DwdLoadTask from tasks.dwd.dwd_quality_task import DwdQualityTask # 工具类任务 from tasks.utility.manual_ingest_task import ManualIngestTask from tasks.utility.init_schema_task import InitOdsSchemaTask from tasks.utility.init_dwd_schema_task import InitDwdSchemaTask from tasks.utility.init_dws_schema_task import InitDwsSchemaTask from tasks.utility.check_cutoff_task import CheckCutoffTask from tasks.utility.dws_build_order_summary_task import DwsBuildOrderSummaryTask from tasks.utility.data_integrity_task import DataIntegrityTask from tasks.utility.seed_dws_config_task import SeedDwsConfigTask # DWS 层任务导入 from tasks.dws import ( AssistantDailyTask, AssistantOrderContributionTask, AssistantMonthlyTask, AssistantCustomerTask, AssistantSalaryTask, AssistantFinanceTask, MemberConsumptionTask, MemberVisitTask, AssistantProjectTagTask, MemberProjectTagTask, FinanceDailyTask, FinanceRechargeTask, FinanceIncomeStructureTask, FinanceDiscountDetailTask, # 库存汇总任务 GoodsStockDailyTask, GoodsStockWeeklyTask, GoodsStockMonthlyTask, # 指数算法任务 WinbackIndexTask, NewconvIndexTask, MlManualImportTask, RelationIndexTask, SpendingPowerIndexTask, ) # CHANGE [2026-07-14] intent: 合并 MV 刷新 + 数据清理为 DWS_MAINTENANCE 
from tasks.dws.maintenance_task import DwsMaintenanceTask


@dataclass
class TaskMeta:
    """Metadata the registry keeps for one registered task."""

    # Class that TaskRegistry.create_task instantiates.
    task_class: type
    # Registration flag; TaskRegistry.is_utility_task returns True when it is False.
    requires_db_config: bool = True
    # "ODS" / "DWD" / "DWS" / "INDEX" / None
    layer: str | None = None
    # "etl" / "utility" / "verification"
    task_type: str = "etl"
    # Codes of the tasks this task depends on.
    depends_on: list[str] = field(default_factory=list)


class TaskRegistry:
    """Task registry that doubles as a task factory."""

    def __init__(self):
        # Upper-cased task code -> metadata of the registered task.
        self._tasks: dict[str, TaskMeta] = {}

    def register(
        self,
        task_code: str,
        task_class: type,
        requires_db_config: bool = True,
        layer: str | None = None,
        task_type: str = "etl",
        depends_on: list[str] | None = None,
    ):
        """Register a task class together with its metadata.

        Backward compatible: when only ``task_code`` and ``task_class`` are
        given, every other metadata field takes its default value.

        The code is stored upper-cased, and registering a code that already
        exists replaces the previous entry.
        """
        meta = TaskMeta(
            task_class=task_class,
            requires_db_config=requires_db_config,
            layer=layer,
            task_type=task_type,
            # None (or an empty list) is stored as a fresh empty list.
            depends_on=depends_on or [],
        )
        self._tasks[task_code.upper()] = meta

    def create_task(self, task_code: str, config, db_connection, api_client, logger):
        """Instantiate the task registered under ``task_code``.

        The lookup is case-insensitive. The task class is called with
        ``(config, db_connection, api_client, logger)``.

        Raises:
            ValueError: if no task is registered under ``task_code``.
        """
        key = task_code.upper()
        if key not in self._tasks:
            raise ValueError(f"未知的任务类型: {key}")
        task_cls = self._tasks[key].task_class
        return task_cls(config, db_connection, api_client, logger)

    def get_metadata(self, task_code: str) -> TaskMeta | None:
        """Return the metadata for ``task_code``, or None when it is not registered."""
        key = task_code.upper()
        return self._tasks.get(key)

    def get_tasks_by_layer(self, layer: str) -> list[str]:
        """Return the codes of all tasks whose layer equals ``layer`` (case-insensitive).

        Tasks registered without a layer never match.
        """
        matching: list[str] = []
        for code, meta in self._tasks.items():
            if meta.layer and meta.layer.upper() == layer.upper():
                matching.append(code)
        return matching

    def is_utility_task(self, task_code: str) -> bool:
        """Tell whether the task was registered with ``requires_db_config=False``.

        The original note describes such tasks as utility tasks that need no
        cursor / run record. Unknown task codes yield False.
        """
        meta = self.get_metadata(task_code)
        if meta is None:
            return False
        return not meta.requires_db_config

    def get_all_task_codes(self) -> list[str]:
        """Return every registered task code, in registration order."""
        return list(self._tasks)


# Default registry
default_registry = TaskRegistry()

# ── ODS layer: generic ODS tasks (generated dynamically from ODS_TASK_CLASSES) ──
for code, task_cls in ODS_TASK_CLASSES.items():
    default_registry.register(code, task_cls, layer="ODS")

# ── DWD layer tasks ───────────────────────────────────────────
default_registry.register(
    "DWD_LOAD_FROM_ODS",
    DwdLoadTask,
    layer="DWD",
)
default_registry.register(
    "DWD_QUALITY_CHECK",
    DwdQualityTask,
    requires_db_config=False,
    layer="DWD",
    task_type="verification",
)

# ── Utility tasks ─────────────────────────────────────────────
default_registry.register(
    "MANUAL_INGEST",
    ManualIngestTask,
    requires_db_config=False,
    task_type="utility",
)
default_registry.register(
    "INIT_ODS_SCHEMA",
    InitOdsSchemaTask,
    requires_db_config=False,
    task_type="utility",
)
default_registry.register(
    "INIT_DWD_SCHEMA",
    InitDwdSchemaTask,
    requires_db_config=False,
    task_type="utility",
)
default_registry.register(
    "INIT_DWS_SCHEMA",
    InitDwsSchemaTask,
    requires_db_config=False,
    task_type="utility",
)
default_registry.register(
    "ODS_JSON_ARCHIVE",
    OdsJsonArchiveTask,
    requires_db_config=False,
    task_type="utility",
)
default_registry.register(
    "CHECK_CUTOFF",
    CheckCutoffTask,
    requires_db_config=False,
    task_type="utility",
)
# Unlike the other utility tasks, this one keeps the default requires_db_config=True.
default_registry.register(
    "SEED_DWS_CONFIG",
    SeedDwsConfigTask,
    task_type="utility",
)

# ── Verification tasks ────────────────────────────────────────
default_registry.register(
    "DATA_INTEGRITY_CHECK",
    DataIntegrityTask,
    requires_db_config=False,
    task_type="verification",
)

# ── DWS layer business tasks ──────────────────────────────────
default_registry.register(
    "DWS_BUILD_ORDER_SUMMARY",
    DwsBuildOrderSummaryTask,
    requires_db_config=False,
    layer="DWS",
)
default_registry.register(
    "DWS_ASSISTANT_DAILY",
    AssistantDailyTask,
    layer="DWS",
)
default_registry.register(
    "DWS_ASSISTANT_ORDER_CONTRIBUTION",
    AssistantOrderContributionTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
# CHANGE [2026-07-17] intent: declare depends_on for known dependencies (requirements 8.1, 8.2)
default_registry.register(
    "DWS_ASSISTANT_MONTHLY",
    AssistantMonthlyTask,
    layer="DWS",
    depends_on=["DWS_ASSISTANT_DAILY"],
)
default_registry.register(
    "DWS_ASSISTANT_CUSTOMER",
    AssistantCustomerTask,
    layer="DWS",
)
default_registry.register(
    "DWS_ASSISTANT_SALARY",
    AssistantSalaryTask,
    layer="DWS",
)
default_registry.register(
    "DWS_ASSISTANT_FINANCE",
    AssistantFinanceTask,
    layer="DWS",
    depends_on=["DWS_ASSISTANT_SALARY"],
)
default_registry.register(
    "DWS_MEMBER_CONSUMPTION",
    MemberConsumptionTask,
    layer="DWS",
)
default_registry.register(
    "DWS_MEMBER_VISIT",
    MemberVisitTask,
    layer="DWS",
)
# CHANGE [2026-03-07] intent: register the project-tag tasks; they depend on the DWD load having finished
default_registry.register(
    "DWS_ASSISTANT_PROJECT_TAG",
    AssistantProjectTagTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
default_registry.register(
    "DWS_MEMBER_PROJECT_TAG",
    MemberProjectTagTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
default_registry.register(
    "DWS_FINANCE_DAILY",
    FinanceDailyTask,
    layer="DWS",
)
default_registry.register(
    "DWS_FINANCE_RECHARGE",
    FinanceRechargeTask,
    layer="DWS",
)
default_registry.register(
    "DWS_FINANCE_INCOME_STRUCTURE",
    FinanceIncomeStructureTask,
    layer="DWS",
)
default_registry.register(
    "DWS_FINANCE_DISCOUNT_DETAIL",
    FinanceDiscountDetailTask,
    layer="DWS",
)
# CHANGE [2026-07-20] intent: register the DWS goods-stock summary tasks (daily/weekly/monthly);
#   they depend on the DWD load having finished (requirement 12.9)
default_registry.register(
    "DWS_GOODS_STOCK_DAILY",
    GoodsStockDailyTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
default_registry.register(
    "DWS_GOODS_STOCK_WEEKLY",
    GoodsStockWeeklyTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
default_registry.register(
    "DWS_GOODS_STOCK_MONTHLY",
    GoodsStockMonthlyTask,
    layer="DWS",
    depends_on=["DWD_LOAD_FROM_ODS"],
)
# CHANGE [2026-07-14] intent: remove DWS_RETENTION_CLEANUP / DWS_MV_REFRESH_FINANCE_DAILY /
#   DWS_MV_REFRESH_ASSISTANT_DAILY and replace them with the single maintenance task
#   DWS_MAINTENANCE (requirement 4.5)
# depends_on: every other DWS task -- the MV refresh and the cleanup should run after the data is written
default_registry.register(
    "DWS_MAINTENANCE",
    DwsMaintenanceTask,
    layer="DWS",
    depends_on=[
        "DWS_ASSISTANT_DAILY",
        "DWS_ASSISTANT_ORDER_CONTRIBUTION",
        "DWS_ASSISTANT_MONTHLY",
        "DWS_ASSISTANT_CUSTOMER",
        "DWS_ASSISTANT_SALARY",
        "DWS_ASSISTANT_FINANCE",
        "DWS_MEMBER_CONSUMPTION",
        "DWS_MEMBER_VISIT",
        "DWS_ASSISTANT_PROJECT_TAG",
        "DWS_MEMBER_PROJECT_TAG",
        "DWS_FINANCE_DAILY",
        "DWS_FINANCE_RECHARGE",
        "DWS_FINANCE_INCOME_STRUCTURE",
        "DWS_FINANCE_DISCOUNT_DETAIL",
        "DWS_BUILD_ORDER_SUMMARY",
        "DWS_GOODS_STOCK_DAILY",
        "DWS_GOODS_STOCK_WEEKLY",
        "DWS_GOODS_STOCK_MONTHLY",
    ],
)

# ── INDEX layer: index algorithm tasks ────────────────────────
# CHANGE [2026-07-17] intent: declare depends_on for the index tasks (requirements 8.1, 8.2)
default_registry.register(
    "DWS_WINBACK_INDEX",
    WinbackIndexTask,
    requires_db_config=False,
    layer="INDEX",
    depends_on=["DWS_MEMBER_VISIT", "DWS_MEMBER_CONSUMPTION"],
)
default_registry.register(
    "DWS_NEWCONV_INDEX",
    NewconvIndexTask,
    requires_db_config=False,
    layer="INDEX",
    depends_on=["DWS_MEMBER_VISIT", "DWS_MEMBER_CONSUMPTION"],
)
default_registry.register(
    "DWS_ML_MANUAL_IMPORT",
    MlManualImportTask,
    requires_db_config=False,
    layer="INDEX",
)
default_registry.register(
    "DWS_RELATION_INDEX",
    RelationIndexTask,
    requires_db_config=False,
    layer="INDEX",
    depends_on=["DWS_ASSISTANT_DAILY"],
)
default_registry.register(
    "DWS_SPENDING_POWER_INDEX",
    SpendingPowerIndexTask,
    requires_db_config=False,
    layer="INDEX",
    depends_on=["DWS_MEMBER_CONSUMPTION"],
)