376 lines
14 KiB
Python
376 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
DWS 历史数据重算脚本 — 按营业日(Business Day Cutoff)口径重新聚合
|
||
|
||
用途:
|
||
引入 BUSINESS_DAY_START_HOUR 后,需要将历史 DWS 数据按新口径重算。
|
||
脚本复用正式 ETL CLI(cli.main),按月分窗口执行,避免单次事务过大。
|
||
|
||
用法:
|
||
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01
|
||
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --dry-run
|
||
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --fail-fast
|
||
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --tasks DWS_FINANCE_DAILY,DWS_MEMBER_VISIT
|
||
|
||
Requirements: 10.1, 10.2, 10.3, 10.4
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from datetime import date, datetime, timedelta
|
||
from pathlib import Path
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from dotenv import load_dotenv
|
||
|
||
# ── Environment bootstrap ────────────────────────────────────
# Repository root: this script lives in scripts/ops/, two levels below it.
_ROOT = Path(__file__).resolve().parents[2]
# Load the root .env so settings such as SYSTEM_LOG_ROOT / PG_DSN can be read
# from os.environ later on.
load_dotenv(_ROOT.joinpath(".env"))

# Timezone for the run's start timestamp (also used in the log-file name).
TZ = ZoneInfo("Asia/Shanghai")
# Working directory of the ETL CLI subprocess (the etl-feiqiu package).
ETL_CWD = _ROOT.joinpath("apps", "etl", "connectors", "feiqiu")
||
|
||
# DWS task codes to rebuild, grouped by domain and listed in dependency order
# (upstream tasks first). DWS_MAINTENANCE / INDEX layers are deliberately not
# included; refresh them separately once the rebuild has finished.
_FINANCE_TASKS = (
    "DWS_FINANCE_DAILY",
    "DWS_FINANCE_RECHARGE",
    "DWS_FINANCE_INCOME_STRUCTURE",
    "DWS_FINANCE_DISCOUNT_DETAIL",
)
_ASSISTANT_TASKS = (
    "DWS_ASSISTANT_DAILY",
    "DWS_ASSISTANT_ORDER_CONTRIBUTION",
    "DWS_ASSISTANT_CUSTOMER",
    "DWS_ASSISTANT_SALARY",
    "DWS_ASSISTANT_FINANCE",
    "DWS_ASSISTANT_MONTHLY",
)
_MEMBER_TASKS = (
    "DWS_MEMBER_VISIT",
    "DWS_MEMBER_CONSUMPTION",
)
_GOODS_STOCK_TASKS = (
    "DWS_GOODS_STOCK_DAILY",
    "DWS_GOODS_STOCK_WEEKLY",
    "DWS_GOODS_STOCK_MONTHLY",
)

ALL_DWS_TASKS: list[str] = [
    *_FINANCE_TASKS,
    *_ASSISTANT_TASKS,
    *_MEMBER_TASKS,
    *_GOODS_STOCK_TASKS,
]
|
||
|
||
|
||
# ── Month-window splitting ───────────────────────────────────


def split_by_month(start: date, end: date) -> list[tuple[date, date]]:
    """Split the half-open range ``[start, end)`` into calendar-month windows.

    Every window is a half-open ``(window_start, window_end)`` pair:

    * the first window starts at ``start`` itself, which need not be the
      1st of a month; every later window starts on the 1st of its month;
    * each window ends on the 1st of the following month, except the last
      one, which is clipped to ``end``.

    Returns an empty list when ``start >= end``.
    """
    windows: list[tuple[date, date]] = []
    # The first window begins exactly at ``start`` (no alignment to the 1st).
    cursor = start
    while cursor < end:
        # First day of the month after ``cursor`` (December rolls the year).
        if cursor.month == 12:
            next_month_first = date(cursor.year + 1, 1, 1)
        else:
            next_month_first = date(cursor.year, cursor.month + 1, 1)
        windows.append((cursor, min(next_month_first, end)))
        cursor = next_month_first
    return windows
|
||
|
||
|
||
# ── 行数统计 ──────────────────────────────────────────────────
|
||
|
||
def _get_row_count_sql(task_code: str) -> str | None:
|
||
"""根据任务代码返回对应 DWS 表的行数查询 SQL。
|
||
|
||
返回 None 表示该任务无法直接统计行数(如 MV 刷新)。
|
||
"""
|
||
# 任务代码 → DWS 表名映射
|
||
table_map: dict[str, str] = {
|
||
"DWS_FINANCE_DAILY": "dws.dws_finance_daily_summary",
|
||
"DWS_FINANCE_RECHARGE": "dws.dws_finance_recharge_summary",
|
||
"DWS_FINANCE_INCOME_STRUCTURE": "dws.dws_finance_income_structure",
|
||
"DWS_FINANCE_DISCOUNT_DETAIL": "dws.dws_finance_discount_detail",
|
||
"DWS_ASSISTANT_DAILY": "dws.dws_assistant_daily_detail",
|
||
"DWS_ASSISTANT_ORDER_CONTRIBUTION": "dws.dws_assistant_order_contribution",
|
||
"DWS_ASSISTANT_CUSTOMER": "dws.dws_assistant_customer_daily",
|
||
"DWS_ASSISTANT_SALARY": "dws.dws_assistant_salary",
|
||
"DWS_ASSISTANT_FINANCE": "dws.dws_assistant_finance_analysis",
|
||
"DWS_ASSISTANT_MONTHLY": "dws.dws_assistant_monthly_summary",
|
||
"DWS_MEMBER_VISIT": "dws.dws_member_visit_daily",
|
||
"DWS_MEMBER_CONSUMPTION": "dws.dws_member_consumption_daily",
|
||
"DWS_GOODS_STOCK_DAILY": "dws.dws_goods_stock_daily",
|
||
"DWS_GOODS_STOCK_WEEKLY": "dws.dws_goods_stock_weekly",
|
||
"DWS_GOODS_STOCK_MONTHLY": "dws.dws_goods_stock_monthly",
|
||
}
|
||
table = table_map.get(task_code)
|
||
if not table:
|
||
return None
|
||
return f"SELECT COUNT(*) AS cnt FROM {table}"
|
||
|
||
|
||
def query_row_count(task_code: str, dsn: str) -> int | None:
    """Return the total row count of the DWS table behind ``task_code``.

    Best-effort statistic (requires psycopg2). Returns None when:

    * the task has no mapped table, or
    * psycopg2 is not installed, or
    * connecting/querying fails for any reason.

    Failures are reported on stderr rather than silently swallowed, so a
    missing BEFORE/AFTER line in the log can be explained. They never abort
    the rebuild.
    """
    from contextlib import closing

    sql = _get_row_count_sql(task_code)
    if not sql:
        return None

    try:
        import psycopg2

        # psycopg2's ``with connection`` only commits/rolls back the
        # transaction and leaves the connection open; ``closing`` actually
        # releases it.
        with closing(psycopg2.connect(dsn)) as conn, conn.cursor() as cur:
            cur.execute(sql)
            row = cur.fetchone()
    except Exception as exc:  # best-effort: never abort the rebuild over stats
        print(f"[WARN] {task_code} 行数统计失败: {exc}", file=sys.stderr)
        return None
    return row[0] if row else 0
|
||
|
||
|
||
# ── Single task x single window execution ────────────────────


def _as_text(data: str | bytes | None) -> str:
    """Normalise captured subprocess output to ``str``.

    ``TimeoutExpired.stdout`` / ``.stderr`` may be bytes even when
    ``text=True`` was requested, or None when nothing was captured.
    """
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return data


def run_task_window(
    task_code: str,
    window_start: date,
    window_end: date,
    dry_run: bool = False,
    timeout_sec: int = 600,
) -> dict:
    """Run one DWS task over one window through the ETL CLI in a subprocess.

    Invokes ``uv run --package etl-feiqiu python -m cli.main`` in ETL_CWD with
    ``--layers DWS --tasks <task_code> --force-full`` and the window bounds at
    ``00:00:00``. ``--dry-run`` is forwarded when ``dry_run`` is true.

    A window counts as successful only when the CLI exits with code 0 AND its
    output contains none of ``ERROR`` / ``CRITICAL`` / ``Traceback``.

    Returns:
        dict with keys ``task``, ``window`` (``"start ~ end"``), ``success``,
        ``elapsed`` (seconds, float) and ``output``. ``output`` holds the
        combined stdout+stderr, or a diagnostic text when the CLI timed out or
        could not be launched.
    """
    cmd = [
        "uv", "run", "--package", "etl-feiqiu",
        "python", "-m", "cli.main",
        "--layers", "DWS",
        "--tasks", task_code,
        "--window-start", f"{window_start} 00:00:00",
        "--window-end", f"{window_end} 00:00:00",
        "--force-full",
    ]
    if dry_run:
        cmd.append("--dry-run")

    # Monotonic clock: durations must not jump with wall-clock adjustments.
    start_ts = time.monotonic()
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(ETL_CWD),
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout_sec,
        )
    except subprocess.TimeoutExpired as exc:
        # Keep whatever the CLI printed before being killed. The TIMEOUT marker
        # goes last so it shows up in the caller's "last N lines" excerpt.
        partial = _as_text(exc.stdout) + _as_text(exc.stderr)
        output = f"{partial}\nTIMEOUT after {timeout_sec}s" if partial else f"TIMEOUT after {timeout_sec}s"
        success = False
    except OSError as exc:
        # e.g. `uv` not on PATH or ETL_CWD missing: report a failed window
        # instead of crashing the whole rebuild.
        output = f"LAUNCH FAILED: {exc}"
        success = False
    else:
        output = proc.stdout + proc.stderr
        # Heuristic: error markers in the output fail the window even with
        # exit code 0 (presumably errors the CLI logs without failing).
        has_error = any(kw in output for kw in ("ERROR", "CRITICAL", "Traceback"))
        success = proc.returncode == 0 and not has_error
    elapsed = time.monotonic() - start_ts

    return {
        "task": task_code,
        "window": f"{window_start} ~ {window_end}",
        "success": success,
        "elapsed": elapsed,
        "output": output,
    }
|
||
|
||
|
||
# ── Main flow ────────────────────────────────────────────────


def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) for the rebuild script.

    Resulting attributes: ``start_date`` / ``end_date`` (required strings),
    ``dry_run`` / ``fail_fast`` (booleans, default False) and ``tasks``
    (comma-separated string, or None).
    """
    cli = argparse.ArgumentParser(
        description="DWS 历史数据重算(按营业日口径)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Required date bounds.
    for flag, help_text in (
        ("--start-date", "重算起始日期 (YYYY-MM-DD)"),
        ("--end-date", "重算结束日期 (YYYY-MM-DD)"),
    ):
        cli.add_argument(flag, required=True, help=help_text)

    # Boolean switches.
    for flag, help_text in (
        ("--dry-run", "预览模式,不实际执行写入"),
        ("--fail-fast", "遇错即停(默认继续下一窗口)"),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)

    cli.add_argument("--tasks", help="指定要重算的任务列表(逗号分隔),不指定则全部")

    return cli.parse_args()
|
||
|
||
|
||
def _resolve_task_codes(raw: str | None) -> list[str] | None:
    """Turn the ``--tasks`` argument into the ordered list of tasks to run.

    With no ``--tasks`` (None or empty string), every task in ALL_DWS_TASKS is
    returned. Otherwise the requested codes are:

    * normalised (stripped, upper-cased),
    * validated,
    * de-duplicated, and
    * re-ordered to follow ALL_DWS_TASKS, so upstream tasks always run before
      the tasks that depend on them regardless of command-line order.

    Prints the problem to stderr and returns None when a code is unknown or
    the argument contains no code at all (e.g. ``--tasks ","``).
    """
    if not raw:
        return list(ALL_DWS_TASKS)

    requested = [t.strip().upper() for t in raw.split(",") if t.strip()]
    invalid = [t for t in requested if t not in ALL_DWS_TASKS]
    if invalid:
        print(f"未知任务代码: {', '.join(invalid)}", file=sys.stderr)
        print(f"可用任务: {', '.join(ALL_DWS_TASKS)}", file=sys.stderr)
        return None
    if not requested:
        print("错误: --tasks 未包含任何任务代码", file=sys.stderr)
        return None

    selected = set(requested)
    return [t for t in ALL_DWS_TASKS if t in selected]


def main() -> int:
    """Rebuild DWS tables month by month under the business-day cutoff.

    Flow:

    1. Validate the arguments and split the range into month windows.
    2. Optionally snapshot table row counts (only with PG_DSN and not in
       dry-run mode).
    3. Run every task over every window.
    4. Snapshot row counts again.
    5. Print a summary.

    The log file under SYSTEM_LOG_ROOT is always written, even when the run
    is interrupted or crashes midway.

    Returns the process exit code: 0 when every executed window succeeded, 1
    on invalid arguments/configuration or when any window failed.
    """
    from itertools import product

    args = parse_args()

    # Parse the date range.
    try:
        start_date = date.fromisoformat(args.start_date)
        end_date = date.fromisoformat(args.end_date)
    except ValueError as e:
        print(f"日期格式错误: {e}", file=sys.stderr)
        return 1

    if start_date >= end_date:
        print("错误: --start-date 必须早于 --end-date", file=sys.stderr)
        return 1

    # Task list: validated, de-duplicated, dependency-ordered.
    task_codes = _resolve_task_codes(args.tasks)
    if task_codes is None:
        return 1

    windows = split_by_month(start_date, end_date)

    # Log directory. A missing setting is reported like the other
    # configuration errors (message + exit code 1), not as a traceback.
    log_root = os.environ.get("SYSTEM_LOG_ROOT")
    if not log_root:
        print(
            "错误: 环境变量 SYSTEM_LOG_ROOT 未定义。请在根 .env 中配置,参考 .env.template",
            file=sys.stderr,
        )
        return 1
    log_dir = Path(log_root)
    log_dir.mkdir(parents=True, exist_ok=True)

    # DSN used only for the before/after row-count statistics.
    pg_dsn = os.environ.get("PG_DSN", "")
    count_rows = bool(pg_dsn) and not args.dry_run

    now = datetime.now(TZ)
    mode_label = "[DRY-RUN] " if args.dry_run else ""
    fail_fast_label = " [FAIL-FAST]" if args.fail_fast else ""

    print(f"{'='*70}")
    print(f"{mode_label}DWS 历史数据重算 — 营业日口径{fail_fast_label}")
    print(f"时间范围: {start_date} ~ {end_date}")
    print(f"月窗口数: {len(windows)}")
    print(f"任务数量: {len(task_codes)}")
    print(f"任务列表: {', '.join(task_codes)}")
    print(f"开始时间: {now.isoformat()}")
    print(f"{'='*70}\n")

    log_file = log_dir / f"{now.strftime('%Y%m%d_%H%M%S')}_rebuild_dws_biz_date.log"
    log_lines: list[str] = [
        f"{mode_label}DWS 历史数据重算 — 营业日口径{fail_fast_label}\n",
        f"时间范围: {start_date} ~ {end_date}\n",
        f"月窗口数: {len(windows)}\n",
        f"任务列表: {', '.join(task_codes)}\n",
        f"开始时间: {now.isoformat()}\n",
        f"{'='*70}\n\n",
    ]

    results: list[dict] = []
    total_steps = len(task_codes) * len(windows)

    # The work below can run for hours. The log is written in ``finally`` so a
    # crash or Ctrl-C still leaves a record of the windows already processed.
    try:
        # ── Row counts before the rebuild ──
        row_counts_before: dict[str, int | None] = {}
        if count_rows:
            print("统计执行前行数...")
            row_counts_before = {tc: query_row_count(tc, pg_dsn) for tc in task_codes}
            for tc, cnt in row_counts_before.items():
                if cnt is not None:
                    log_lines.append(f"[BEFORE] {tc}: {cnt:,} 行\n")
            log_lines.append("\n")

        # ── Run every task over every window ──
        # Task-major order: all windows of one task finish before the next
        # task in the (dependency-ordered) list starts.
        for step, (task_code, (w_start, w_end)) in enumerate(
            product(task_codes, windows), start=1
        ):
            label = f"[{step}/{total_steps}] {task_code} | {w_start} ~ {w_end}"
            print(f"{label} ...", end=" ", flush=True)

            result = run_task_window(task_code, w_start, w_end, dry_run=args.dry_run)
            results.append(result)

            icon = "OK" if result["success"] else "FAIL"
            print(f"{icon} ({result['elapsed']:.0f}s)")
            log_lines.append(f"[{icon}] {label} ({result['elapsed']:.0f}s)\n")

            if result["success"]:
                continue

            # Show (and log) the last 30 output lines of the failed window.
            for line in result["output"].splitlines()[-30:]:
                print(f" | {line}")
                log_lines.append(f" | {line}\n")
            log_lines.append("\n")

            if args.fail_fast:
                print("\n--fail-fast 已启用,中止后续执行")
                log_lines.append("\n--fail-fast 中止\n")
                break

        # ── Row counts after the rebuild ──
        if count_rows:
            print("\n统计执行后行数...")
            log_lines.append("\n")
            for tc in task_codes:
                cnt = query_row_count(tc, pg_dsn)
                if cnt is None:
                    continue
                before = row_counts_before.get(tc)
                # "+" sign also for zero, matching "差异: +0".
                diff = f" (差异: {cnt - before:+,})" if before is not None else ""
                log_lines.append(f"[AFTER] {tc}: {cnt:,} 行{diff}\n")

        # ── Summary ──
        ok_count = sum(1 for r in results if r["success"])
        fail_count = len(results) - ok_count
        # Windows never started because --fail-fast aborted the run.
        skipped = total_steps - len(results)
        total_elapsed = sum(r["elapsed"] for r in results)
        skipped_note = f", {skipped} 跳过" if skipped else ""

        summary = (
            f"\n{'='*70}\n"
            f"重算完成: {ok_count}/{len(results)} 成功, {fail_count} 失败{skipped_note}"
            f" (总耗时 {total_elapsed:.0f}s)\n"
        )
        print(summary)
        log_lines.append(summary)

        return 0 if fail_count == 0 else 1
    except KeyboardInterrupt:
        note = f"\n已中断: 已执行 {len(results)}/{total_steps} 个窗口,日志仅包含部分结果\n"
        print(note, file=sys.stderr)
        log_lines.append(note)
        raise
    finally:
        with open(log_file, "w", encoding="utf-8") as f:
            f.writelines(log_lines)
        print(f"日志: {log_file}")
|
||
|
||
|
||
if __name__ == "__main__":
    # main() returns the exit status; SystemExit hands it to the interpreter.
    raise SystemExit(main())
|