This commit is contained in:
Neo
2026-01-27 22:45:50 +08:00
parent a6ad343092
commit 4c192e921c
476 changed files with 381543 additions and 5819 deletions

View File

@@ -5,6 +5,9 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dateutil import parser as dtparser
from utils.windowing import build_window_segments, calc_window_minutes
@dataclass(frozen=True)
@@ -49,35 +52,72 @@ class BaseTask:
# ------------------------------------------------------------------ 主流程
def execute(self, cursor_data: dict | None = None) -> dict:
"""统一 orchestrate Extract → Transform → Load"""
context = self._build_context(cursor_data)
base_context = self._build_context(cursor_data)
task_code = self.get_task_code()
self.logger.info(
"%s: 开始执行,窗口[%s ~ %s]",
task_code,
context.window_start,
context.window_end,
segments = build_window_segments(
self.config,
base_context.window_start,
base_context.window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_context.window_start, base_context.window_end)]
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
counts = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 窗口拆分为 %s", task_code, total_segments)
result = self._build_result("SUCCESS", counts)
total_counts: dict = {}
segment_results: list[dict] = []
for idx, (window_start, window_end) in enumerate(segments, start=1):
context = self._build_context_for_window(window_start, window_end, cursor_data)
self.logger.info(
"%s: 开始执行(%s/%s),窗口[%s ~ %s]",
task_code,
idx,
total_segments,
context.window_start,
context.window_end,
)
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
counts = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
self._accumulate_counts(total_counts, counts)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": counts,
}
)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result = self._build_result("SUCCESS", total_counts)
result["window"] = {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
if segment_results:
result["segments"] = segment_results
self.logger.info("%s: 完成,统计=%s", task_code, result["counts"])
return result
# ------------------------------------------------------------------ 辅助方法
def _build_context(self, cursor_data: dict | None) -> TaskContext:
window_start, window_end, window_minutes = self._get_time_window(cursor_data)
return TaskContext(
@@ -88,10 +128,63 @@ class BaseTask:
cursor=cursor_data,
)
def _build_context_for_window(
self,
window_start: datetime,
window_end: datetime,
cursor_data: dict | None,
) -> TaskContext:
return TaskContext(
store_id=self.config.get("app.store_id"),
window_start=window_start,
window_end=window_end,
window_minutes=calc_window_minutes(window_start, window_end),
cursor=cursor_data,
)
@staticmethod
def _accumulate_counts(total: dict, current: dict) -> dict:
for key, value in (current or {}).items():
if isinstance(value, (int, float)):
total[key] = (total.get(key) or 0) + value
else:
total.setdefault(key, value)
return total
def _get_time_window(self, cursor_data: dict | None = None) -> tuple:
"""计算时间窗口"""
now = datetime.now(self.tz)
override_start = self.config.get("run.window_override.start")
override_end = self.config.get("run.window_override.end")
if override_start or override_end:
if not (override_start and override_end):
raise ValueError("run.window_override.start/end 需要同时提供")
window_start = override_start
if isinstance(window_start, str):
window_start = dtparser.parse(window_start)
if isinstance(window_start, datetime) and window_start.tzinfo is None:
window_start = window_start.replace(tzinfo=self.tz)
elif isinstance(window_start, datetime):
window_start = window_start.astimezone(self.tz)
window_end = override_end
if isinstance(window_end, str):
window_end = dtparser.parse(window_end)
if isinstance(window_end, datetime) and window_end.tzinfo is None:
window_end = window_end.replace(tzinfo=self.tz)
elif isinstance(window_end, datetime):
window_end = window_end.astimezone(self.tz)
if not isinstance(window_start, datetime) or not isinstance(window_end, datetime):
raise ValueError("run.window_override.start/end 解析失败")
if window_end <= window_start:
raise ValueError("run.window_override.end 必须大于 start")
window_minutes = max(1, int((window_end - window_start).total_seconds() // 60))
return window_start, window_end, window_minutes
idle_start = self.config.get("run.idle_window.start", "04:00")
idle_end = self.config.get("run.idle_window.end", "16:00")
is_idle = self._is_in_idle_window(now, idle_start, idle_end)
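
For orientation, here is a minimal standalone sketch of the window-splitting behaviour execute() now relies on, assuming utils.windowing.build_window_segments slices a large override window into fixed-size pieces; the day-sized step, the helper name split_by_days and the sample dates below are illustrative assumptions, the authoritative logic is in utils/windowing.py.

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def split_by_days(start: datetime, end: datetime, days: int = 1) -> list[tuple[datetime, datetime]]:
    # 按固定跨度切片;最后一段可能不足一个完整区间(仅示意,实际拆分规则以 build_window_segments 为准)
    step = timedelta(days=days)
    segments, cursor = [], start
    while cursor < end:
        nxt = min(cursor + step, end)
        segments.append((cursor, nxt))
        cursor = nxt
    return segments

tz = ZoneInfo("Asia/Taipei")
print(split_by_days(datetime(2025, 7, 1, tzinfo=tz), datetime(2025, 7, 3, 12, 0, tzinfo=tz)))
# 预期 3 段;execute() 对每段分别跑 Extract → Transform → Load,再用 _accumulate_counts 汇总 counts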

View File

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
"""Task: report last successful cursor cutoff times from etl_admin."""
from __future__ import annotations
from typing import Any
from .base_task import BaseTask
class CheckCutoffTask(BaseTask):
"""Report per-task cursor cutoff times (etl_admin.etl_cursor.last_end)."""
def get_task_code(self) -> str:
return "CHECK_CUTOFF"
def execute(self, cursor_data: dict | None = None) -> dict:
store_id = int(self.config.get("app.store_id"))
filter_codes = self.config.get("run.cutoff_task_codes") or None
if isinstance(filter_codes, str):
filter_codes = [c.strip().upper() for c in filter_codes.split(",") if c.strip()]
sql = """
SELECT
t.task_code,
c.last_start,
c.last_end,
c.last_id,
c.last_run_id,
c.updated_at
FROM etl_admin.etl_task t
LEFT JOIN etl_admin.etl_cursor c
ON c.task_id = t.task_id AND c.store_id = t.store_id
WHERE t.store_id = %s
AND t.enabled = TRUE
ORDER BY t.task_code
"""
rows = self.db.query(sql, (store_id,))
if filter_codes:
wanted = {str(c).upper() for c in filter_codes}
rows = [r for r in rows if str(r.get("task_code", "")).upper() in wanted]
def _ts(v: Any) -> str:
return "-" if not v else str(v)
self.logger.info("截止时间检查: 门店ID=%s 启用任务数=%s", store_id, len(rows))
for r in rows:
self.logger.info(
"截止时间检查: %-24s 结束时间=%s 开始时间=%s 运行ID=%s",
str(r.get("task_code") or ""),
_ts(r.get("last_end")),
_ts(r.get("last_start")),
_ts(r.get("last_run_id")),
)
cutoff_candidates = [
r.get("last_end")
for r in rows
if r.get("last_end") is not None and not str(r.get("task_code", "")).upper().startswith("INIT_")
]
cutoff = min(cutoff_candidates) if cutoff_candidates else None
self.logger.info("截止时间检查: 总体截止时间(最小结束时间,排除INIT_*)=%s", _ts(cutoff))
ods_fetched = self._probe_ods_fetched_at(store_id)
if ods_fetched:
non_null = [v["max_fetched_at"] for v in ods_fetched.values() if v.get("max_fetched_at") is not None]
ods_cutoff = min(non_null) if non_null else None
self.logger.info("截止时间检查: ODS截止时间(最小抓取时间)=%s", _ts(ods_cutoff))
worst = sorted(
((k, v.get("max_fetched_at")) for k, v in ods_fetched.items()),
key=lambda kv: (kv[1] is None, kv[1]),
)[:8]
for table, mx in worst:
self.logger.info("截止时间检查: ODS表=%s 最大抓取时间=%s", table, _ts(mx))
dw_checks = self._probe_dw_time_columns()
for name, value in dw_checks.items():
self.logger.info("截止时间检查: %s=%s", name, _ts(value))
return {
"status": "SUCCESS",
"counts": {"fetched": len(rows), "inserted": 0, "updated": 0, "skipped": 0, "errors": 0},
"window": None,
"request_params": {"store_id": store_id, "filter_task_codes": filter_codes or []},
"report": {
"rows": rows,
"overall_cutoff": cutoff,
"ods_fetched_at": ods_fetched,
"dw_max_times": dw_checks,
},
}
def _probe_ods_fetched_at(self, store_id: int) -> dict[str, dict[str, Any]]:
try:
from tasks.dwd_load_task import DwdLoadTask # local import to avoid circulars
except Exception:
return {}
ods_tables = sorted({str(t) for t in DwdLoadTask.TABLE_MAP.values() if str(t).startswith("billiards_ods.")})
results: dict[str, dict[str, Any]] = {}
for table in ods_tables:
try:
row = self.db.query(f"SELECT MAX(fetched_at) AS mx, COUNT(*) AS cnt FROM {table}")[0]
results[table] = {"max_fetched_at": row.get("mx"), "count": row.get("cnt")}
except Exception as exc: # noqa: BLE001
results[table] = {"max_fetched_at": None, "count": None, "error": str(exc)}
return results
def _probe_dw_time_columns(self) -> dict[str, Any]:
checks: dict[str, Any] = {}
probes = {
"DWD.max_settlement_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_settlement_head",
"DWD.max_payment_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_payment",
"DWD.max_refund_pay_time": "SELECT MAX(pay_time) AS mx FROM billiards_dwd.dwd_refund",
"DWS.max_order_date": "SELECT MAX(order_date) AS mx FROM billiards_dws.dws_order_summary",
"DWS.max_updated_at": "SELECT MAX(updated_at) AS mx FROM billiards_dws.dws_order_summary",
}
for name, sql2 in probes.items():
try:
row = self.db.query(sql2)[0]
checks[name] = row.get("mx")
except Exception as exc: # noqa: BLE001
checks[name] = f"ERROR: {exc}"
return checks
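
To make the overall-cutoff rule above concrete (minimum last_end across enabled tasks, excluding INIT_*), a small self-contained sketch with made-up cursor rows:

from datetime import datetime

rows = [  # 示例数据,仅用于演示取值规则
    {"task_code": "PAYMENTS_DWD", "last_end": datetime(2025, 7, 10, 4, 0)},
    {"task_code": "MEMBERS_DWD", "last_end": datetime(2025, 7, 9, 4, 0)},
    {"task_code": "INIT_ODS_SCHEMA", "last_end": datetime(2025, 1, 1, 0, 0)},
    {"task_code": "TICKETS_DWD", "last_end": None},
]
candidates = [
    r["last_end"]
    for r in rows
    if r["last_end"] is not None and not r["task_code"].upper().startswith("INIT_")
]
print(min(candidates) if candidates else None)  # -> 2025-07-09 04:00;INIT_* 与空游标不参与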

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
"""Data integrity task that checks API -> ODS -> DWD completeness."""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from dateutil import parser as dtparser
import json
from pathlib import Path
from utils.windowing import build_window_segments, calc_window_minutes
from .base_task import BaseTask
from quality.integrity_checker import (
IntegrityWindow,
compute_last_etl_end,
run_integrity_history,
run_integrity_window,
)
class DataIntegrityTask(BaseTask):
"""Check data completeness across API -> ODS -> DWD."""
def get_task_code(self) -> str:
return "DATA_INTEGRITY_CHECK"
def execute(self, cursor_data: dict | None = None) -> dict:
tz = ZoneInfo(self.config.get("app.timezone", "Asia/Taipei"))
mode = str(self.config.get("integrity.mode", "history") or "history").lower()
include_dimensions = bool(self.config.get("integrity.include_dimensions", False))
task_codes = str(self.config.get("integrity.ods_task_codes", "") or "").strip()
auto_backfill = bool(self.config.get("integrity.auto_backfill", False))
# 检测是否通过 CLI 传入了时间窗口参数(window_override)
# 如果有,自动切换到 window 模式
window_override_start = self.config.get("run.window_override.start")
window_override_end = self.config.get("run.window_override.end")
if window_override_start or window_override_end:
self.logger.info(
"检测到 CLI 时间窗口参数,自动切换到 window 模式: %s ~ %s",
window_override_start, window_override_end
)
mode = "window"
if mode == "window":
base_start, base_end, _ = self._get_time_window(cursor_data)
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info("数据完整性检查: 分段执行 共%s", total_segments)
window_reports = []
total_missing = 0
total_errors = 0
for idx, (seg_start, seg_end) in enumerate(segments, start=1):
window = IntegrityWindow(
start=seg_start,
end=seg_end,
label=f"segment_{idx}",
granularity="window",
)
payload = run_integrity_window(
cfg=self.config,
window=window,
include_dimensions=include_dimensions,
task_codes=task_codes,
logger=self.logger,
write_report=False,
window_split_unit="none",
window_compensation_hours=0,
)
window_reports.append(payload)
total_missing += int(payload.get("api_to_ods", {}).get("total_missing") or 0)
total_errors += int(payload.get("api_to_ods", {}).get("total_errors") or 0)
overall_start = segments[0][0]
overall_end = segments[-1][1]
report = {
"mode": "window",
"window": {
"start": overall_start.isoformat(),
"end": overall_end.isoformat(),
"segments": total_segments,
},
"windows": window_reports,
"api_to_ods": {
"total_missing": total_missing,
"total_errors": total_errors,
},
"total_missing": total_missing,
"total_errors": total_errors,
"generated_at": datetime.now(tz).isoformat(),
}
report_path = self._write_report(report, "data_integrity_window")
report["report_path"] = report_path
missing_count = int(total_missing or 0)
counts = {
"missing": missing_count,
"errors": int(total_errors or 0),
}
# 自动补全
backfill_result = None
if auto_backfill and missing_count > 0:
backfill_result = self._run_backfill(base_start, base_end, task_codes)
counts["backfilled"] = backfill_result.get("backfilled", 0)
return {
"status": "SUCCESS",
"counts": counts,
"window": {
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
},
"report_path": report_path,
"backfill_result": backfill_result,
}
history_start = str(self.config.get("integrity.history_start", "2025-07-01") or "2025-07-01")
history_end = str(self.config.get("integrity.history_end", "") or "").strip()
start_dt = dtparser.parse(history_start)
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=tz)
else:
start_dt = start_dt.astimezone(tz)
if history_end:
end_dt = dtparser.parse(history_end)
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=tz)
else:
end_dt = end_dt.astimezone(tz)
else:
end_dt = compute_last_etl_end(self.config) or datetime.now(tz)
report = run_integrity_history(
cfg=self.config,
start_dt=start_dt,
end_dt=end_dt,
include_dimensions=include_dimensions,
task_codes=task_codes,
logger=self.logger,
write_report=True,
)
missing_count = int(report.get("total_missing") or 0)
counts = {
"missing": missing_count,
"errors": int(report.get("total_errors") or 0),
}
# 自动补全
backfill_result = None
if auto_backfill and missing_count > 0:
backfill_result = self._run_backfill(start_dt, end_dt, task_codes)
counts["backfilled"] = backfill_result.get("backfilled", 0)
return {
"status": "SUCCESS",
"counts": counts,
"window": {
"start": start_dt,
"end": end_dt,
"minutes": int((end_dt - start_dt).total_seconds() // 60) if end_dt > start_dt else 0,
},
"report_path": report.get("report_path"),
"backfill_result": backfill_result,
}
def _write_report(self, report: dict, prefix: str) -> str:
root = Path(__file__).resolve().parents[1]
stamp = datetime.now(self.tz).strftime("%Y%m%d_%H%M%S")
path = root / "reports" / f"{prefix}_{stamp}.json"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(report, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return str(path)
def _run_backfill(self, start_dt: datetime, end_dt: datetime, task_codes: str) -> dict:
"""运行数据补全"""
self.logger.info("自动补全开始 起始=%s 结束=%s", start_dt, end_dt)
try:
from scripts.backfill_missing_data import run_backfill
result = run_backfill(
cfg=self.config,
start=start_dt,
end=end_dt,
task_codes=task_codes or None,
dry_run=False,
page_size=200,
chunk_size=500,
logger=self.logger,
)
self.logger.info(
"自动补全完成 已补全=%s 错误数=%s",
result.get("backfilled", 0),
result.get("errors", 0),
)
return result
except Exception as exc:
self.logger.exception("自动补全失败")
return {"backfilled": 0, "errors": 1, "error": str(exc)}

File diff suppressed because it is too large

View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""DWD 质量核对任务:按 dwd_quality_check.md 输出行数/金额对照报表。"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Sequence, Tuple
from psycopg2.extras import RealDictCursor
from .base_task import BaseTask, TaskContext
from .dwd_load_task import DwdLoadTask
class DwdQualityTask(BaseTask):
"""对 ODS 与 DWD 进行行数、金额对照核查,生成 JSON 报表。"""
REPORT_PATH = Path("etl_billiards/reports/dwd_quality_report.json")
AMOUNT_KEYWORDS = ("amount", "money", "fee", "balance")
def get_task_code(self) -> str:
"""返回任务编码。"""
return "DWD_QUALITY_CHECK"
def extract(self, context: TaskContext) -> dict[str, Any]:
"""准备运行时上下文。"""
return {"now": datetime.now()}
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict[str, Any]:
"""输出行数/金额差异报表到本地文件。"""
report: Dict[str, Any] = {
"generated_at": extracted["now"].isoformat(),
"tables": [],
"note": "行数/金额核对,金额字段基于列名包含 amount/money/fee/balance 的数值列自动扫描。",
}
with self.db.conn.cursor(cursor_factory=RealDictCursor) as cur:
for dwd_table, ods_table in DwdLoadTask.TABLE_MAP.items():
count_info = self._compare_counts(cur, dwd_table, ods_table)
amount_info = self._compare_amounts(cur, dwd_table, ods_table)
report["tables"].append(
{
"dwd_table": dwd_table,
"ods_table": ods_table,
"count": count_info,
"amounts": amount_info,
}
)
self.REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
self.REPORT_PATH.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
self.logger.info("DWD 质检报表已生成:%s", self.REPORT_PATH)
return {"report_path": str(self.REPORT_PATH)}
# ---------------------- helpers ----------------------
def _compare_counts(self, cur, dwd_table: str, ods_table: str) -> Dict[str, Any]:
"""统计两端行数并返回差异。"""
dwd_schema, dwd_name = self._split_table_name(dwd_table, default_schema="billiards_dwd")
ods_schema, ods_name = self._split_table_name(ods_table, default_schema="billiards_ods")
cur.execute(f'SELECT COUNT(1) AS cnt FROM "{dwd_schema}"."{dwd_name}"')
dwd_cnt = cur.fetchone()["cnt"]
cur.execute(f'SELECT COUNT(1) AS cnt FROM "{ods_schema}"."{ods_name}"')
ods_cnt = cur.fetchone()["cnt"]
return {"dwd": dwd_cnt, "ods": ods_cnt, "diff": dwd_cnt - ods_cnt}
def _compare_amounts(self, cur, dwd_table: str, ods_table: str) -> List[Dict[str, Any]]:
"""扫描金额相关列,生成 ODS 与 DWD 的汇总对照。"""
dwd_schema, dwd_name = self._split_table_name(dwd_table, default_schema="billiards_dwd")
ods_schema, ods_name = self._split_table_name(ods_table, default_schema="billiards_ods")
dwd_amount_cols = self._get_numeric_amount_columns(cur, dwd_schema, dwd_name)
ods_amount_cols = self._get_numeric_amount_columns(cur, ods_schema, ods_name)
common_amount_cols = sorted(set(dwd_amount_cols) & set(ods_amount_cols))
results: List[Dict[str, Any]] = []
for col in common_amount_cols:
cur.execute(f'SELECT COALESCE(SUM("{col}"),0) AS val FROM "{dwd_schema}"."{dwd_name}"')
dwd_sum = cur.fetchone()["val"]
cur.execute(f'SELECT COALESCE(SUM("{col}"),0) AS val FROM "{ods_schema}"."{ods_name}"')
ods_sum = cur.fetchone()["val"]
results.append({"column": col, "dwd_sum": float(dwd_sum or 0), "ods_sum": float(ods_sum or 0), "diff": float(dwd_sum or 0) - float(ods_sum or 0)})
return results
def _get_numeric_amount_columns(self, cur, schema: str, table: str) -> List[str]:
"""获取列名包含金额关键词的数值型字段。"""
cur.execute(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = %s
AND table_name = %s
AND data_type IN ('numeric','double precision','integer','bigint','smallint','real','decimal')
""",
(schema, table),
)
cols = [r["column_name"].lower() for r in cur.fetchall()]
return [c for c in cols if any(key in c for key in self.AMOUNT_KEYWORDS)]
def _split_table_name(self, name: str, default_schema: str) -> Tuple[str, str]:
"""拆分 schema 与表名,缺省使用 default_schema。"""
parts = name.split(".")
if len(parts) == 2:
return parts[0], parts[1]
return default_schema, name
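
A minimal standalone sketch of the amount-column filter used by _get_numeric_amount_columns (same keyword rule as AMOUNT_KEYWORDS; the column names are made up for illustration):

AMOUNT_KEYWORDS = ("amount", "money", "fee", "balance")

columns = ["pay_amount", "refund_fee", "member_id", "wallet_balance", "remark"]  # 示例列名
amount_cols = [c for c in columns if any(key in c.lower() for key in AMOUNT_KEYWORDS)]
print(amount_cols)  # -> ['pay_amount', 'refund_fee', 'wallet_balance'];其余列不参与金额对照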

View File

@@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-
"""Build DWS order summary table from DWD fact tables."""
from __future__ import annotations
from datetime import date
from typing import Any
from .base_task import BaseTask, TaskContext
from utils.windowing import build_window_segments, calc_window_minutes
from scripts.build_dws_order_summary import SQL_BUILD_SUMMARY
class DwsBuildOrderSummaryTask(BaseTask):
"""Recompute/refresh `billiards_dws.dws_order_summary` for a date window."""
def get_task_code(self) -> str:
return "DWS_BUILD_ORDER_SUMMARY"
def execute(self, cursor_data: dict | None = None) -> dict:
base_context = self._build_context(cursor_data)
task_code = self.get_task_code()
segments = build_window_segments(
self.config,
base_context.window_start,
base_context.window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_context.window_start, base_context.window_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 分段执行 共%s", task_code, total_segments)
total_counts: dict = {}
segment_results: list[dict] = []
request_params_list: list[dict] = []
total_deleted = 0
for idx, (window_start, window_end) in enumerate(segments, start=1):
context = self._build_context_for_window(window_start, window_end, cursor_data)
self.logger.info(
"%s: 开始执行(%s/%s), 窗口[%s ~ %s]",
task_code,
idx,
total_segments,
context.window_start,
context.window_end,
)
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
load_result = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
counts = load_result.get("counts") or {}
self._accumulate_counts(total_counts, counts)
extra = load_result.get("extra") or {}
deleted = int(extra.get("deleted") or 0)
total_deleted += deleted
request_params = load_result.get("request_params")
if request_params:
request_params_list.append(request_params)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": counts,
"extra": extra,
}
)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result = {"status": "SUCCESS", "counts": total_counts}
result["window"] = {
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
if segment_results:
result["segments"] = segment_results
if request_params_list:
result["request_params"] = request_params_list[0] if len(request_params_list) == 1 else request_params_list
if total_deleted:
result["extra"] = {"deleted": total_deleted}
self.logger.info("%s: 完成, 统计=%s", task_code, total_counts)
return result
def extract(self, context: TaskContext) -> dict[str, Any]:
store_id = int(self.config.get("app.store_id"))
full_refresh = bool(self.config.get("dws.order_summary.full_refresh", False))
site_id = self.config.get("dws.order_summary.site_id", store_id)
if site_id in ("", None, "null", "NULL"):
site_id = None
start_date = self.config.get("dws.order_summary.start_date")
end_date = self.config.get("dws.order_summary.end_date")
if not full_refresh:
if not start_date:
start_date = context.window_start.date()
if not end_date:
end_date = context.window_end.date()
else:
start_date = None
end_date = None
delete_before_insert = bool(self.config.get("dws.order_summary.delete_before_insert", True))
return {
"site_id": site_id,
"start_date": start_date,
"end_date": end_date,
"full_refresh": full_refresh,
"delete_before_insert": delete_before_insert,
}
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict:
sql_params = {
"site_id": extracted["site_id"],
"start_date": extracted["start_date"],
"end_date": extracted["end_date"],
}
request_params = {
"site_id": extracted["site_id"],
"start_date": _jsonable_date(extracted["start_date"]),
"end_date": _jsonable_date(extracted["end_date"]),
}
with self.db.conn.cursor() as cur:
cur.execute("SELECT to_regclass('billiards_dws.dws_order_summary') AS reg;")
row = cur.fetchone()
reg = row[0] if row else None
if not reg:
raise RuntimeError("DWS 表不存在:请先运行任务 INIT_DWS_SCHEMA")
deleted = 0
if extracted["delete_before_insert"]:
if extracted["full_refresh"] and extracted["site_id"] is None:
cur.execute("TRUNCATE TABLE billiards_dws.dws_order_summary;")
self.logger.info("DWS订单汇总: 已清空 billiards_dws.dws_order_summary")
else:
delete_sql = "DELETE FROM billiards_dws.dws_order_summary WHERE 1=1"
delete_args: list[Any] = []
if extracted["site_id"] is not None:
delete_sql += " AND site_id = %s"
delete_args.append(extracted["site_id"])
if extracted["start_date"] is not None:
delete_sql += " AND order_date >= %s"
delete_args.append(_as_date(extracted["start_date"]))
if extracted["end_date"] is not None:
delete_sql += " AND order_date <= %s"
delete_args.append(_as_date(extracted["end_date"]))
cur.execute(delete_sql, delete_args)
deleted = cur.rowcount
self.logger.info("DWS订单汇总: 删除=%s 语句=%s", deleted, delete_sql)
cur.execute(SQL_BUILD_SUMMARY, sql_params)
affected = cur.rowcount
return {
"counts": {"fetched": 0, "inserted": affected, "updated": 0, "skipped": 0, "errors": 0},
"request_params": request_params,
"extra": {"deleted": deleted},
}
def _as_date(v: Any) -> date:
if isinstance(v, date):
return v
return date.fromisoformat(str(v))
def _jsonable_date(v: Any):
if v is None:
return None
if isinstance(v, date):
return v.isoformat()
return str(v)
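
The delete-before-insert branch in load() assembles its DELETE statement condition by condition; a standalone sketch of that pattern, with the table and column names as in the task and the sample values made up:

from datetime import date

def build_delete(site_id, start_date, end_date):
    # 与 load() 中的拼接方式一致:条件逐个追加,参数与占位符一一对应
    sql = "DELETE FROM billiards_dws.dws_order_summary WHERE 1=1"
    args = []
    if site_id is not None:
        sql += " AND site_id = %s"
        args.append(site_id)
    if start_date is not None:
        sql += " AND order_date >= %s"
        args.append(start_date)
    if end_date is not None:
        sql += " AND order_date <= %s"
        args.append(end_date)
    return sql, args

print(build_delete(1, date(2025, 7, 1), date(2025, 7, 31)))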

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""初始化 DWD Schema执行 schema_dwd_doc.sql可选先 DROP SCHEMA。"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from .base_task import BaseTask, TaskContext
class InitDwdSchemaTask(BaseTask):
"""通过调度执行 DWD schema 初始化。"""
def get_task_code(self) -> str:
"""返回任务编码。"""
return "INIT_DWD_SCHEMA"
def extract(self, context: TaskContext) -> dict[str, Any]:
"""读取 DWD SQL 文件与参数。"""
base_dir = Path(__file__).resolve().parents[1] / "database"
dwd_path = Path(self.config.get("schema.dwd_file", base_dir / "schema_dwd_doc.sql"))
if not dwd_path.exists():
raise FileNotFoundError(f"未找到 DWD schema 文件: {dwd_path}")
drop_first = self.config.get("dwd.drop_schema_first", False)
return {"dwd_sql": dwd_path.read_text(encoding="utf-8"), "dwd_file": str(dwd_path), "drop_first": drop_first}
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict:
"""可选 DROP schema再执行 DWD DDL。"""
with self.db.conn.cursor() as cur:
if extracted["drop_first"]:
cur.execute("DROP SCHEMA IF EXISTS billiards_dwd CASCADE;")
self.logger.info("已执行 DROP SCHEMA billiards_dwd CASCADE")
self.logger.info("执行 DWD schema 文件: %s", extracted["dwd_file"])
cur.execute(extracted["dwd_sql"])
return {"executed": 1, "files": [extracted["dwd_file"]]}

View File

@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""Initialize DWS schema (billiards_dws)."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from .base_task import BaseTask, TaskContext
class InitDwsSchemaTask(BaseTask):
"""Apply DWS schema SQL."""
def get_task_code(self) -> str:
return "INIT_DWS_SCHEMA"
def extract(self, context: TaskContext) -> dict[str, Any]:
base_dir = Path(__file__).resolve().parents[1] / "database"
dws_path = Path(self.config.get("schema.dws_file", base_dir / "schema_dws.sql"))
if not dws_path.exists():
raise FileNotFoundError(f"未找到 DWS schema 文件: {dws_path}")
drop_first = bool(self.config.get("dws.drop_schema_first", False))
return {"dws_sql": dws_path.read_text(encoding="utf-8"), "dws_file": str(dws_path), "drop_first": drop_first}
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict:
with self.db.conn.cursor() as cur:
if extracted["drop_first"]:
cur.execute("DROP SCHEMA IF EXISTS billiards_dws CASCADE;")
self.logger.info("已执行 DROP SCHEMA billiards_dws CASCADE")
self.logger.info("执行 DWS schema 文件: %s", extracted["dws_file"])
cur.execute(extracted["dws_sql"])
return {"executed": 1, "files": [extracted["dws_file"]]}

View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""任务:初始化运行环境,执行 ODS 与 etl_admin 的 DDL并准备日志/导出目录。"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from .base_task import BaseTask, TaskContext
class InitOdsSchemaTask(BaseTask):
"""通过调度执行初始化:创建必要目录,执行 ODS 与 etl_admin 的 DDL。"""
def get_task_code(self) -> str:
"""返回任务编码。"""
return "INIT_ODS_SCHEMA"
def extract(self, context: TaskContext) -> dict[str, Any]:
"""读取 SQL 文件路径,收集需创建的目录。"""
base_dir = Path(__file__).resolve().parents[1] / "database"
ods_path = Path(self.config.get("schema.ods_file", base_dir / "schema_ODS_doc.sql"))
admin_path = Path(self.config.get("schema.etl_admin_file", base_dir / "schema_etl_admin.sql"))
if not ods_path.exists():
raise FileNotFoundError(f"找不到 ODS schema 文件: {ods_path}")
if not admin_path.exists():
raise FileNotFoundError(f"找不到 etl_admin schema 文件: {admin_path}")
log_root = Path(self.config.get("io.log_root") or self.config["io"]["log_root"])
export_root = Path(self.config.get("io.export_root") or self.config["io"]["export_root"])
fetch_root = Path(self.config.get("pipeline.fetch_root") or self.config["pipeline"]["fetch_root"])
ingest_dir = Path(self.config.get("pipeline.ingest_source_dir") or fetch_root)
return {
"ods_sql": ods_path.read_text(encoding="utf-8"),
"admin_sql": admin_path.read_text(encoding="utf-8"),
"ods_file": str(ods_path),
"admin_file": str(admin_path),
"dirs": [log_root, export_root, fetch_root, ingest_dir],
}
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict:
"""执行 DDL 并创建必要目录。
安全提示:
ODS DDL 文件可能携带头部说明或异常注释,为避免因非 SQL 文本导致执行失败,这里会做一次轻量清洗后再执行。
"""
for d in extracted["dirs"]:
Path(d).mkdir(parents=True, exist_ok=True)
self.logger.info("已确保目录存在: %s", d)
# 处理 ODS SQL:去掉头部说明行,以及易出错的 COMMENT ON 行(如 CamelCase 未加引号)
ods_sql_raw: str = extracted["ods_sql"]
drop_idx = ods_sql_raw.find("DROP SCHEMA")
if drop_idx > 0:
ods_sql_raw = ods_sql_raw[drop_idx:]
cleaned_lines: list[str] = []
for line in ods_sql_raw.splitlines():
if line.strip().upper().startswith("COMMENT ON "):
continue
cleaned_lines.append(line)
ods_sql = "\n".join(cleaned_lines)
with self.db.conn.cursor() as cur:
self.logger.info("执行 etl_admin schema 文件: %s", extracted["admin_file"])
cur.execute(extracted["admin_sql"])
self.logger.info("执行 ODS schema 文件: %s", extracted["ods_file"])
cur.execute(ods_sql)
return {
"executed": 2,
"files": [extracted["admin_file"], extracted["ods_file"]],
"dirs_prepared": [str(p) for p in extracted["dirs"]],
}
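
The light-weight ODS SQL cleanup in load() can be expressed as a standalone helper; a sketch assuming the same two rules (cut everything before the first DROP SCHEMA, skip COMMENT ON lines):

def clean_ods_sql(raw: str) -> str:
    # 规则一:DDL 前若有说明文字,从第一个 DROP SCHEMA 开始截取
    drop_idx = raw.find("DROP SCHEMA")
    if drop_idx > 0:
        raw = raw[drop_idx:]
    # 规则二:跳过 COMMENT ON 行(例如 CamelCase 标识符未加引号时会执行失败)
    lines = [ln for ln in raw.splitlines() if not ln.strip().upper().startswith("COMMENT ON ")]
    return "\n".join(lines)

sample = "-- 头部说明\nDROP SCHEMA IF EXISTS billiards_ods CASCADE;\nCREATE SCHEMA billiards_ods;\nCOMMENT ON SCHEMA billiards_ods IS 'ods';"
print(clean_ods_sql(sample))  # 仅保留可执行的 DDL 两行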

File diff suppressed because it is too large

View File

@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.dimensions.member import MemberLoader
from models.parsers import TypeParser
import json
from utils.windowing import build_window_segments
class MembersDwdTask(BaseDwdTask):
"""
DWD Task: Process Member Records from ODS to Dimension Table
Source: billiards_ods.ods_member_profile
Source: billiards_ods.member_profiles
Target: billiards.dim_member
"""
@@ -17,53 +18,72 @@ class MembersDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
loader = MemberLoader(self.db)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_updated = 0
total_errors = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.ods_member_profile",
columns=["site_id", "member_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.member_profiles",
columns=["site_id", "member_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_member(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}"
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_member(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}")
return {
"status": "success",
"inserted": total_inserted,
"updated": total_updated,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
"window_start": overall_start.isoformat(),
"window_end": overall_end.isoformat()
}
def _parse_member(self, raw: dict, store_id: int) -> dict:
@@ -87,3 +107,4 @@ class MembersDwdTask(BaseDwdTask):
except Exception as e:
self.logger.warning(f"Error parsing member: {e}")
return None
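
The per-batch pattern above (parse the payload, collect parsed rows, upsert, commit) in isolation, with a hedged stand-in for parse_payload — the real implementation lives in BaseDwdTask and may handle more cases:

import json

def parse_payload(row: dict):
    # 假设 payload 列可能是 JSON 字符串或已是 dict;解析失败返回 None(仅示意)
    payload = row.get("payload")
    if isinstance(payload, dict):
        return payload
    try:
        return json.loads(payload) if payload else None
    except (TypeError, ValueError):
        return None

batch = [{"payload": '{"memberId": 1}'}, {"payload": None}, {"payload": {"memberId": 2}}]
parsed_rows = [p for p in (parse_payload(r) for r in batch) if p]
print(parsed_rows)  # -> [{'memberId': 1}, {'memberId': 2}];空/坏 payload 被跳过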

View File

@@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
"""在线抓取 ODS 相关接口并落盘为 JSON用于后续离线回放/入库)。"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from api.client import APIClient
from models.parsers import TypeParser
from utils.json_store import dump_json, endpoint_to_filename
from .base_task import BaseTask, TaskContext
@dataclass(frozen=True)
class EndpointSpec:
endpoint: str
window_style: str # site | start_end | range | pay | none
data_path: tuple[str, ...] = ("data",)
list_key: str | None = None
class OdsJsonArchiveTask(BaseTask):
"""
抓取一组 ODS 所需接口并落盘为“简化 JSON”:
{"code": 0, "data": [...records...]}
说明:
- 该输出格式与 tasks/manual_ingest_task.py 的解析逻辑兼容;
- 默认每页一个文件,避免单文件过大;
- 结算小票(/Order/GetOrderSettleTicketNew)按 orderSettleId 分文件写入。
"""
ENDPOINTS: tuple[EndpointSpec, ...] = (
EndpointSpec("/MemberProfile/GetTenantMemberList", "site", list_key="tenantMemberInfos"),
EndpointSpec("/MemberProfile/GetTenantMemberCardList", "site", list_key="tenantMemberCards"),
EndpointSpec("/MemberProfile/GetMemberCardBalanceChange", "start_end"),
EndpointSpec("/PersonnelManagement/SearchAssistantInfo", "site", list_key="assistantInfos"),
EndpointSpec(
"/AssistantPerformance/GetOrderAssistantDetails",
"start_end",
list_key="orderAssistantDetails",
),
EndpointSpec(
"/AssistantPerformance/GetAbolitionAssistant",
"start_end",
list_key="abolitionAssistants",
),
EndpointSpec("/Table/GetSiteTables", "site", list_key="siteTables"),
EndpointSpec(
"/TenantGoodsCategory/QueryPrimarySecondaryCategory",
"site",
list_key="goodsCategoryList",
),
EndpointSpec("/TenantGoods/QueryTenantGoods", "site", list_key="tenantGoodsList"),
EndpointSpec("/TenantGoods/GetGoodsInventoryList", "site", list_key="orderGoodsList"),
EndpointSpec("/TenantGoods/GetGoodsStockReport", "site"),
EndpointSpec("/TenantGoods/GetGoodsSalesList", "start_end", list_key="orderGoodsLedgers"),
EndpointSpec(
"/PackageCoupon/QueryPackageCouponList",
"site",
list_key="packageCouponList",
),
EndpointSpec("/Site/GetSiteTableUseDetails", "start_end", list_key="siteTableUseDetailsList"),
EndpointSpec("/Site/GetSiteTableOrderDetails", "start_end", list_key="siteTableUseDetailsList"),
EndpointSpec("/Site/GetTaiFeeAdjustList", "start_end", list_key="taiFeeAdjustInfos"),
EndpointSpec(
"/GoodsStockManage/QueryGoodsOutboundReceipt",
"start_end",
list_key="queryDeliveryRecordsList",
),
EndpointSpec("/Promotion/GetOfflineCouponConsumePageList", "start_end"),
EndpointSpec("/Order/GetRefundPayLogList", "start_end"),
EndpointSpec("/Site/GetAllOrderSettleList", "range", list_key="settleList"),
EndpointSpec("/Site/GetRechargeSettleList", "range", list_key="settleList"),
EndpointSpec("/PayLog/GetPayLogListPage", "pay"),
)
TICKET_ENDPOINT = "/Order/GetOrderSettleTicketNew"
def get_task_code(self) -> str:
return "ODS_JSON_ARCHIVE"
def extract(self, context: TaskContext) -> dict:
base_client = getattr(self.api, "base", None) or self.api
if not isinstance(base_client, APIClient):
raise TypeError("ODS_JSON_ARCHIVE 需要 APIClient在线抓取")
output_dir = getattr(self.api, "output_dir", None)
if output_dir:
out = Path(output_dir)
else:
out = Path(self.config.get("pipeline.fetch_root") or self.config["pipeline"]["fetch_root"])
out.mkdir(parents=True, exist_ok=True)
write_pretty = bool(self.config.get("io.write_pretty_json", False))
page_size = int(self.config.get("api.page_size", 200) or 200)
store_id = int(context.store_id)
total_records = 0
ticket_ids: set[int] = set()
per_endpoint: list[dict] = []
self.logger.info(
"ODS_JSON_ARCHIVE: 开始抓取,窗口[%s ~ %s] 输出目录=%s",
context.window_start,
context.window_end,
out,
)
for spec in self.ENDPOINTS:
self.logger.info("ODS_JSON_ARCHIVE: 抓取 endpoint=%s", spec.endpoint)
built_params = self._build_params(
spec.window_style, store_id, context.window_start, context.window_end
)
# /TenantGoods/GetGoodsInventoryList 要求 siteId 为数组(标量会触发服务端异常,返回畸形状态行 HTTP/1.1 1400)
if spec.endpoint == "/TenantGoods/GetGoodsInventoryList":
built_params["siteId"] = [store_id]
params = self._merge_common_params(built_params)
base_filename = endpoint_to_filename(spec.endpoint)
stem = Path(base_filename).stem
suffix = Path(base_filename).suffix or ".json"
endpoint_records = 0
endpoint_pages = 0
endpoint_error: str | None = None
try:
for page_no, records, _, _ in base_client.iter_paginated(
endpoint=spec.endpoint,
params=params,
page_size=page_size,
data_path=spec.data_path,
list_key=spec.list_key,
):
endpoint_pages += 1
total_records += len(records)
endpoint_records += len(records)
if spec.endpoint == "/PayLog/GetPayLogListPage":
for rec in records or []:
relate_id = TypeParser.parse_int(
(rec or {}).get("relateId")
or (rec or {}).get("orderSettleId")
or (rec or {}).get("order_settle_id")
)
if relate_id:
ticket_ids.add(relate_id)
out_path = out / f"{stem}__p{int(page_no):04d}{suffix}"
dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
except Exception as exc: # noqa: BLE001
endpoint_error = f"{type(exc).__name__}: {exc}"
self.logger.error("ODS_JSON_ARCHIVE: 接口抓取失败 endpoint=%s err=%s", spec.endpoint, endpoint_error)
per_endpoint.append(
{
"endpoint": spec.endpoint,
"file_stem": stem,
"pages": endpoint_pages,
"records": endpoint_records,
"error": endpoint_error,
}
)
if endpoint_error:
self.logger.warning(
"ODS_JSON_ARCHIVE: endpoint=%s 完成失败pages=%s records=%s err=%s",
spec.endpoint,
endpoint_pages,
endpoint_records,
endpoint_error,
)
else:
self.logger.info(
"ODS_JSON_ARCHIVE: endpoint=%s 完成 pages=%s records=%s",
spec.endpoint,
endpoint_pages,
endpoint_records,
)
# Ticket details: per orderSettleId
ticket_ids_sorted = sorted(ticket_ids)
self.logger.info("ODS_JSON_ARCHIVE: 小票候选数=%s", len(ticket_ids_sorted))
ticket_file_stem = Path(endpoint_to_filename(self.TICKET_ENDPOINT)).stem
ticket_file_suffix = Path(endpoint_to_filename(self.TICKET_ENDPOINT)).suffix or ".json"
ticket_records = 0
for order_settle_id in ticket_ids_sorted:
params = self._merge_common_params({"orderSettleId": int(order_settle_id)})
try:
records, _ = base_client.get_paginated(
endpoint=self.TICKET_ENDPOINT,
params=params,
page_size=None,
data_path=("data",),
list_key=None,
)
if not records:
continue
ticket_records += len(records)
out_path = out / f"{ticket_file_stem}__{int(order_settle_id)}{ticket_file_suffix}"
dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
except Exception as exc: # noqa: BLE001
self.logger.error(
"ODS_JSON_ARCHIVE: 小票抓取失败 orderSettleId=%s err=%s",
order_settle_id,
exc,
)
continue
total_records += ticket_records
manifest = {
"task": self.get_task_code(),
"store_id": store_id,
"window_start": context.window_start.isoformat(),
"window_end": context.window_end.isoformat(),
"page_size": page_size,
"total_records": total_records,
"ticket_ids": len(ticket_ids_sorted),
"ticket_records": ticket_records,
"endpoints": per_endpoint,
}
manifest_path = out / "manifest.json"
dump_json(manifest_path, manifest, pretty=True)
if hasattr(self.api, "last_dump"):
try:
self.api.last_dump = {"file": str(manifest_path), "records": total_records, "pages": None}
except Exception:
pass
self.logger.info("ODS_JSON_ARCHIVE: 抓取完成,总记录数=%s(含小票=%s", total_records, ticket_records)
return {"fetched": total_records, "ticket_ids": len(ticket_ids_sorted)}
def _build_params(self, window_style: str, store_id: int, window_start, window_end) -> dict:
if window_style == "none":
return {}
if window_style == "site":
return {"siteId": store_id}
if window_style == "range":
return {
"siteId": store_id,
"rangeStartTime": TypeParser.format_timestamp(window_start, self.tz),
"rangeEndTime": TypeParser.format_timestamp(window_end, self.tz),
}
if window_style == "pay":
return {
"siteId": store_id,
"StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
"EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
}
# default: startTime/endTime
return {
"siteId": store_id,
"startTime": TypeParser.format_timestamp(window_start, self.tz),
"endTime": TypeParser.format_timestamp(window_end, self.tz),
}
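
A hedged sketch of the per-page file naming and the "simplified JSON" layout described in the class docstring; the exact endpoint_to_filename mapping is an assumption, the point is one file per page containing {"code": 0, "data": [...]}.

import json
from pathlib import Path

def page_path(out_dir: Path, endpoint: str, page_no: int) -> Path:
    # 假设 endpoint_to_filename 大致把 "/A/B" 映射为 "A_B.json";实际以 utils/json_store.py 为准
    stem = endpoint.strip("/").replace("/", "_")
    return out_dir / f"{stem}__p{page_no:04d}.json"

out = Path("fetch_demo")
out.mkdir(parents=True, exist_ok=True)
records = [{"payId": 1}, {"payId": 2}]  # 示例记录
path = page_path(out, "/PayLog/GetPayLogListPage", 1)
path.write_text(json.dumps({"code": 0, "data": records}, ensure_ascii=False), encoding="utf-8")
print(path)  # -> fetch_demo/PayLog_GetPayLogListPage__p0001.json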

File diff suppressed because it is too large

View File

@@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.payment import PaymentLoader
from models.parsers import TypeParser
import json
from utils.windowing import build_window_segments
class PaymentsDwdTask(BaseDwdTask):
"""
@@ -17,8 +18,20 @@ class PaymentsDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
loader = PaymentLoader(self.db, logger=self.logger)
store_id = self.config.get("app.store_id")
@@ -27,36 +40,42 @@ class PaymentsDwdTask(BaseDwdTask):
total_updated = 0
total_skipped = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.ods_payment_record",
columns=["site_id", "pay_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.payment_transactions",
columns=["site_id", "pay_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
for batch in batches:
if not batch:
continue
parsed = self._parse_payment(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_payments(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
total_skipped += skipped
self.db.commit()
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_payment(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_payments(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
total_skipped += skipped
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
"Task %s completed. inserted=%s updated=%s skipped=%s",
self.get_task_code(),
@@ -64,7 +83,7 @@ class PaymentsDwdTask(BaseDwdTask):
total_updated,
total_skipped,
)
return {
"status": "SUCCESS",
"counts": {
@@ -72,8 +91,8 @@ class PaymentsDwdTask(BaseDwdTask):
"updated": total_updated,
"skipped": total_skipped,
},
"window_start": window_start,
"window_end": window_end,
"window_start": overall_start,
"window_end": overall_end,
}
def _parse_payment(self, raw: dict, store_id: int) -> dict:
@@ -136,3 +155,4 @@ class PaymentsDwdTask(BaseDwdTask):
except Exception as e:
self.logger.warning(f"Error parsing payment: {e}")
return None

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.ticket import TicketLoader
from utils.windowing import build_window_segments
class TicketDwdTask(BaseDwdTask):
"""
@@ -19,51 +20,66 @@ class TicketDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
# 1. Get Time Window (Incremental Load)
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
# 2. Initialize Loader
loader = TicketLoader(self.db, logger=self.logger)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_errors = 0
# 3. Iterate ODS Data
# We query ods_ticket_detail based on fetched_at
batches = self.iter_ods_rows(
table_name="billiards_ods.settlement_ticket_details",
columns=["payload", "fetched_at", "source_file", "record_index"],
start_time=window_start,
end_time=window_end
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.settlement_ticket_details",
columns=["payload", "fetched_at", "source_file", "record_index"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
tickets = []
for row in batch:
payload = self.parse_payload(row)
if payload:
tickets.append(payload)
inserted, errors = loader.process_tickets(tickets, store_id)
total_inserted += inserted
total_errors += errors
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}"
)
for batch in batches:
if not batch:
continue
# Extract payloads
tickets = []
for row in batch:
payload = self.parse_payload(row)
if payload:
tickets.append(payload)
# Process Batch
inserted, errors = loader.process_tickets(tickets, store_id)
total_inserted += inserted
total_errors += errors
# 4. Commit
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")
return {
"status": "success",
"inserted": total_inserted,
"errors": total_errors,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
"window_start": overall_start.isoformat(),
"window_end": overall_end.isoformat()
}
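
Across these segmented tasks, per-segment counters end up folded into overall totals; BaseTask._accumulate_counts (shown earlier in this commit) follows the rule sketched below — numeric values are summed and the first non-numeric value wins:

def accumulate_counts(total: dict, current: dict) -> dict:
    # 与 BaseTask._accumulate_counts 相同的合并规则(此处为独立示意)
    for key, value in (current or {}).items():
        if isinstance(value, (int, float)):
            total[key] = (total.get(key) or 0) + value
        else:
            total.setdefault(key, value)
    return total

total: dict = {}
accumulate_counts(total, {"inserted": 10, "updated": 2, "note": "seg1"})
accumulate_counts(total, {"inserted": 5, "updated": 0, "note": "seg2"})
print(total)  # -> {'inserted': 15, 'updated': 2, 'note': 'seg1'}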