Switch to relative paths; complete the client

This commit is contained in:
Neo
2026-01-27 22:14:01 +08:00
parent 04c064793a
commit 9f8976e75a
292 changed files with 307062 additions and 678 deletions

View File

@@ -7,6 +7,8 @@ from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dateutil import parser as dtparser
from utils.windowing import build_window_segments, calc_window_minutes
@dataclass(frozen=True)
class TaskContext:
@@ -50,35 +52,72 @@ class BaseTask:
# ------------------------------------------------------------------ 主流程
def execute(self, cursor_data: dict | None = None) -> dict:
"""统一 orchestrate Extract → Transform → Load"""
context = self._build_context(cursor_data)
base_context = self._build_context(cursor_data)
task_code = self.get_task_code()
self.logger.info(
"%s: 开始执行,窗口[%s ~ %s]",
task_code,
context.window_start,
context.window_end,
segments = build_window_segments(
self.config,
base_context.window_start,
base_context.window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_context.window_start, base_context.window_end)]
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
counts = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 窗口拆分为 %s", task_code, total_segments)
result = self._build_result("SUCCESS", counts)
total_counts: dict = {}
segment_results: list[dict] = []
for idx, (window_start, window_end) in enumerate(segments, start=1):
context = self._build_context_for_window(window_start, window_end, cursor_data)
self.logger.info(
"%s: 开始执行(%s/%s),窗口[%s ~ %s]",
task_code,
idx,
total_segments,
context.window_start,
context.window_end,
)
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
counts = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
self._accumulate_counts(total_counts, counts)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": counts,
}
)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result = self._build_result("SUCCESS", total_counts)
result["window"] = {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
if segment_results:
result["segments"] = segment_results
self.logger.info("%s: 完成,统计=%s", task_code, result["counts"])
return result
# ------------------------------------------------------------------ 辅助方法
def _build_context(self, cursor_data: dict | None) -> TaskContext:
window_start, window_end, window_minutes = self._get_time_window(cursor_data)
return TaskContext(
@@ -89,6 +128,29 @@ class BaseTask:
cursor=cursor_data,
)
def _build_context_for_window(
self,
window_start: datetime,
window_end: datetime,
cursor_data: dict | None,
) -> TaskContext:
return TaskContext(
store_id=self.config.get("app.store_id"),
window_start=window_start,
window_end=window_end,
window_minutes=calc_window_minutes(window_start, window_end),
cursor=cursor_data,
)
@staticmethod
def _accumulate_counts(total: dict, current: dict) -> dict:
for key, value in (current or {}).items():
if isinstance(value, (int, float)):
total[key] = (total.get(key) or 0) + value
else:
total.setdefault(key, value)
return total
def _get_time_window(self, cursor_data: dict = None) -> tuple:
"""计算时间窗口"""
now = datetime.now(self.tz)
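
For orientation, a minimal sketch of the utils.windowing helpers imported above; the module itself is not part of this diff, so the split-size config key used here is an assumption, while the override_only gate mirrors the run.window_override.* keys read elsewhere in this commit.

from datetime import datetime, timedelta

def calc_window_minutes(start: datetime, end: datetime) -> int:
    # Whole minutes covered by [start, end); never negative.
    return max(int((end - start).total_seconds() // 60), 0)

def build_window_segments(config, start: datetime, end: datetime, tz=None, override_only=False):
    # Split [start, end) into fixed-size chunks so an oversized CLI window is
    # processed segment by segment. Returns [] when no split applies; callers
    # then fall back to [(start, end)]. tz is accepted for parity with the real
    # signature but unused in this sketch.
    if override_only and not (
        config.get("run.window_override.start") or config.get("run.window_override.end")
    ):
        return []
    hours = int(config.get("run.window_split_hours", 0) or 0)  # assumed config key
    if hours <= 0 or end <= start:
        return []
    step = timedelta(hours=hours)
    segments, cursor = [], start
    while cursor < end:
        segment_end = min(cursor + step, end)
        segments.append((cursor, segment_end))
        cursor = segment_end
    return segments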

View File

@@ -44,10 +44,10 @@ class CheckCutoffTask(BaseTask):
def _ts(v: Any) -> str:
return "-" if not v else str(v)
self.logger.info("CHECK_CUTOFF: store_id=%s enabled_tasks=%s", store_id, len(rows))
self.logger.info("截止时间检查: 门店ID=%s 启用任务数=%s", store_id, len(rows))
for r in rows:
self.logger.info(
"CHECK_CUTOFF: %-24s last_end=%s last_start=%s last_run_id=%s",
"截止时间检查: %-24s 结束时间=%s 开始时间=%s 运行ID=%s",
str(r.get("task_code") or ""),
_ts(r.get("last_end")),
_ts(r.get("last_start")),
@@ -60,23 +60,23 @@ class CheckCutoffTask(BaseTask):
if r.get("last_end") is not None and not str(r.get("task_code", "")).upper().startswith("INIT_")
]
cutoff = min(cutoff_candidates) if cutoff_candidates else None
self.logger.info("CHECK_CUTOFF: overall_cutoff(min last_end, excl INIT_*)=%s", _ts(cutoff))
self.logger.info("截止时间检查: 总体截止时间(最小结束时间,排除INIT_*)=%s", _ts(cutoff))
ods_fetched = self._probe_ods_fetched_at(store_id)
if ods_fetched:
non_null = [v["max_fetched_at"] for v in ods_fetched.values() if v.get("max_fetched_at") is not None]
ods_cutoff = min(non_null) if non_null else None
self.logger.info("CHECK_CUTOFF: ODS cutoff(min MAX(fetched_at))=%s", _ts(ods_cutoff))
self.logger.info("截止时间检查: ODS截止时间(最小抓取时间)=%s", _ts(ods_cutoff))
worst = sorted(
((k, v.get("max_fetched_at")) for k, v in ods_fetched.items()),
key=lambda kv: (kv[1] is None, kv[1]),
)[:8]
for table, mx in worst:
self.logger.info("CHECK_CUTOFF: ODS table=%s max_fetched_at=%s", table, _ts(mx))
self.logger.info("截止时间检查: ODS表=%s 最大抓取时间=%s", table, _ts(mx))
dw_checks = self._probe_dw_time_columns()
for name, value in dw_checks.items():
self.logger.info("CHECK_CUTOFF: %s=%s", name, _ts(value))
self.logger.info("截止时间检查: %s=%s", name, _ts(value))
return {
"status": "SUCCESS",

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
"""Data integrity task that checks API -> ODS -> DWD completeness."""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from dateutil import parser as dtparser
import json
from pathlib import Path
from utils.windowing import build_window_segments, calc_window_minutes
from .base_task import BaseTask
from quality.integrity_checker import (
IntegrityWindow,
compute_last_etl_end,
run_integrity_history,
run_integrity_window,
)
class DataIntegrityTask(BaseTask):
"""Check data completeness across API -> ODS -> DWD."""
def get_task_code(self) -> str:
return "DATA_INTEGRITY_CHECK"
def execute(self, cursor_data: dict | None = None) -> dict:
tz = ZoneInfo(self.config.get("app.timezone", "Asia/Taipei"))
mode = str(self.config.get("integrity.mode", "history") or "history").lower()
include_dimensions = bool(self.config.get("integrity.include_dimensions", False))
task_codes = str(self.config.get("integrity.ods_task_codes", "") or "").strip()
auto_backfill = bool(self.config.get("integrity.auto_backfill", False))
# 检测是否通过 CLI 传入了时间窗口参数(window_override)
# 如果有,自动切换到 window 模式
window_override_start = self.config.get("run.window_override.start")
window_override_end = self.config.get("run.window_override.end")
if window_override_start or window_override_end:
self.logger.info(
"检测到 CLI 时间窗口参数,自动切换到 window 模式: %s ~ %s",
window_override_start, window_override_end
)
mode = "window"
if mode == "window":
base_start, base_end, _ = self._get_time_window(cursor_data)
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info("数据完整性检查: 分段执行 共%s", total_segments)
window_reports = []
total_missing = 0
total_errors = 0
for idx, (seg_start, seg_end) in enumerate(segments, start=1):
window = IntegrityWindow(
start=seg_start,
end=seg_end,
label=f"segment_{idx}",
granularity="window",
)
payload = run_integrity_window(
cfg=self.config,
window=window,
include_dimensions=include_dimensions,
task_codes=task_codes,
logger=self.logger,
write_report=False,
window_split_unit="none",
window_compensation_hours=0,
)
window_reports.append(payload)
total_missing += int(payload.get("api_to_ods", {}).get("total_missing") or 0)
total_errors += int(payload.get("api_to_ods", {}).get("total_errors") or 0)
overall_start = segments[0][0]
overall_end = segments[-1][1]
report = {
"mode": "window",
"window": {
"start": overall_start.isoformat(),
"end": overall_end.isoformat(),
"segments": total_segments,
},
"windows": window_reports,
"api_to_ods": {
"total_missing": total_missing,
"total_errors": total_errors,
},
"total_missing": total_missing,
"total_errors": total_errors,
"generated_at": datetime.now(tz).isoformat(),
}
report_path = self._write_report(report, "data_integrity_window")
report["report_path"] = report_path
missing_count = int(total_missing or 0)
counts = {
"missing": missing_count,
"errors": int(total_errors or 0),
}
# 自动补全
backfill_result = None
if auto_backfill and missing_count > 0:
backfill_result = self._run_backfill(base_start, base_end, task_codes)
counts["backfilled"] = backfill_result.get("backfilled", 0)
return {
"status": "SUCCESS",
"counts": counts,
"window": {
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
},
"report_path": report_path,
"backfill_result": backfill_result,
}
history_start = str(self.config.get("integrity.history_start", "2025-07-01") or "2025-07-01")
history_end = str(self.config.get("integrity.history_end", "") or "").strip()
start_dt = dtparser.parse(history_start)
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=tz)
else:
start_dt = start_dt.astimezone(tz)
if history_end:
end_dt = dtparser.parse(history_end)
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=tz)
else:
end_dt = end_dt.astimezone(tz)
else:
end_dt = compute_last_etl_end(self.config) or datetime.now(tz)
report = run_integrity_history(
cfg=self.config,
start_dt=start_dt,
end_dt=end_dt,
include_dimensions=include_dimensions,
task_codes=task_codes,
logger=self.logger,
write_report=True,
)
missing_count = int(report.get("total_missing") or 0)
counts = {
"missing": missing_count,
"errors": int(report.get("total_errors") or 0),
}
# 自动补全
backfill_result = None
if auto_backfill and missing_count > 0:
backfill_result = self._run_backfill(start_dt, end_dt, task_codes)
counts["backfilled"] = backfill_result.get("backfilled", 0)
return {
"status": "SUCCESS",
"counts": counts,
"window": {
"start": start_dt,
"end": end_dt,
"minutes": int((end_dt - start_dt).total_seconds() // 60) if end_dt > start_dt else 0,
},
"report_path": report.get("report_path"),
"backfill_result": backfill_result,
}
def _write_report(self, report: dict, prefix: str) -> str:
root = Path(__file__).resolve().parents[1]
stamp = datetime.now(self.tz).strftime("%Y%m%d_%H%M%S")
path = root / "reports" / f"{prefix}_{stamp}.json"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(report, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return str(path)
def _run_backfill(self, start_dt: datetime, end_dt: datetime, task_codes: str) -> dict:
"""运行数据补全"""
self.logger.info("自动补全开始 起始=%s 结束=%s", start_dt, end_dt)
try:
from scripts.backfill_missing_data import run_backfill
result = run_backfill(
cfg=self.config,
start=start_dt,
end=end_dt,
task_codes=task_codes or None,
dry_run=False,
page_size=200,
chunk_size=500,
logger=self.logger,
)
self.logger.info(
"自动补全完成 已补全=%s 错误数=%s",
result.get("backfilled", 0),
result.get("errors", 0),
)
return result
except Exception as exc:
self.logger.exception("自动补全失败")
return {"backfilled": 0, "errors": 1, "error": str(exc)}

View File

@@ -72,6 +72,10 @@ class DwdLoadTask(BaseTask):
"start_use_time",
"fetched_at",
]
# 对于会出现“回补旧记录”的事实表,额外补齐缺失主键记录
FACT_MISSING_FILL_TABLES = {
"billiards_dwd.dwd_assistant_service_log",
}
# 特殊列映射:dwd 列名 -> 源列表达式(可选 CAST)
FACT_MAPPINGS: dict[str, list[tuple[str, str, str | None]]] = {
@@ -697,7 +701,14 @@ class DwdLoadTask(BaseTask):
if not select_exprs:
return 0
cur.execute(f"SELECT {', '.join(select_exprs)} FROM {ods_table_sql}")
# 对于 dim_site 和 dim_site_ex,使用 DISTINCT ON 优化查询
# 避免从大表 table_fee_transactions 全表扫描,只获取每个 site_id 的最新记录
if dwd_table in ("billiards_dwd.dim_site", "billiards_dwd.dim_site_ex"):
sql = f"SELECT DISTINCT ON (site_id) {', '.join(select_exprs)} FROM {ods_table_sql} ORDER BY site_id, fetched_at DESC NULLS LAST"
else:
sql = f"SELECT {', '.join(select_exprs)} FROM {ods_table_sql}"
cur.execute(sql)
rows = [{k.lower(): v for k, v in r.items()} for r in cur.fetchall()]
if dwd_table == "billiards_dwd.dim_goods_category":
@@ -1081,7 +1092,7 @@ class DwdLoadTask(BaseTask):
self.logger.warning("跳过 %s:未找到可插入的列", dwd_table)
return 0
order_col = self._pick_order_column(dwd_cols, ods_cols)
order_col = self._pick_order_column(dwd_table, dwd_cols, ods_cols)
where_sql = ""
params: List[Any] = []
dwd_table_sql = self._format_table(dwd_table, "billiards_dwd")
@@ -1090,9 +1101,7 @@ class DwdLoadTask(BaseTask):
where_sql = f'WHERE "{order_col}" >= %s AND "{order_col}" < %s'
params.extend([window_start, window_end])
elif order_col:
cur.execute(f'SELECT COALESCE(MAX("{order_col}"), %s) FROM {dwd_table_sql}', ("1970-01-01",))
row = cur.fetchone() or {}
watermark = list(row.values())[0] if row else "1970-01-01"
watermark = self._get_fact_watermark(cur, dwd_table, ods_table, order_col, dwd_cols, ods_cols)
where_sql = f'WHERE "{order_col}" > %s'
params.append(watermark)
@@ -1121,16 +1130,149 @@ class DwdLoadTask(BaseTask):
sql += f" ON CONFLICT ({pk_sql}) DO NOTHING"
cur.execute(sql, params)
return cur.rowcount
inserted = cur.rowcount
def _pick_order_column(self, dwd_cols: Iterable[str], ods_cols: Iterable[str]) -> str | None:
"""选择用于增量的时间列(需同时存在于 DWD 与 ODS"""
# 回补缺失主键记录(处理历史回补导致的“create_time 水位”遗漏)
if dwd_table.lower() in self.FACT_MISSING_FILL_TABLES:
inserted += self._insert_missing_by_pk(
cur,
dwd_table,
ods_table,
dwd_cols,
ods_cols,
mapping,
insert_cols,
dwd_types,
ods_types,
)
return inserted
def _pick_order_column(self, dwd_table: str, dwd_cols: Iterable[str], ods_cols: Iterable[str]) -> str | None:
"""Pick an incremental order column that exists in both DWD and ODS."""
lower_cols = {c.lower() for c in dwd_cols} & {c.lower() for c in ods_cols}
for candidate in self.FACT_ORDER_CANDIDATES:
if candidate.lower() in lower_cols:
return candidate.lower()
return None
def _get_fact_watermark(
self,
cur,
dwd_table: str,
ods_table: str,
order_col: str,
dwd_cols: Iterable[str],
ods_cols: Iterable[str],
) -> Any:
"""Fetch incremental watermark; default from DWD, fallback from ODS join."""
dwd_table_sql = self._format_table(dwd_table, "billiards_dwd")
ods_table_sql = self._format_table(ods_table, "billiards_ods")
dwd_set = {c.lower() for c in dwd_cols}
ods_set = {c.lower() for c in ods_cols}
if order_col.lower() in dwd_set:
cur.execute(
f'SELECT COALESCE(MAX("{order_col}"), %s) FROM {dwd_table_sql}', ("1970-01-01",)
)
row = cur.fetchone() or {}
return list(row.values())[0] if row else "1970-01-01"
pk_cols = self._get_primary_keys(cur, dwd_table)
if not pk_cols or order_col.lower() not in ods_set:
return "1970-01-01"
join_cond = " AND ".join(f'd."{pk}" = o."{pk}"' for pk in pk_cols if pk.lower() in ods_set)
if not join_cond:
return "1970-01-01"
cur.execute(
f'SELECT COALESCE(MAX(o."{order_col}"), %s) FROM {ods_table_sql} o JOIN {dwd_table_sql} d ON {join_cond}',
("1970-01-01",),
)
row = cur.fetchone() or {}
return list(row.values())[0] if row else "1970-01-01"
def _insert_missing_by_pk(
self,
cur,
dwd_table: str,
ods_table: str,
dwd_cols: Sequence[str],
ods_cols: Sequence[str],
mapping: Dict[str, tuple[str, str | None]],
insert_cols: Sequence[str],
dwd_types: Dict[str, str],
ods_types: Dict[str, str],
) -> int:
"""Backfill missing PK rows for facts that can receive late data."""
pk_cols = self._get_primary_keys(cur, dwd_table)
if not pk_cols:
return 0
ods_set = {c.lower() for c in ods_cols}
dwd_table_sql = self._format_table(dwd_table, "billiards_dwd")
ods_table_sql = self._format_table(ods_table, "billiards_ods")
join_pairs = []
for pk in pk_cols:
pk_lower = pk.lower()
if pk_lower in mapping:
src, _ = mapping[pk_lower]
elif pk_lower in ods_set:
src = pk
elif "id" in ods_set:
src = "id"
else:
src = None
if not src:
return 0
join_pairs.append((pk, src))
join_cond = " AND ".join(
f'd."{pk}" = o."{src}"' for pk, src in join_pairs
)
null_cond = " AND ".join(f'd."{pk}" IS NULL' for pk, _ in join_pairs)
# 类型转换需要的类型集合
numeric_types = {"integer", "bigint", "smallint", "numeric", "double precision", "real", "decimal"}
text_types = {"text", "character varying", "varchar"}
select_exprs = []
for col in insert_cols:
key = col.lower()
if key in mapping:
src, cast_type = mapping[key]
if src.isidentifier():
expr = self._cast_expr(f'o."{src}"', cast_type)
else:
expr = self._cast_expr(src, cast_type)
select_exprs.append(expr)
elif key in ods_set:
# 检查是否需要类型转换 (ODS text -> DWD numeric)
d_type = dwd_types.get(col)
o_type = ods_types.get(col)
if d_type in numeric_types and o_type in text_types:
select_exprs.append(f'CAST(NULLIF(CAST(o."{col}" AS text), \'\') AS {d_type})')
else:
select_exprs.append(f'o."{col}"')
else:
select_exprs.append("NULL")
select_cols_sql = ", ".join(select_exprs)
insert_cols_sql = ", ".join(f'"{c}"' for c in insert_cols)
sql = (
f'INSERT INTO {dwd_table_sql} ({insert_cols_sql}) '
f'SELECT {select_cols_sql} '
f'FROM {ods_table_sql} o '
f'LEFT JOIN {dwd_table_sql} d ON {join_cond} '
f'WHERE {null_cond}'
)
pk_sql = ", ".join(f'"{c}"' for c in pk_cols)
sql += f" ON CONFLICT ({pk_sql}) DO NOTHING"
cur.execute(sql)
return cur.rowcount
def _build_fact_select_exprs(
self,
insert_cols: Sequence[str],

View File

@@ -7,6 +7,7 @@ from datetime import date
from typing import Any
from .base_task import BaseTask, TaskContext
from utils.windowing import build_window_segments, calc_window_minutes
from scripts.build_dws_order_summary import SQL_BUILD_SUMMARY
@@ -17,37 +18,86 @@ class DwsBuildOrderSummaryTask(BaseTask):
return "DWS_BUILD_ORDER_SUMMARY"
def execute(self, cursor_data: dict | None = None) -> dict:
context = self._build_context(cursor_data)
base_context = self._build_context(cursor_data)
task_code = self.get_task_code()
self.logger.info(
"%s: start, window[%s ~ %s]",
task_code,
context.window_start,
context.window_end,
segments = build_window_segments(
self.config,
base_context.window_start,
base_context.window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_context.window_start, base_context.window_end)]
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
load_result = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: failed", task_code, exc_info=True)
raise
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 分段执行 共%s", task_code, total_segments)
counts = load_result.get("counts") or {}
result = {"status": "SUCCESS", "counts": counts}
total_counts: dict = {}
segment_results: list[dict] = []
request_params_list: list[dict] = []
total_deleted = 0
for idx, (window_start, window_end) in enumerate(segments, start=1):
context = self._build_context_for_window(window_start, window_end, cursor_data)
self.logger.info(
"%s: 开始执行(%s/%s), 窗口[%s ~ %s]",
task_code,
idx,
total_segments,
context.window_start,
context.window_end,
)
try:
extracted = self.extract(context)
transformed = self.transform(extracted, context)
load_result = self.load(transformed, context) or {}
self.db.commit()
except Exception:
self.db.rollback()
self.logger.error("%s: 执行失败", task_code, exc_info=True)
raise
counts = load_result.get("counts") or {}
self._accumulate_counts(total_counts, counts)
extra = load_result.get("extra") or {}
deleted = int(extra.get("deleted") or 0)
total_deleted += deleted
request_params = load_result.get("request_params")
if request_params:
request_params_list.append(request_params)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": counts,
"extra": extra,
}
)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result = {"status": "SUCCESS", "counts": total_counts}
result["window"] = {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
if "request_params" in load_result:
result["request_params"] = load_result["request_params"]
if "extra" in load_result:
result["extra"] = load_result["extra"]
self.logger.info("%s: done, counts=%s", task_code, counts)
if segment_results:
result["segments"] = segment_results
if request_params_list:
result["request_params"] = request_params_list[0] if len(request_params_list) == 1 else request_params_list
if total_deleted:
result["extra"] = {"deleted": total_deleted}
self.logger.info("%s: 完成, 统计=%s", task_code, total_counts)
return result
def extract(self, context: TaskContext) -> dict[str, Any]:
@@ -101,7 +151,7 @@ class DwsBuildOrderSummaryTask(BaseTask):
if extracted["delete_before_insert"]:
if extracted["full_refresh"] and extracted["site_id"] is None:
cur.execute("TRUNCATE TABLE billiards_dws.dws_order_summary;")
self.logger.info("DWS_BUILD_ORDER_SUMMARY: truncated billiards_dws.dws_order_summary")
self.logger.info("DWS订单汇总: 已清空 billiards_dws.dws_order_summary")
else:
delete_sql = "DELETE FROM billiards_dws.dws_order_summary WHERE 1=1"
delete_args: list[Any] = []
@@ -116,7 +166,7 @@ class DwsBuildOrderSummaryTask(BaseTask):
delete_args.append(_as_date(extracted["end_date"]))
cur.execute(delete_sql, delete_args)
deleted = cur.rowcount
self.logger.info("DWS_BUILD_ORDER_SUMMARY: deleted=%s sql=%s", deleted, delete_sql)
self.logger.info("DWS订单汇总: 删除=%s 语句=%s", deleted, delete_sql)
cur.execute(SQL_BUILD_SUMMARY, sql_params)
affected = cur.rowcount

View File

@@ -79,7 +79,7 @@ class ManualIngestTask(BaseTask):
data_dir = (
self.config.get("manual.data_dir")
or self.config.get("pipeline.ingest_source_dir")
or r"c:\dev\LLTQ\ETL\feiqiu-ETL\etl_billiards\tests\testdata_json"
or os.path.join("tests", "testdata_json")
)
if not os.path.exists(data_dir):
self.logger.error("Data directory not found: %s", data_dir)

View File

@@ -3,6 +3,7 @@ from .base_dwd_task import BaseDwdTask
from loaders.dimensions.member import MemberLoader
from models.parsers import TypeParser
import json
from utils.windowing import build_window_segments
class MembersDwdTask(BaseDwdTask):
"""
@@ -17,53 +18,72 @@ class MembersDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
loader = MemberLoader(self.db)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_updated = 0
total_errors = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.member_profiles",
columns=["site_id", "member_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.member_profiles",
columns=["site_id", "member_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_member(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}"
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_member(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}")
return {
"status": "success",
"inserted": total_inserted,
"updated": total_updated,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
"window_start": overall_start.isoformat(),
"window_end": overall_end.isoformat()
}
def _parse_member(self, raw: dict, store_id: int) -> dict:

View File

@@ -11,6 +11,7 @@ from psycopg2.extras import Json, execute_values
from models.parsers import TypeParser
from .base_task import BaseTask
from utils.windowing import build_window_segments, calc_window_minutes
ColumnTransform = Callable[[Any], Any]
@@ -64,64 +65,112 @@ class BaseOdsTask(BaseTask):
def execute(self, cursor_data: dict | None = None) -> dict:
spec = self.SPEC
self.logger.info("寮€濮嬫墽琛?%s (ODS)", spec.code)
self.logger.info("开始执行%s (ODS)", spec.code)
window_start, window_end, window_minutes = self._resolve_window(cursor_data)
segments = build_window_segments(
self.config,
window_start,
window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(window_start, window_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 窗口拆分为 %s", spec.code, total_segments)
store_id = TypeParser.parse_int(self.config.get("app.store_id"))
if not store_id:
raise ValueError("app.store_id 鏈厤缃紝鏃犳硶鎵ц ODS 浠诲姟")
raise ValueError("app.store_id 未配置,无法执行 ODS 任务")
page_size = self.config.get("api.page_size", 200)
params = self._build_params(
spec,
store_id,
window_start=window_start,
window_end=window_end,
)
counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
total_counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
segment_results: list[dict] = []
params_list: list[dict] = []
source_file = self._resolve_source_file_hint(spec)
try:
for _, page_records, _, response_payload in self.api.iter_paginated(
endpoint=spec.endpoint,
params=params,
page_size=page_size,
data_path=spec.data_path,
list_key=spec.list_key,
):
inserted, skipped = self._insert_records_schema_aware(
table=spec.table_name,
records=page_records,
response_payload=response_payload,
source_file=source_file,
source_endpoint=spec.endpoint if spec.include_source_endpoint else None,
for idx, (seg_start, seg_end) in enumerate(segments, start=1):
params = self._build_params(
spec,
store_id,
window_start=seg_start,
window_end=seg_end,
)
counts["fetched"] += len(page_records)
counts["inserted"] += inserted
counts["skipped"] += skipped
params_list.append(params)
segment_counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
self.db.commit()
self.logger.info("%s ODS 浠诲姟瀹屾垚: %s", spec.code, counts)
self.logger.info(
"%s: 开始执行(%s/%s),窗口[%s ~ %s]",
spec.code,
idx,
total_segments,
seg_start,
seg_end,
)
for _, page_records, _, response_payload in self.api.iter_paginated(
endpoint=spec.endpoint,
params=params,
page_size=page_size,
data_path=spec.data_path,
list_key=spec.list_key,
):
inserted, skipped = self._insert_records_schema_aware(
table=spec.table_name,
records=page_records,
response_payload=response_payload,
source_file=source_file,
source_endpoint=spec.endpoint if spec.include_source_endpoint else None,
)
segment_counts["fetched"] += len(page_records)
segment_counts["inserted"] += inserted
segment_counts["skipped"] += skipped
self.db.commit()
self._accumulate_counts(total_counts, segment_counts)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": seg_start,
"end": seg_end,
"minutes": calc_window_minutes(seg_start, seg_end),
},
"counts": segment_counts,
}
)
self.logger.info("%s ODS 任务完成: %s", spec.code, total_counts)
allow_empty_advance = bool(self.config.get("run.allow_empty_result_advance", False))
status = "SUCCESS"
if counts["fetched"] == 0 and not allow_empty_advance:
if total_counts["fetched"] == 0 and not allow_empty_advance:
status = "PARTIAL"
result = self._build_result(status, counts)
result = self._build_result(status, total_counts)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result["window"] = {
"start": window_start,
"end": window_end,
"minutes": window_minutes,
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
result["request_params"] = params
if total_segments > 1:
result["segments"] = segment_results
if len(params_list) == 1:
result["request_params"] = params_list[0]
else:
result["request_params"] = params_list
return result
except Exception:
self.db.rollback()
counts["errors"] += 1
self.logger.error("%s ODS 浠诲姟澶辫触", spec.code, exc_info=True)
total_counts["errors"] += 1
self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
raise
def _resolve_window(self, cursor_data: dict | None) -> tuple[datetime, datetime, int]:
@@ -966,72 +1015,121 @@ class OdsSettlementTicketTask(BaseOdsTask):
def execute(self, cursor_data: dict | None = None) -> dict:
spec = self.SPEC
context = self._build_context(cursor_data)
store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0
base_context = self._build_context(cursor_data)
segments = build_window_segments(
self.config,
base_context.window_start,
base_context.window_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_context.window_start, base_context.window_end)]
counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
total_segments = len(segments)
if total_segments > 1:
self.logger.info("%s: 窗口拆分为 %s", spec.code, total_segments)
store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0
counts_total = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
segment_results: list[dict] = []
source_file = self._resolve_source_file_hint(spec)
try:
existing_ids = self._fetch_existing_ticket_ids()
candidates = self._collect_settlement_ids(
store_id, existing_ids, context.window_start, context.window_end
)
candidates = [cid for cid in candidates if cid and cid not in existing_ids]
counts["fetched"] = len(candidates)
if not candidates:
for idx, (seg_start, seg_end) in enumerate(segments, start=1):
context = self._build_context_for_window(seg_start, seg_end, cursor_data)
self.logger.info(
"%s: 绐楀彛[%s ~ %s] 鏈彂鐜伴渶瑕佹姄鍙栫殑灏忕エ",
"%s: 开始执行(%s/%s),窗口[%s ~ %s]",
spec.code,
idx,
total_segments,
context.window_start,
context.window_end,
)
result = self._build_result("SUCCESS", counts)
result["window"] = {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
}
result["request_params"] = {"candidates": 0}
return result
payloads, skipped = self._fetch_ticket_payloads(candidates)
counts["skipped"] += skipped
inserted, skipped2 = self._insert_records_schema_aware(
table=spec.table_name,
records=payloads,
response_payload=None,
source_file=source_file,
source_endpoint=spec.endpoint,
)
counts["inserted"] += inserted
counts["skipped"] += skipped2
self.db.commit()
candidates = self._collect_settlement_ids(
store_id, existing_ids, context.window_start, context.window_end
)
candidates = [cid for cid in candidates if cid and cid not in existing_ids]
segment_counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
segment_counts["fetched"] = len(candidates)
if not candidates:
self.logger.info(
"%s: 窗口[%s ~ %s] 未发现需要抓取的小票",
spec.code,
context.window_start,
context.window_end,
)
self._accumulate_counts(counts_total, segment_counts)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": segment_counts,
}
)
continue
payloads, skipped = self._fetch_ticket_payloads(candidates)
segment_counts["skipped"] += skipped
inserted, skipped2 = self._insert_records_schema_aware(
table=spec.table_name,
records=payloads,
response_payload=None,
source_file=source_file,
source_endpoint=spec.endpoint,
)
segment_counts["inserted"] += inserted
segment_counts["skipped"] += skipped2
self.db.commit()
existing_ids.update(candidates)
self._accumulate_counts(counts_total, segment_counts)
if total_segments > 1:
segment_results.append(
{
"window": {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
},
"counts": segment_counts,
}
)
self.logger.info(
"%s: 灏忕エ鎶撳彇瀹屾垚锛屽€欓€?%s 鎻掑叆=%s 鏇存柊=%s 璺宠繃=%s",
"%s: 小票抓取完成,抓取=%s 插入=%s 更新=%s 跳过=%s",
spec.code,
len(candidates),
inserted,
counts_total["fetched"],
counts_total["inserted"],
0,
counts["skipped"],
counts_total["skipped"],
)
result = self._build_result("SUCCESS", counts)
result = self._build_result("SUCCESS", counts_total)
overall_start = segments[0][0]
overall_end = segments[-1][1]
result["window"] = {
"start": context.window_start,
"end": context.window_end,
"minutes": context.window_minutes,
"start": overall_start,
"end": overall_end,
"minutes": calc_window_minutes(overall_start, overall_end),
}
result["request_params"] = {"candidates": len(candidates)}
if segment_results:
result["segments"] = segment_results
result["request_params"] = {"candidates": counts_total["fetched"]}
return result
except Exception:
counts["errors"] += 1
counts_total["errors"] += 1
self.db.rollback()
self.logger.error("%s: 灏忕エ鎶撳彇澶辫触", spec.code, exc_info=True)
self.logger.error("%s: 小票抓取失败", spec.code, exc_info=True)
raise
# ------------------------------------------------------------------ helpers
def _fetch_existing_ticket_ids(self) -> set[int]:
sql = """
SELECT DISTINCT

View File

@@ -3,6 +3,7 @@ from .base_dwd_task import BaseDwdTask
from loaders.facts.payment import PaymentLoader
from models.parsers import TypeParser
import json
from utils.windowing import build_window_segments
class PaymentsDwdTask(BaseDwdTask):
"""
@@ -17,8 +18,20 @@ class PaymentsDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
loader = PaymentLoader(self.db, logger=self.logger)
store_id = self.config.get("app.store_id")
@@ -27,36 +40,42 @@ class PaymentsDwdTask(BaseDwdTask):
total_updated = 0
total_skipped = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.payment_transactions",
columns=["site_id", "pay_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.payment_transactions",
columns=["site_id", "pay_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
for batch in batches:
if not batch:
continue
parsed = self._parse_payment(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_payments(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
total_skipped += skipped
self.db.commit()
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_payment(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_payments(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
total_skipped += skipped
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
"Task %s completed. inserted=%s updated=%s skipped=%s",
self.get_task_code(),
@@ -64,7 +83,7 @@ class PaymentsDwdTask(BaseDwdTask):
total_updated,
total_skipped,
)
return {
"status": "SUCCESS",
"counts": {
@@ -72,8 +91,8 @@ class PaymentsDwdTask(BaseDwdTask):
"updated": total_updated,
"skipped": total_skipped,
},
"window_start": window_start,
"window_end": window_end,
"window_start": overall_start,
"window_end": overall_end,
}
def _parse_payment(self, raw: dict, store_id: int) -> dict:

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.ticket import TicketLoader
from utils.windowing import build_window_segments
class TicketDwdTask(BaseDwdTask):
"""
@@ -19,51 +20,66 @@ class TicketDwdTask(BaseDwdTask):
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
# 1. Get Time Window (Incremental Load)
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
base_start, base_end, _ = self._get_time_window()
segments = build_window_segments(
self.config,
base_start,
base_end,
tz=self.tz,
override_only=True,
)
if not segments:
segments = [(base_start, base_end)]
total_segments = len(segments)
if total_segments > 1:
self.logger.info(f"{self.get_task_code()}: ????? {total_segments} ?")
# 2. Initialize Loader
loader = TicketLoader(self.db, logger=self.logger)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_errors = 0
# 3. Iterate ODS Data
# We query ods_ticket_detail based on fetched_at
batches = self.iter_ods_rows(
table_name="billiards_ods.settlement_ticket_details",
columns=["payload", "fetched_at", "source_file", "record_index"],
start_time=window_start,
end_time=window_end
for idx, (window_start, window_end) in enumerate(segments, start=1):
self.logger.info(
f"Processing window {idx}/{total_segments}: {window_start} to {window_end}"
)
batches = self.iter_ods_rows(
table_name="billiards_ods.settlement_ticket_details",
columns=["payload", "fetched_at", "source_file", "record_index"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
tickets = []
for row in batch:
payload = self.parse_payload(row)
if payload:
tickets.append(payload)
inserted, errors = loader.process_tickets(tickets, store_id)
total_inserted += inserted
total_errors += errors
self.db.commit()
overall_start = segments[0][0]
overall_end = segments[-1][1]
self.logger.info(
f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}"
)
for batch in batches:
if not batch:
continue
# Extract payloads
tickets = []
for row in batch:
payload = self.parse_payload(row)
if payload:
tickets.append(payload)
# Process Batch
inserted, errors = loader.process_tickets(tickets, store_id)
total_inserted += inserted
total_errors += errors
# 4. Commit
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")
return {
"status": "success",
"inserted": total_inserted,
"errors": total_errors,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
"window_start": overall_start.isoformat(),
"window_end": overall_end.isoformat()
}