# -*- coding: utf-8 -*- """Task: report last successful cursor cutoff times from etl_admin.""" from __future__ import annotations from typing import Any from .base_task import BaseTask class CheckCutoffTask(BaseTask): """Report per-task cursor cutoff times (etl_admin.etl_cursor.last_end).""" def get_task_code(self) -> str: return "CHECK_CUTOFF" def execute(self, cursor_data: dict | None = None) -> dict: store_id = int(self.config.get("app.store_id")) filter_codes = self.config.get("run.cutoff_task_codes") or None if isinstance(filter_codes, str): filter_codes = [c.strip().upper() for c in filter_codes.split(",") if c.strip()] sql = """ SELECT t.task_code, c.last_start, c.last_end, c.last_id, c.last_run_id, c.updated_at FROM etl_admin.etl_task t LEFT JOIN etl_admin.etl_cursor c ON c.task_id = t.task_id AND c.store_id = t.store_id WHERE t.store_id = %s AND t.enabled = TRUE ORDER BY t.task_code """ rows = self.db.query(sql, (store_id,)) if filter_codes: wanted = {str(c).upper() for c in filter_codes} rows = [r for r in rows if str(r.get("task_code", "")).upper() in wanted] def _ts(v: Any) -> str: return "-" if not v else str(v) self.logger.info("截止时间检查: 门店ID=%s 启用任务数=%s", store_id, len(rows)) for r in rows: self.logger.info( "截止时间检查: %-24s 结束时间=%s 开始时间=%s 运行ID=%s", str(r.get("task_code") or ""), _ts(r.get("last_end")), _ts(r.get("last_start")), _ts(r.get("last_run_id")), ) cutoff_candidates = [ r.get("last_end") for r in rows if r.get("last_end") is not None and not str(r.get("task_code", "")).upper().startswith("INIT_") ] cutoff = min(cutoff_candidates) if cutoff_candidates else None self.logger.info("截止时间检查: 总体截止时间(最小结束时间,排除INIT_*)=%s", _ts(cutoff)) ods_fetched = self._probe_ods_fetched_at(store_id) if ods_fetched: non_null = [v["max_fetched_at"] for v in ods_fetched.values() if v.get("max_fetched_at") is not None] ods_cutoff = min(non_null) if non_null else None self.logger.info("截止时间检查: ODS截止时间(最小抓取时间)=%s", _ts(ods_cutoff)) worst = sorted( ((k, v.get("max_fetched_at")) for k, v in ods_fetched.items()), key=lambda kv: (kv[1] is None, kv[1]), )[:8] for table, mx in worst: self.logger.info("截止时间检查: ODS表=%s 最大抓取时间=%s", table, _ts(mx)) dw_checks = self._probe_dw_time_columns() for name, value in dw_checks.items(): self.logger.info("截止时间检查: %s=%s", name, _ts(value)) return { "status": "SUCCESS", "counts": {"fetched": len(rows), "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}, "window": None, "request_params": {"store_id": store_id, "filter_task_codes": filter_codes or []}, "report": { "rows": rows, "overall_cutoff": cutoff, "ods_fetched_at": ods_fetched, "dw_max_times": dw_checks, }, } def _probe_ods_fetched_at(self, store_id: int) -> dict[str, dict[str, Any]]: try: from tasks.dwd_load_task import DwdLoadTask # local import to avoid circulars except Exception: return {} ods_tables = sorted({str(t) for t in DwdLoadTask.TABLE_MAP.values() if str(t).startswith("billiards_ods.")}) results: dict[str, dict[str, Any]] = {} for table in ods_tables: try: row = self.db.query(f"SELECT MAX(fetched_at) AS mx, COUNT(*) AS cnt FROM {table}")[0] results[table] = {"max_fetched_at": row.get("mx"), "count": row.get("cnt")} except Exception as exc: # noqa: BLE001 results[table] = {"max_fetched_at": None, "count": None, "error": str(exc)} return results def _probe_dw_time_columns(self) -> dict[str, Any]: checks: dict[str, Any] = {} probes = { "DWD.max_settlement_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_settlement_head", "DWD.max_payment_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_payment", "DWD.max_refund_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_refund", "DWS.max_order_date": "SELECT MAX(order_date) AS mx FROM billiards_dws.dws_order_summary", "DWS.max_updated_at": "SELECT MAX(updated_at) AS mx FROM billiards_dws.dws_order_summary", } for name, sql2 in probes.items(): try: row = self.db.query(sql2)[0] checks[name] = row.get("mx") except Exception as exc: # noqa: BLE001 checks[name] = f"ERROR: {exc}" return checks