Files
feiqiu-ETL/etl_billiards/orchestration/scheduler.py
2025-11-18 02:32:00 +08:00

132 lines
4.5 KiB
Python

# -*- coding: utf-8 -*-
"""ETL调度器"""
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from zoneinfo import ZoneInfo

from database.connection import DatabaseConnection
from database.operations import DatabaseOperations
from api.client import APIClient
from orchestration.cursor_manager import CursorManager
from orchestration.run_tracker import RunTracker
from orchestration.task_registry import default_registry
class ETLScheduler:
    """Run ETL tasks sequentially and record each run.

    The scheduler builds the database connection, database operations helper,
    API client, cursor manager and run tracker from ``config``, and creates
    task instances through the default task registry.

    Args:
        config: Configuration object. It is read both with dotted keys via
            ``get`` (e.g. ``config.get("app.timezone")``) and with nested item
            access (e.g. ``config["db"]["dsn"]``).
        logger: Logger object; ``info``, ``warning`` and ``error`` are called
            on it.
    """

    def __init__(self, config, logger):
        self.config = config
        self.logger = logger
        # Timezone used for run end timestamps and the dated export directory.
        self.tz = ZoneInfo(config.get("app.timezone", "Asia/Taipei"))

        # Initialise the components.
        self.db_conn = DatabaseConnection(
            dsn=config["db"]["dsn"],
            session=config["db"].get("session"),
            connect_timeout=config["db"].get("connect_timeout_sec"),
        )
        self.db_ops = DatabaseOperations(self.db_conn)
        self.api_client = APIClient(
            base_url=config["api"]["base_url"],
            token=config["api"]["token"],
            timeout=config["api"]["timeout_sec"],
            retry_max=config["api"]["retries"]["max_attempts"],
            headers_extra=config["api"].get("headers_extra"),
        )
        self.cursor_mgr = CursorManager(self.db_conn)
        self.run_tracker = RunTracker(self.db_conn)
        self.task_registry = default_registry

    def run_tasks(self, task_codes: Optional[list] = None):
        """Run the given tasks in order under one shared run UUID.

        A failing task is logged and does not stop the remaining tasks.

        Args:
            task_codes: Task codes to run. When empty or ``None``, the list
                is taken from the ``run.tasks`` configuration entry.
        """
        run_uuid = uuid.uuid4().hex
        store_id = self.config.get("app.store_id")
        if not task_codes:
            task_codes = self.config.get("run.tasks", [])

        self.logger.info(f"开始运行任务: {task_codes}, run_uuid={run_uuid}")
        for task_code in task_codes:
            try:
                self._run_single_task(task_code, run_uuid, store_id)
            except Exception as e:
                # Boundary handler: log and move on to the next task.
                self.logger.error(f"任务 {task_code} 失败: {e}", exc_info=True)
        self.logger.info("所有任务执行完成")

    def _run_single_task(self, task_code: str, run_uuid: str, store_id: int):
        """Run one task and record its outcome through the run tracker.

        Returns without running anything (after logging a warning) when no
        enabled configuration row exists for the task.

        Args:
            task_code: Code of the task to create through the registry.
            run_uuid: Identifier shared by all tasks of the current batch.
            store_id: Store whose task configuration is looked up.

        Raises:
            Exception: Whatever the task execution or the success-path record
                update raised, re-raised after the run has been marked
                ``FAIL`` (best effort).
        """
        # Create the task instance.
        task = self.task_registry.create_task(
            task_code, self.config, self.db_ops, self.api_client, self.logger
        )

        # Load the task configuration from the database.
        task_cfg = self._load_task_config(task_code, store_id)
        if not task_cfg:
            self.logger.warning(f"任务 {task_code} 未启用或不存在")
            return
        task_id = task_cfg["task_id"]

        # Create the run record.
        export_dir = Path(self.config["io"]["export_root"]) / datetime.now(self.tz).strftime("%Y%m%d")
        log_path = str(Path(self.config["io"]["log_root"]) / f"{run_uuid}.log")
        run_id = self.run_tracker.create_run(
            task_id=task_id,
            store_id=store_id,
            run_uuid=run_uuid,
            export_dir=str(export_dir),
            log_path=log_path,
            status="RUNNING",
        )

        # Execute the task.
        try:
            result = task.execute()
            # Update the run record with the task's own counts and status.
            self.run_tracker.update_run(
                run_id=run_id,
                counts=result["counts"],
                status=result["status"],
                ended_at=datetime.now(self.tz),
            )
            # Advance the cursor.
            if result["status"] == "SUCCESS":
                # TODO: take the window information from the task result.
                pass
        except Exception as e:
            # Marking the run as failed is best effort: if this write fails
            # too, log it and still surface the original task error instead
            # of letting the bookkeeping error replace it.
            try:
                self.run_tracker.update_run(
                    run_id=run_id,
                    counts={},
                    status="FAIL",
                    ended_at=datetime.now(self.tz),
                    error_message=str(e),
                )
            except Exception as record_err:
                self.logger.error(
                    f"任务 {task_code} 写入失败状态时出错: {record_err}",
                    exc_info=True,
                )
            # Bare raise re-raises the original exception ``e``.
            raise

    def _load_task_config(self, task_code: str, store_id: int) -> Optional[dict]:
        """Load the enabled configuration row of a task from the database.

        Args:
            task_code: Task code to look up.
            store_id: Store the task belongs to.

        Returns:
            The first row returned by the query, or ``None`` when no enabled
            row matches. Rows are presumably mappings keyed by column name
            (the caller reads ``["task_id"]``) -- confirm against
            ``DatabaseConnection.query``.
        """
        sql = """
            SELECT task_id, task_code, store_id, enabled, cursor_field,
                   window_minutes_default, overlap_seconds, page_size, retry_max, params
            FROM etl_admin.etl_task
            WHERE store_id = %s AND task_code = %s AND enabled = TRUE
        """
        rows = self.db_conn.query(sql, (store_id, task_code))
        return rows[0] if rows else None

    def close(self):
        """Close the database connection."""
        self.db_conn.close()