126 lines
5.2 KiB
Python
126 lines
5.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Task: report last successful cursor cutoff times from etl_admin."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from .base_task import BaseTask
|
|
|
|
|
|
class CheckCutoffTask(BaseTask):
    """Report per-task cursor cutoff times (etl_admin.etl_cursor.last_end).

    For the configured store (``app.store_id``) the task:

    * reads the cursor row of every enabled task from ``etl_admin``,
      optionally restricted to the codes in ``run.cutoff_task_codes``;
    * computes the overall cutoff as the minimum ``last_end`` over all tasks
      whose code does not start with ``INIT_``;
    * probes ``MAX(fetched_at)`` of the ODS tables listed in
      ``DwdLoadTask.TABLE_MAP``;
    * probes a fixed set of DWD/DWS max-time columns.

    Everything is logged and returned under ``report``. Only SELECT statements
    are issued, so ``inserted``/``updated`` are always reported as 0.
    """

    # Cursor state for every enabled task of one store. The LEFT JOIN keeps
    # tasks that have no cursor row yet (their cursor columns come back NULL).
    _CURSOR_SQL = """
        SELECT
            t.task_code,
            c.last_start,
            c.last_end,
            c.last_id,
            c.last_run_id,
            c.updated_at
        FROM etl_admin.etl_task t
        LEFT JOIN etl_admin.etl_cursor c
            ON c.task_id = t.task_id AND c.store_id = t.store_id
        WHERE t.store_id = %s
          AND t.enabled = TRUE
        ORDER BY t.task_code
    """

    # Only DwdLoadTask source tables in this schema are probed for fetched_at.
    _ODS_SCHEMA_PREFIX = "billiards_ods."

    # Number of ODS tables (oldest MAX(fetched_at) first) logged individually.
    _ODS_LOG_LIMIT = 8

    # (report key, query) pairs; every query returns a single value as column ``mx``.
    # A tuple keeps the probe order stable and avoids a mutable class attribute.
    _DW_TIME_PROBES: tuple[tuple[str, str], ...] = (
        ("DWD.max_settlement_pay_time", "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_settlement_head"),
        ("DWD.max_payment_pay_time", "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_payment"),
        ("DWD.max_refund_pay_time", "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_refund"),
        ("DWS.max_order_date", "SELECT MAX(order_date) AS mx FROM billiards_dws.dws_order_summary"),
        ("DWS.max_updated_at", "SELECT MAX(updated_at) AS mx FROM billiards_dws.dws_order_summary"),
    )

    def get_task_code(self) -> str:
        """Return the code under which this task is registered."""
        return "CHECK_CUTOFF"

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Collect, log and return cutoff information for ``app.store_id``.

        Args:
            cursor_data: Part of the common task interface; not used here.

        Returns:
            Task result dict. ``counts.fetched`` is the number of cursor rows
            reported. ``report`` holds the cursor ``rows`` and ``overall_cutoff``
            (min ``last_end``, excluding ``INIT_*`` tasks). It also holds
            ``ods_cutoff`` (min ODS ``MAX(fetched_at)``, or ``None``), the raw
            ODS probe results (``ods_fetched_at``) and the DW max-time probes
            (``dw_max_times``).

        Raises:
            TypeError, ValueError: ``app.store_id`` is missing or not an
                integer (raised by ``int()``).
        """
        store_id = int(self.config.get("app.store_id"))
        filter_codes = self._parse_filter_codes(self.config.get("run.cutoff_task_codes"))

        rows = self._load_cursor_rows(store_id, filter_codes)
        self._log_cursor_rows(store_id, rows)

        cutoff = self._overall_cutoff(rows)
        self.logger.info("截止时间检查: 总体截止时间(最小结束时间,排除INIT_*)=%s", self._fmt_value(cutoff))

        ods_fetched = self._probe_ods_fetched_at(store_id)
        ods_cutoff = self._log_ods_probes(ods_fetched)

        dw_checks = self._probe_dw_time_columns()
        for name, value in dw_checks.items():
            self.logger.info("截止时间检查: %s=%s", name, self._fmt_value(value))

        return {
            "status": "SUCCESS",
            "counts": {"fetched": len(rows), "inserted": 0, "updated": 0, "skipped": 0, "errors": 0},
            "window": None,
            "request_params": {"store_id": store_id, "filter_task_codes": filter_codes or []},
            "report": {
                "rows": rows,
                "overall_cutoff": cutoff,
                "ods_cutoff": ods_cutoff,
                "ods_fetched_at": ods_fetched,
                "dw_max_times": dw_checks,
            },
        }

    @staticmethod
    def _parse_filter_codes(raw: Any) -> Any:
        """Normalise the ``run.cutoff_task_codes`` config value.

        A comma-separated string becomes a list of stripped, upper-cased,
        non-empty codes. Any other truthy value (e.g. a list) is returned
        unchanged, and falsy values become ``None``.
        """
        if not raw:
            return None
        if isinstance(raw, str):
            return [code.strip().upper() for code in raw.split(",") if code.strip()]
        return raw

    @staticmethod
    def _fmt_value(value: Any) -> str:
        """Render a value for log output, using ``-`` only for missing values.

        Only ``None`` and ``""`` count as missing, so legitimate falsy values
        (e.g. a run id of ``0``) are still shown.
        """
        if value is None or value == "":
            return "-"
        return str(value)

    @staticmethod
    def _overall_cutoff(rows: list) -> Any:
        """Return the minimum non-null ``last_end`` over rows, ignoring ``INIT_*`` tasks.

        Returns ``None`` when no row qualifies.
        """
        candidates = [
            r.get("last_end")
            for r in rows
            if r.get("last_end") is not None
            and not str(r.get("task_code", "")).upper().startswith("INIT_")
        ]
        return min(candidates, default=None)

    def _load_cursor_rows(self, store_id: int, filter_codes: Any) -> list:
        """Fetch cursor rows for ``store_id``; keep only ``filter_codes`` when given.

        The code match is case-insensitive and is done client-side after the query.
        """
        rows = self.db.query(self._CURSOR_SQL, (store_id,))
        if filter_codes:
            wanted = {str(code).upper() for code in filter_codes}
            rows = [r for r in rows if str(r.get("task_code", "")).upper() in wanted]
        return rows

    def _log_cursor_rows(self, store_id: int, rows: list) -> None:
        """Log a summary line plus one line per cursor row."""
        self.logger.info("截止时间检查: 门店ID=%s 启用任务数=%s", store_id, len(rows))
        for r in rows:
            self.logger.info(
                "截止时间检查: %-24s 结束时间=%s 开始时间=%s 运行ID=%s",
                str(r.get("task_code") or ""),
                self._fmt_value(r.get("last_end")),
                self._fmt_value(r.get("last_start")),
                self._fmt_value(r.get("last_run_id")),
            )

    def _log_ods_probes(self, ods_fetched: dict[str, dict[str, Any]]) -> Any:
        """Log ODS probe results and return the ODS cutoff.

        The ODS cutoff is the minimum non-null ``max_fetched_at``; ``None`` is
        returned when there are no probe results or no non-null values.
        """
        if not ods_fetched:
            return None

        ods_cutoff = min(
            (v["max_fetched_at"] for v in ods_fetched.values() if v.get("max_fetched_at") is not None),
            default=None,
        )
        self.logger.info("截止时间检查: ODS截止时间(最小抓取时间)=%s", self._fmt_value(ods_cutoff))

        # Oldest MAX(fetched_at) first. Tables without a value (empty table or
        # failed probe) sort last. The boolean first element of the key keeps
        # None from ever being compared with a timestamp.
        worst = sorted(
            ((table, info.get("max_fetched_at")) for table, info in ods_fetched.items()),
            key=lambda kv: (kv[1] is None, kv[1]),
        )[: self._ODS_LOG_LIMIT]
        for table, mx in worst:
            self.logger.info("截止时间检查: ODS表=%s 最大抓取时间=%s", table, self._fmt_value(mx))

        # Failed probes sort last above and may be cut off by the limit, so
        # report them explicitly rather than only in the returned dict.
        for table, info in ods_fetched.items():
            if info.get("error"):
                self.logger.warning("截止时间检查: ODS表=%s 探测失败=%s", table, info["error"])

        return ods_cutoff

    def _probe_ods_fetched_at(self, store_id: int) -> dict[str, dict[str, Any]]:
        """Return ``{table: {"max_fetched_at", "count"[, "error"]}}`` for ODS tables.

        Tables are the ``billiards_ods.*`` values of ``DwdLoadTask.TABLE_MAP``.
        A failing table gets ``None`` values plus an ``error`` message.
        If ``DwdLoadTask`` cannot be imported, a warning is logged and ``{}``
        is returned.

        ``store_id`` is currently unused: the probe queries are not filtered by store.
        """
        try:
            from tasks.dwd_load_task import DwdLoadTask  # local import to avoid circulars
        except Exception as exc:  # noqa: BLE001 - the ODS probe is best-effort
            self.logger.warning("截止时间检查: 无法加载DwdLoadTask, 跳过ODS探测: %s", exc)
            return {}

        table_map = getattr(DwdLoadTask, "TABLE_MAP", None) or {}
        ods_tables = sorted(
            {str(t) for t in table_map.values() if str(t).startswith(self._ODS_SCHEMA_PREFIX)}
        )

        results: dict[str, dict[str, Any]] = {}
        for table in ods_tables:
            # Table names come from DwdLoadTask.TABLE_MAP (code, not user input),
            # and identifiers cannot be bound as query parameters, so interpolate.
            # NOTE(review): if self.db shares one non-autocommit PostgreSQL
            # transaction, a failed probe may make later queries fail until a
            # rollback. Confirm how self.db.query handles errors.
            try:
                row = self.db.query(f"SELECT MAX(fetched_at) AS mx, COUNT(*) AS cnt FROM {table}")[0]
                results[table] = {"max_fetched_at": row.get("mx"), "count": row.get("cnt")}
            except Exception as exc:  # noqa: BLE001
                results[table] = {"max_fetched_at": None, "count": None, "error": str(exc)}
        return results

    def _probe_dw_time_columns(self) -> dict[str, Any]:
        """Run each DW probe and return ``{name: max_value}``.

        A failing probe maps to the string ``"ERROR: <message>"`` instead of
        raising.
        """
        checks: dict[str, Any] = {}
        for name, probe_sql in self._DW_TIME_PROBES:
            try:
                row = self.db.query(probe_sql)[0]
                checks[name] = row.get("mx")
            except Exception as exc:  # noqa: BLE001
                checks[name] = f"ERROR: {exc}"
        return checks
|