代码迁移
This commit is contained in:
0
etl_billiards/orchestration/__init__.py
Normal file
0
etl_billiards/orchestration/__init__.py
Normal file
62
etl_billiards/orchestration/cursor_manager.py
Normal file
62
etl_billiards/orchestration/cursor_manager.py
Normal file
@@ -0,0 +1,62 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""游标管理器"""
|
||||
from datetime import datetime
|
||||
|
||||
class CursorManager:
|
||||
"""ETL游标管理"""
|
||||
|
||||
def __init__(self, db_connection):
|
||||
self.db = db_connection
|
||||
|
||||
def get_or_create(self, task_id: int, store_id: int) -> dict:
|
||||
"""获取或创建游标"""
|
||||
rows = self.db.query(
|
||||
"SELECT * FROM etl_admin.etl_cursor WHERE task_id=%s AND store_id=%s",
|
||||
(task_id, store_id)
|
||||
)
|
||||
|
||||
if rows:
|
||||
return rows[0]
|
||||
|
||||
# 创建新游标
|
||||
self.db.execute(
|
||||
"""
|
||||
INSERT INTO etl_admin.etl_cursor(task_id, store_id, last_start, last_end, last_id, extra)
|
||||
VALUES(%s, %s, NULL, NULL, NULL, '{}'::jsonb)
|
||||
""",
|
||||
(task_id, store_id)
|
||||
)
|
||||
self.db.commit()
|
||||
|
||||
rows = self.db.query(
|
||||
"SELECT * FROM etl_admin.etl_cursor WHERE task_id=%s AND store_id=%s",
|
||||
(task_id, store_id)
|
||||
)
|
||||
return rows[0] if rows else None
|
||||
|
||||
def advance(self, task_id: int, store_id: int, window_start: datetime,
|
||||
window_end: datetime, run_id: int, last_id: int = None):
|
||||
"""推进游标"""
|
||||
if last_id is not None:
|
||||
sql = """
|
||||
UPDATE etl_admin.etl_cursor
|
||||
SET last_start = %s,
|
||||
last_end = %s,
|
||||
last_id = GREATEST(COALESCE(last_id, 0), %s),
|
||||
last_run_id = %s,
|
||||
updated_at = now()
|
||||
WHERE task_id = %s AND store_id = %s
|
||||
"""
|
||||
self.db.execute(sql, (window_start, window_end, last_id, run_id, task_id, store_id))
|
||||
else:
|
||||
sql = """
|
||||
UPDATE etl_admin.etl_cursor
|
||||
SET last_start = %s,
|
||||
last_end = %s,
|
||||
last_run_id = %s,
|
||||
updated_at = now()
|
||||
WHERE task_id = %s AND store_id = %s
|
||||
"""
|
||||
self.db.execute(sql, (window_start, window_end, run_id, task_id, store_id))
|
||||
|
||||
self.db.commit()
|
||||
70
etl_billiards/orchestration/run_tracker.py
Normal file
70
etl_billiards/orchestration/run_tracker.py
Normal file
@@ -0,0 +1,70 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""运行记录追踪器"""
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
class RunTracker:
|
||||
"""ETL运行记录管理"""
|
||||
|
||||
def __init__(self, db_connection):
|
||||
self.db = db_connection
|
||||
|
||||
def create_run(self, task_id: int, store_id: int, run_uuid: str,
|
||||
export_dir: str, log_path: str, status: str,
|
||||
window_start: datetime = None, window_end: datetime = None,
|
||||
window_minutes: int = None, overlap_seconds: int = None,
|
||||
request_params: dict = None) -> int:
|
||||
"""创建运行记录"""
|
||||
sql = """
|
||||
INSERT INTO etl_admin.etl_run(
|
||||
run_uuid, task_id, store_id, status, started_at, window_start, window_end,
|
||||
window_minutes, overlap_seconds, fetched_count, loaded_count, updated_count,
|
||||
skipped_count, error_count, unknown_fields, export_dir, log_path,
|
||||
request_params, manifest, error_message, extra
|
||||
) VALUES (
|
||||
%s, %s, %s, %s, now(), %s, %s, %s, %s, 0, 0, 0, 0, 0, 0, %s, %s, %s,
|
||||
'{}'::jsonb, NULL, '{}'::jsonb
|
||||
)
|
||||
RETURNING run_id
|
||||
"""
|
||||
|
||||
result = self.db.query(
|
||||
sql,
|
||||
(run_uuid, task_id, store_id, status, window_start, window_end,
|
||||
window_minutes, overlap_seconds, export_dir, log_path,
|
||||
json.dumps(request_params or {}, ensure_ascii=False))
|
||||
)
|
||||
|
||||
run_id = result[0]["run_id"]
|
||||
self.db.commit()
|
||||
return run_id
|
||||
|
||||
def update_run(self, run_id: int, counts: dict, status: str,
|
||||
ended_at: datetime = None, manifest: dict = None,
|
||||
error_message: str = None):
|
||||
"""更新运行记录"""
|
||||
sql = """
|
||||
UPDATE etl_admin.etl_run
|
||||
SET fetched_count = %s,
|
||||
loaded_count = %s,
|
||||
updated_count = %s,
|
||||
skipped_count = %s,
|
||||
error_count = %s,
|
||||
unknown_fields = %s,
|
||||
status = %s,
|
||||
ended_at = %s,
|
||||
manifest = %s,
|
||||
error_message = %s
|
||||
WHERE run_id = %s
|
||||
"""
|
||||
|
||||
self.db.execute(
|
||||
sql,
|
||||
(counts.get("fetched", 0), counts.get("inserted", 0),
|
||||
counts.get("updated", 0), counts.get("skipped", 0),
|
||||
counts.get("errors", 0), counts.get("unknown_fields", 0),
|
||||
status, ended_at,
|
||||
json.dumps(manifest or {}, ensure_ascii=False),
|
||||
error_message, run_id)
|
||||
)
|
||||
self.db.commit()
|
||||
131
etl_billiards/orchestration/scheduler.py
Normal file
131
etl_billiards/orchestration/scheduler.py
Normal file
@@ -0,0 +1,131 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""ETL调度器"""
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from database.connection import DatabaseConnection
|
||||
from database.operations import DatabaseOperations
|
||||
from api.client import APIClient
|
||||
from orchestration.cursor_manager import CursorManager
|
||||
from orchestration.run_tracker import RunTracker
|
||||
from orchestration.task_registry import default_registry
|
||||
|
||||
class ETLScheduler:
|
||||
"""ETL任务调度器"""
|
||||
|
||||
def __init__(self, config, logger):
|
||||
self.config = config
|
||||
self.logger = logger
|
||||
self.tz = ZoneInfo(config.get("app.timezone", "Asia/Taipei"))
|
||||
|
||||
# 初始化组件
|
||||
self.db_conn = DatabaseConnection(
|
||||
dsn=config["db"]["dsn"],
|
||||
session=config["db"].get("session"),
|
||||
connect_timeout=config["db"].get("connect_timeout_sec")
|
||||
)
|
||||
self.db_ops = DatabaseOperations(self.db_conn)
|
||||
|
||||
self.api_client = APIClient(
|
||||
base_url=config["api"]["base_url"],
|
||||
token=config["api"]["token"],
|
||||
timeout=config["api"]["timeout_sec"],
|
||||
retry_max=config["api"]["retries"]["max_attempts"],
|
||||
headers_extra=config["api"].get("headers_extra")
|
||||
)
|
||||
|
||||
self.cursor_mgr = CursorManager(self.db_conn)
|
||||
self.run_tracker = RunTracker(self.db_conn)
|
||||
self.task_registry = default_registry
|
||||
|
||||
def run_tasks(self, task_codes: list = None):
|
||||
"""运行任务列表"""
|
||||
run_uuid = uuid.uuid4().hex
|
||||
store_id = self.config.get("app.store_id")
|
||||
|
||||
if not task_codes:
|
||||
task_codes = self.config.get("run.tasks", [])
|
||||
|
||||
self.logger.info(f"开始运行任务: {task_codes}, run_uuid={run_uuid}")
|
||||
|
||||
for task_code in task_codes:
|
||||
try:
|
||||
self._run_single_task(task_code, run_uuid, store_id)
|
||||
except Exception as e:
|
||||
self.logger.error(f"任务 {task_code} 失败: {e}", exc_info=True)
|
||||
continue
|
||||
|
||||
self.logger.info("所有任务执行完成")
|
||||
|
||||
def _run_single_task(self, task_code: str, run_uuid: str, store_id: int):
|
||||
"""运行单个任务"""
|
||||
# 创建任务实例
|
||||
task = self.task_registry.create_task(
|
||||
task_code, self.config, self.db_ops, self.api_client, self.logger
|
||||
)
|
||||
|
||||
# 获取任务配置(从数据库)
|
||||
task_cfg = self._load_task_config(task_code, store_id)
|
||||
if not task_cfg:
|
||||
self.logger.warning(f"任务 {task_code} 未启用或不存在")
|
||||
return
|
||||
|
||||
task_id = task_cfg["task_id"]
|
||||
|
||||
# 创建运行记录
|
||||
export_dir = Path(self.config["io"]["export_root"]) / datetime.now(self.tz).strftime("%Y%m%d")
|
||||
log_path = str(Path(self.config["io"]["log_root"]) / f"{run_uuid}.log")
|
||||
|
||||
run_id = self.run_tracker.create_run(
|
||||
task_id=task_id,
|
||||
store_id=store_id,
|
||||
run_uuid=run_uuid,
|
||||
export_dir=str(export_dir),
|
||||
log_path=log_path,
|
||||
status="RUNNING"
|
||||
)
|
||||
|
||||
# 执行任务
|
||||
try:
|
||||
result = task.execute()
|
||||
|
||||
# 更新运行记录
|
||||
self.run_tracker.update_run(
|
||||
run_id=run_id,
|
||||
counts=result["counts"],
|
||||
status=result["status"],
|
||||
ended_at=datetime.now(self.tz)
|
||||
)
|
||||
|
||||
# 推进游标
|
||||
if result["status"] == "SUCCESS":
|
||||
# TODO: 从任务结果中获取窗口信息
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
self.run_tracker.update_run(
|
||||
run_id=run_id,
|
||||
counts={},
|
||||
status="FAIL",
|
||||
ended_at=datetime.now(self.tz),
|
||||
error_message=str(e)
|
||||
)
|
||||
raise
|
||||
|
||||
def _load_task_config(self, task_code: str, store_id: int) -> dict:
|
||||
"""从数据库加载任务配置"""
|
||||
sql = """
|
||||
SELECT task_id, task_code, store_id, enabled, cursor_field,
|
||||
window_minutes_default, overlap_seconds, page_size, retry_max, params
|
||||
FROM etl_admin.etl_task
|
||||
WHERE store_id = %s AND task_code = %s AND enabled = TRUE
|
||||
"""
|
||||
|
||||
rows = self.db_conn.query(sql, (store_id, task_code))
|
||||
return rows[0] if rows else None
|
||||
|
||||
def close(self):
|
||||
"""关闭连接"""
|
||||
self.db_conn.close()
|
||||
36
etl_billiards/orchestration/task_registry.py
Normal file
36
etl_billiards/orchestration/task_registry.py
Normal file
@@ -0,0 +1,36 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""任务注册表"""
|
||||
from tasks.orders_task import OrdersTask
|
||||
from tasks.payments_task import PaymentsTask
|
||||
from tasks.members_task import MembersTask
|
||||
|
||||
class TaskRegistry:
|
||||
"""任务注册和工厂"""
|
||||
|
||||
def __init__(self):
|
||||
self._tasks = {}
|
||||
|
||||
def register(self, task_code: str, task_class):
|
||||
"""注册任务类"""
|
||||
self._tasks[task_code.upper()] = task_class
|
||||
|
||||
def create_task(self, task_code: str, config, db_connection, api_client, logger):
|
||||
"""创建任务实例"""
|
||||
task_code = task_code.upper()
|
||||
if task_code not in self._tasks:
|
||||
raise ValueError(f"未知的任务类型: {task_code}")
|
||||
|
||||
task_class = self._tasks[task_code]
|
||||
return task_class(config, db_connection, api_client, logger)
|
||||
|
||||
def get_all_task_codes(self) -> list:
|
||||
"""获取所有已注册的任务代码"""
|
||||
return list(self._tasks.keys())
|
||||
|
||||
|
||||
# 默认注册表
|
||||
default_registry = TaskRegistry()
|
||||
default_registry.register("ORDERS", OrdersTask)
|
||||
default_registry.register("PAYMENTS", PaymentsTask)
|
||||
default_registry.register("MEMBERS", MembersTask)
|
||||
# 可以继续注册其他任务...
|
||||
Reference in New Issue
Block a user