Files
Neo-ZQYY/scripts/ops/rebuild_dws_biz_date.py

376 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
DWS 历史数据重算脚本 — 按营业日Business Day Cutoff口径重新聚合
用途:
引入 BUSINESS_DAY_START_HOUR 后,需要将历史 DWS 数据按新口径重算。
脚本复用正式 ETL CLIcli.main按月分窗口执行避免单次事务过大。
用法:
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --dry-run
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --fail-fast
python scripts/ops/rebuild_dws_biz_date.py --start-date 2025-11-01 --end-date 2026-03-01 --tasks DWS_FINANCE_DAILY,DWS_MEMBER_VISIT
Requirements: 10.1, 10.2, 10.3, 10.4
"""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
from dotenv import load_dotenv
# ── Environment bootstrap ─────────────────────────────────────
# Repository root: this file lives at scripts/ops/, so parents[2] is the
# directory that contains scripts/.
_ROOT = Path(__file__).resolve().parents[2]
# Load variables from the root-level .env file (SYSTEM_LOG_ROOT and PG_DSN
# are read from the environment later in main()).
load_dotenv(_ROOT / ".env")
# Time zone used for the run's start timestamp and the log file name.
TZ = ZoneInfo("Asia/Shanghai")
# Working directory handed to the ETL CLI subprocess.
ETL_CWD = _ROOT / "apps" / "etl" / "connectors" / "feiqiu"
# Codes of every DWS task that must be recomputed (listed in dependency order).
# DWS_MAINTENANCE / INDEX-layer tasks are deliberately excluded; they can be
# refreshed separately once the rebuild has finished.
ALL_DWS_TASKS: list[str] = [
    # Finance
    "DWS_FINANCE_DAILY",
    "DWS_FINANCE_RECHARGE",
    "DWS_FINANCE_INCOME_STRUCTURE",
    "DWS_FINANCE_DISCOUNT_DETAIL",
    # Assistant (coach)
    "DWS_ASSISTANT_DAILY",
    "DWS_ASSISTANT_ORDER_CONTRIBUTION",
    "DWS_ASSISTANT_CUSTOMER",
    "DWS_ASSISTANT_SALARY",
    "DWS_ASSISTANT_FINANCE",
    "DWS_ASSISTANT_MONTHLY",
    # Member
    "DWS_MEMBER_VISIT",
    "DWS_MEMBER_CONSUMPTION",
    # Goods stock
    "DWS_GOODS_STOCK_DAILY",
    "DWS_GOODS_STOCK_WEEKLY",
    "DWS_GOODS_STOCK_MONTHLY",
]
# ── 月窗口拆分 ────────────────────────────────────────────────
def split_by_month(start: date, end: date) -> list[tuple[date, date]]:
    """Split the half-open range ``[start, end)`` at calendar-month boundaries.

    The first window begins at ``start`` itself (which need not be the 1st of
    a month); every later window begins on the 1st of a month. Each window
    ends on the 1st of the following month or at ``end``, whichever is
    earlier.

    Returns:
        A list of ``(window_start, window_end)`` pairs in chronological order;
        empty when ``start >= end``.
    """
    spans: list[tuple[date, date]] = []
    lower = start
    while lower < end:
        # divmod rolls December over into January of the next year.
        year_carry, month_index = divmod(lower.month, 12)
        boundary = date(lower.year + year_carry, month_index + 1, 1)
        upper = boundary if boundary < end else end
        spans.append((lower, upper))
        lower = boundary
    return spans
# ── 行数统计 ──────────────────────────────────────────────────
def _get_row_count_sql(task_code: str) -> str | None:
"""根据任务代码返回对应 DWS 表的行数查询 SQL。
返回 None 表示该任务无法直接统计行数(如 MV 刷新)。
"""
# 任务代码 → DWS 表名映射
table_map: dict[str, str] = {
"DWS_FINANCE_DAILY": "dws.dws_finance_daily_summary",
"DWS_FINANCE_RECHARGE": "dws.dws_finance_recharge_summary",
"DWS_FINANCE_INCOME_STRUCTURE": "dws.dws_finance_income_structure",
"DWS_FINANCE_DISCOUNT_DETAIL": "dws.dws_finance_discount_detail",
"DWS_ASSISTANT_DAILY": "dws.dws_assistant_daily_detail",
"DWS_ASSISTANT_ORDER_CONTRIBUTION": "dws.dws_assistant_order_contribution",
"DWS_ASSISTANT_CUSTOMER": "dws.dws_assistant_customer_daily",
"DWS_ASSISTANT_SALARY": "dws.dws_assistant_salary",
"DWS_ASSISTANT_FINANCE": "dws.dws_assistant_finance_analysis",
"DWS_ASSISTANT_MONTHLY": "dws.dws_assistant_monthly_summary",
"DWS_MEMBER_VISIT": "dws.dws_member_visit_daily",
"DWS_MEMBER_CONSUMPTION": "dws.dws_member_consumption_daily",
"DWS_GOODS_STOCK_DAILY": "dws.dws_goods_stock_daily",
"DWS_GOODS_STOCK_WEEKLY": "dws.dws_goods_stock_weekly",
"DWS_GOODS_STOCK_MONTHLY": "dws.dws_goods_stock_monthly",
}
table = table_map.get(task_code)
if not table:
return None
return f"SELECT COUNT(*) AS cnt FROM {table}"
def query_row_count(task_code: str, dsn: str) -> int | None:
    """Count the rows of the DWS table that belongs to ``task_code``.

    This is a best-effort statistic: any failure (psycopg2 missing, connection
    refused, table absent, ...) is reported on stderr and turned into ``None``
    so that the rebuild itself is never blocked by it.

    Args:
        task_code: DWS task code used to look up the target table.
        dsn: PostgreSQL connection string passed to ``psycopg2.connect``.

    Returns:
        The row count, or ``None`` when the task has no mapped table or the
        query could not be executed.
    """
    sql = _get_row_count_sql(task_code)
    if not sql:
        return None
    try:
        # Imported lazily so the script still runs without psycopg2 installed.
        import psycopg2

        conn = psycopg2.connect(dsn)
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
                row = cur.fetchone()
        finally:
            # psycopg2's ``with connection`` only ends the transaction; the
            # connection must be closed explicitly or it leaks once per call.
            conn.close()
        return row[0] if row else 0
    except Exception as exc:
        print(
            f"[WARN] 行数统计失败 {task_code}: {type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
        return None
# ── 单任务单窗口执行 ──────────────────────────────────────────
def _decode_captured(chunk: str | bytes | None) -> str:
    """Normalize captured subprocess output (str, bytes or None) to ``str``."""
    if chunk is None:
        return ""
    if isinstance(chunk, bytes):
        return chunk.decode("utf-8", errors="replace")
    return chunk


def run_task_window(
    task_code: str,
    window_start: date,
    window_end: date,
    dry_run: bool = False,
    timeout_sec: int = 600,
) -> dict:
    """Run one task for one window by invoking the ETL CLI as a subprocess.

    A step counts as successful only when the CLI exits with code 0 and its
    combined stdout/stderr contains none of ``ERROR``, ``CRITICAL`` or
    ``Traceback``.

    Args:
        task_code: DWS task code passed to ``--tasks``.
        window_start: Inclusive window start; sent as ``<date> 00:00:00``.
        window_end: Window end; sent as ``<date> 00:00:00``.
        dry_run: When true, ``--dry-run`` is appended to the CLI command.
        timeout_sec: Maximum run time of the subprocess in seconds.

    Returns:
        A dict with the keys ``task``, ``window``, ``success``, ``elapsed``
        (seconds) and ``output`` (combined stdout + stderr, or a failure
        description).
    """
    cmd = [
        "uv", "run", "--package", "etl-feiqiu",
        "python", "-m", "cli.main",
        "--layers", "DWS",
        "--tasks", task_code,
        "--window-start", f"{window_start} 00:00:00",
        "--window-end", f"{window_end} 00:00:00",
        "--force-full",
    ]
    if dry_run:
        cmd.append("--dry-run")

    # Monotonic clock: the measured duration is immune to wall-clock changes.
    start_ts = time.monotonic()
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(ETL_CWD),
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout_sec,
        )
    except subprocess.TimeoutExpired as exc:
        # Keep whatever the child printed before it was killed; the captured
        # data may be bytes even in text mode, or None if nothing was read.
        partial = _decode_captured(exc.stdout) + _decode_captured(exc.stderr)
        timeout_note = f"TIMEOUT after {timeout_sec}s"
        output = f"{partial}\n{timeout_note}" if partial else timeout_note
        success = False
    except OSError as exc:
        # The process could not be started at all (e.g. `uv` is not on PATH
        # or the working directory is missing): report it as a failed step.
        output = f"子进程启动失败: {type(exc).__name__}: {exc}"
        success = False
    else:
        output = proc.stdout + proc.stderr
        # The keywords contain no newline, so scanning the whole text is
        # equivalent to scanning it line by line.
        has_error = any(kw in output for kw in ("ERROR", "CRITICAL", "Traceback"))
        success = proc.returncode == 0 and not has_error
    elapsed = time.monotonic() - start_ts

    return {
        "task": task_code,
        "window": f"{window_start} ~ {window_end}",
        "success": success,
        "elapsed": elapsed,
        "output": output,
    }
# ── 主流程 ────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
    """Parse the command line of the rebuild script.

    Returns:
        A namespace with ``start_date`` and ``end_date`` (required, raw
        strings), the boolean switches ``dry_run`` and ``fail_fast``, and the
        optional comma-separated ``tasks`` string (``None`` when omitted).
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="DWS 历史数据重算(按营业日口径)",
    )

    # Mandatory date bounds, registered in display order.
    date_options = (
        ("--start-date", "重算起始日期 (YYYY-MM-DD)"),
        ("--end-date", "重算结束日期 (YYYY-MM-DD)"),
    )
    for flag, help_text in date_options:
        cli.add_argument(flag, required=True, help=help_text)

    # Boolean switches, off by default.
    switch_options = (
        ("--dry-run", "预览模式,不实际执行写入"),
        ("--fail-fast", "遇错即停(默认继续下一窗口)"),
    )
    for flag, help_text in switch_options:
        cli.add_argument(flag, action="store_true", help=help_text)

    cli.add_argument(
        "--tasks",
        help="指定要重算的任务列表(逗号分隔),不指定则全部",
    )
    return cli.parse_args()
def main() -> int:
    """Rebuild the selected DWS tasks window by window and write a run log.

    Steps: parse and validate the CLI arguments, split the date range into
    month windows, record table row counts before and after the run (only
    when ``PG_DSN`` is set and the run is not a dry run), execute every
    task/window pair through ``run_task_window`` and write a log file into
    the directory named by ``SYSTEM_LOG_ROOT``.

    The log file is written even when the run is interrupted (Ctrl-C or an
    unexpected exception), so the record of completed steps is not lost.

    Returns:
        0 when every executed step succeeded; 1 when a step failed or the
        arguments were invalid.

    Raises:
        RuntimeError: if the ``SYSTEM_LOG_ROOT`` environment variable is unset.
    """
    args = parse_args()

    # Parse the date bounds.
    try:
        start_date = date.fromisoformat(args.start_date)
        end_date = date.fromisoformat(args.end_date)
    except ValueError as e:
        print(f"日期格式错误: {e}", file=sys.stderr)
        return 1
    if start_date >= end_date:
        print("错误: --start-date 必须早于 --end-date", file=sys.stderr)
        return 1

    # Resolve the task list.
    if args.tasks:
        task_codes = [t.strip().upper() for t in args.tasks.split(",") if t.strip()]
        # A value such as "," yields no codes; refuse it instead of silently
        # running nothing and reporting success.
        if not task_codes:
            print("错误: --tasks 未包含任何任务代码", file=sys.stderr)
            print(f"可用任务: {', '.join(ALL_DWS_TASKS)}", file=sys.stderr)
            return 1
        # Reject unknown task codes.
        invalid = [t for t in task_codes if t not in ALL_DWS_TASKS]
        if invalid:
            print(f"未知任务代码: {', '.join(invalid)}", file=sys.stderr)
            print(f"可用任务: {', '.join(ALL_DWS_TASKS)}", file=sys.stderr)
            return 1
    else:
        task_codes = list(ALL_DWS_TASKS)

    # Month windows.
    windows = split_by_month(start_date, end_date)

    # Log directory.
    log_root = os.environ.get("SYSTEM_LOG_ROOT")
    if not log_root:
        raise RuntimeError(
            "环境变量 SYSTEM_LOG_ROOT 未定义。"
            "请在根 .env 中配置,参考 .env.template"
        )
    log_dir = Path(log_root)
    log_dir.mkdir(parents=True, exist_ok=True)

    # Database DSN, used only for the row-count statistics.
    pg_dsn = os.environ.get("PG_DSN", "")
    collect_counts = bool(pg_dsn) and not args.dry_run

    now = datetime.now(TZ)
    mode_label = "[DRY-RUN] " if args.dry_run else ""
    fail_fast_label = " [FAIL-FAST]" if args.fail_fast else ""

    print(f"{'='*70}")
    print(f"{mode_label}DWS 历史数据重算 — 营业日口径{fail_fast_label}")
    print(f"时间范围: {start_date} ~ {end_date}")
    print(f"月窗口数: {len(windows)}")
    print(f"任务数量: {len(task_codes)}")
    print(f"任务列表: {', '.join(task_codes)}")
    print(f"开始时间: {now.isoformat()}")
    print(f"{'='*70}\n")

    # Log file; lines are buffered in memory and flushed in the finally below.
    log_file = log_dir / f"{now.strftime('%Y%m%d_%H%M%S')}_rebuild_dws_biz_date.log"
    log_lines: list[str] = [
        f"{mode_label}DWS 历史数据重算 — 营业日口径{fail_fast_label}\n",
        f"时间范围: {start_date} ~ {end_date}\n",
        f"月窗口数: {len(windows)}\n",
        f"任务列表: {', '.join(task_codes)}\n",
        f"开始时间: {now.isoformat()}\n",
        f"{'='*70}\n\n",
    ]

    results: list[dict] = []
    completed = False
    try:
        # ── Row counts before the run ─────────────────────────────
        row_counts_before: dict[str, int | None] = {}
        if collect_counts:
            print("统计执行前行数...")
            for tc in task_codes:
                cnt = query_row_count(tc, pg_dsn)
                row_counts_before[tc] = cnt
                if cnt is not None:
                    log_lines.append(f"[BEFORE] {tc}: {cnt:,}\n")
            log_lines.append("\n")

        # ── Execute every task for every window ───────────────────
        total_steps = len(task_codes) * len(windows)
        step = 0
        aborted = False
        for task_code in task_codes:
            if aborted:
                break
            for w_start, w_end in windows:
                step += 1
                label = f"[{step}/{total_steps}] {task_code} | {w_start} ~ {w_end}"
                print(f"{label} ...", end=" ", flush=True)

                result = run_task_window(
                    task_code, w_start, w_end,
                    dry_run=args.dry_run,
                )
                results.append(result)

                icon = "OK" if result["success"] else "FAIL"
                print(f"{icon} ({result['elapsed']:.0f}s)")
                log_lines.append(f"[{icon}] {label} ({result['elapsed']:.0f}s)\n")

                if not result["success"]:
                    # Show the last 30 lines of the failing step's output.
                    tail = result["output"].splitlines()[-30:]
                    for line in tail:
                        print(f" | {line}")
                        log_lines.append(f" | {line}\n")
                    log_lines.append("\n")
                    if args.fail_fast:
                        print("\n--fail-fast 已启用,中止后续执行")
                        log_lines.append("\n--fail-fast 中止\n")
                        aborted = True
                        break

        # ── Row counts after the run ──────────────────────────────
        if collect_counts:
            print("\n统计执行后行数...")
            log_lines.append("\n")
            for tc in task_codes:
                cnt = query_row_count(tc, pg_dsn)
                before = row_counts_before.get(tc)
                if cnt is not None:
                    diff = ""
                    if before is not None:
                        delta = cnt - before
                        sign = "+" if delta >= 0 else ""
                        diff = f" (差异: {sign}{delta:,})"
                    log_lines.append(f"[AFTER] {tc}: {cnt:,}{diff}\n")

        # ── Summary ───────────────────────────────────────────────
        ok_count = sum(1 for r in results if r["success"])
        fail_count = len(results) - ok_count
        total_elapsed = sum(r["elapsed"] for r in results)
        summary = (
            f"\n{'='*70}\n"
            f"重算完成: {ok_count}/{len(results)} 成功, {fail_count} 失败"
            f" (总耗时 {total_elapsed:.0f}s)\n"
        )
        print(summary)
        log_lines.append(summary)
        completed = True
    finally:
        # Persist the log on every exit path, including interruption.
        if not completed:
            log_lines.append(
                f"\n[ABORTED] 执行被中断,已完成 {len(results)} 个步骤\n"
            )
        with open(log_file, "w", encoding="utf-8") as f:
            f.writelines(log_lines)
        print(f"日志: {log_file}")

    return 0 if fail_count == 0 else 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())