# -*- coding: utf-8 -*- """ETL 调度器(薄包装层) 已弃用:请直接使用 TaskExecutor 和 PipelineRunner。 保留此类以兼容 GUI 层、run_update.py 等现有调用方。 """ from __future__ import annotations import logging import warnings from typing import Any, Dict, List, Optional from api.client import APIClient from database.connection import DatabaseConnection from database.operations import DatabaseOperations from orchestration.cursor_manager import CursorManager from orchestration.run_tracker import RunTracker from orchestration.task_registry import default_registry from orchestration.task_executor import TaskExecutor from orchestration.pipeline_runner import PipelineRunner # 保留模块级常量以兼容外部引用 PIPELINE_LAYERS = PipelineRunner.PIPELINE_LAYERS class ETLScheduler: """调度器薄包装层(已弃用)。 内部委托 TaskExecutor 和 PipelineRunner 执行。 保留公共接口以兼容现有调用方(run_update.py、GUI 等)。 """ def __init__(self, config, logger): warnings.warn( "ETLScheduler 已弃用,请直接使用 TaskExecutor 和 PipelineRunner", DeprecationWarning, stacklevel=2, ) self.config = config self.logger = logger # 创建资源(与原实现一致) self.db_conn = DatabaseConnection( dsn=config["db"]["dsn"], session=config["db"].get("session"), connect_timeout=config["db"].get("connect_timeout_sec"), ) self.db_ops = DatabaseOperations(self.db_conn) self.api_client = APIClient( base_url=config["api"]["base_url"], token=config["api"]["token"], timeout=config["api"]["timeout_sec"], retry_max=config["api"]["retries"]["max_attempts"], headers_extra=config["api"].get("headers_extra"), ) cursor_mgr = CursorManager(self.db_conn) run_tracker = RunTracker(self.db_conn) self.task_registry = default_registry # 内部组件 self.task_executor = TaskExecutor( config, self.db_ops, self.api_client, cursor_mgr, run_tracker, self.task_registry, logger, ) self.pipeline_runner = PipelineRunner( config, self.task_executor, self.task_registry, self.db_conn, self.api_client, logger, ) def run_tasks(self, task_codes=None) -> list: """执行任务列表(委托 TaskExecutor)。""" if not task_codes: task_codes = self.config.get("run.tasks", []) data_source = str(self.config.get("run.data_source", "hybrid") or "hybrid") return self.task_executor.run_tasks(task_codes, data_source=data_source) def run_pipeline_with_verification(self, **kwargs) -> dict: """执行管道(委托 PipelineRunner)。""" # 从配置读取 data_source(如果调用方未传入) if "data_source" not in kwargs: kwargs["data_source"] = str( self.config.get("run.data_source", "hybrid") or "hybrid" ) return self.pipeline_runner.run(**kwargs) def close(self): """关闭数据库连接。""" self.db_conn.close()