Files
feiqiu-ETL/etl_billiards/orchestration/task_registry.py
2025-11-20 01:27:33 +08:00

69 lines
2.8 KiB
Python

# -*- coding: utf-8 -*-
"""任务注册表"""
from tasks.orders_task import OrdersTask
from tasks.payments_task import PaymentsTask
from tasks.members_task import MembersTask
from tasks.products_task import ProductsTask
from tasks.tables_task import TablesTask
from tasks.assistants_task import AssistantsTask
from tasks.packages_task import PackagesDefTask
from tasks.refunds_task import RefundsTask
from tasks.coupon_usage_task import CouponUsageTask
from tasks.inventory_change_task import InventoryChangeTask
from tasks.topups_task import TopupsTask
from tasks.table_discount_task import TableDiscountTask
from tasks.assistant_abolish_task import AssistantAbolishTask
from tasks.ledger_task import LedgerTask
from tasks.ods_tasks import ODS_TASK_CLASSES
from tasks.ticket_dwd_task import TicketDwdTask
from tasks.manual_ingest_task import ManualIngestTask
from tasks.payments_dwd_task import PaymentsDwdTask
from tasks.members_dwd_task import MembersDwdTask
class TaskRegistry:
    """Registry and factory for task classes, keyed by upper-cased task code."""

    def __init__(self):
        # Maps the upper-cased task code to the class (or callable) that
        # builds the task.
        self._tasks = {}

    def register(self, task_code: str, task_class) -> None:
        """Register ``task_class`` under ``task_code``.

        The code is stored upper-cased, so registration and lookup are
        case-insensitive. Registering a code that already exists replaces
        the earlier entry.

        Args:
            task_code: Identifier of the task; upper-cased before storing.
            task_class: Callable invoked by :meth:`create_task` as
                ``task_class(config, db_connection, api_client, logger)``.
        """
        self._tasks[task_code.upper()] = task_class

    def create_task(self, task_code: str, config, db_connection, api_client, logger):
        """Build the task registered under ``task_code``.

        Args:
            task_code: Identifier of the task; matched case-insensitively.
            config: Passed through to the task constructor unchanged.
            db_connection: Passed through to the task constructor unchanged.
            api_client: Passed through to the task constructor unchanged.
            logger: Passed through to the task constructor unchanged.

        Returns:
            The result of calling the registered class with the four
            pass-through arguments.

        Raises:
            ValueError: If nothing is registered under ``task_code``.
        """
        task_code = task_code.upper()
        # Keep the try body to the lookup alone so that a KeyError raised
        # inside the task constructor is not mistaken for an unknown code.
        try:
            task_class = self._tasks[task_code]
        except KeyError:
            raise ValueError(f"未知的任务类型: {task_code}") from None
        return task_class(config, db_connection, api_client, logger)

    def get_all_task_codes(self) -> list:
        """Return the registered task codes, in registration order."""
        return list(self._tasks)
# Default registry, populated at import time with the built-in tasks.
default_registry = TaskRegistry()

# Built-in tasks as (task code, task class) pairs; they are registered in
# the order listed here.
for _builtin_code, _builtin_cls in (
    ("PRODUCTS", ProductsTask),
    ("TABLES", TablesTask),
    ("MEMBERS", MembersTask),
    ("ASSISTANTS", AssistantsTask),
    ("PACKAGES_DEF", PackagesDefTask),
    ("ORDERS", OrdersTask),
    ("PAYMENTS", PaymentsTask),
    ("REFUNDS", RefundsTask),
    ("COUPON_USAGE", CouponUsageTask),
    ("INVENTORY_CHANGE", InventoryChangeTask),
    ("TOPUPS", TopupsTask),
    ("TABLE_DISCOUNT", TableDiscountTask),
    ("ASSISTANT_ABOLISH", AssistantAbolishTask),
    ("LEDGER", LedgerTask),
    ("TICKET_DWD", TicketDwdTask),
    ("MANUAL_INGEST", ManualIngestTask),
    ("PAYMENTS_DWD", PaymentsDwdTask),
    ("MEMBERS_DWD", MembersDwdTask),
):
    default_registry.register(_builtin_code, _builtin_cls)

# ODS tasks come as a code -> class mapping from tasks.ods_tasks and are
# registered after the built-in ones, each under its mapping key.
for code, task_cls in ODS_TASK_CLASSES.items():
    default_registry.register(code, task_cls)