# -*- coding: utf-8 -*- """游标管理器""" from datetime import datetime class CursorManager: """ETL游标管理""" def __init__(self, db_connection): self.db = db_connection def get_or_create(self, task_id: int, store_id: int) -> dict: """获取或创建游标""" rows = self.db.query( "SELECT * FROM etl_admin.etl_cursor WHERE task_id=%s AND store_id=%s", (task_id, store_id) ) if rows: return rows[0] # 创建新游标 self.db.execute( """ INSERT INTO etl_admin.etl_cursor(task_id, store_id, last_start, last_end, last_id, extra) VALUES(%s, %s, NULL, NULL, NULL, '{}'::jsonb) """, (task_id, store_id) ) self.db.commit() rows = self.db.query( "SELECT * FROM etl_admin.etl_cursor WHERE task_id=%s AND store_id=%s", (task_id, store_id) ) return rows[0] if rows else None def advance(self, task_id: int, store_id: int, window_start: datetime, window_end: datetime, run_id: int, last_id: int = None): """推进游标""" if last_id is not None: sql = """ UPDATE etl_admin.etl_cursor SET last_start = %s, last_end = %s, last_id = GREATEST(COALESCE(last_id, 0), %s), last_run_id = %s, updated_at = now() WHERE task_id = %s AND store_id = %s """ self.db.execute(sql, (window_start, window_end, last_id, run_id, task_id, store_id)) else: sql = """ UPDATE etl_admin.etl_cursor SET last_start = %s, last_end = %s, last_run_id = %s, updated_at = now() WHERE task_id = %s AND store_id = %s """ self.db.execute(sql, (window_start, window_end, run_id, task_id, store_id)) self.db.commit()