Files
Neo-ZQYY/scripts/ops/backfill_index_snapshots.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

211 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Backfill script: generate historical daily snapshots for the 3 index tables.

Runs RelationIndexTask / WinbackIndexTask / NewconvIndexTask one day at a
time, in ascending date order, writing the stat_date column so that
snapshots for multiple days can coexist.

Usage:
    cd C:\\Project\\NeoZQYY
    uv run python scripts/ops/backfill_index_snapshots.py \\
        --start 2025-08-01 --end 2026-04-11

    # Backfill relation_index only
    uv run python scripts/ops/backfill_index_snapshots.py \\
        --start 2025-08-01 --end 2026-04-11 --tasks RELATION

    # Dry run (no database writes)
    uv run python scripts/ops/backfill_index_snapshots.py \\
        --start 2025-08-01 --end 2025-08-03 --dry-run
"""
from __future__ import annotations
import argparse
import logging
import sys
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
# ── Environment setup ──
# Put this script's own directory on sys.path so the sibling helper module
# `_env_paths` can be imported.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from _env_paths import ensure_repo_root
# NOTE(review): presumably validates/locates the repository root — confirm
# against scripts/ops/_env_paths.py.
ensure_repo_root()
# Switch to the ETL directory so that the task modules can be imported.
import os
_ETL_ROOT = Path(__file__).resolve().parents[2] / "apps" / "etl" / "connectors" / "feiqiu"
sys.path.insert(0, str(_ETL_ROOT))
os.chdir(_ETL_ROOT)
from dotenv import load_dotenv
# Two levels above scripts/ops/, i.e. the repository root.
_ROOT = Path(__file__).resolve().parents[2]
# The repository-level .env is loaded before the ETL-level one; with
# override=False neither call replaces a variable that is already set.
load_dotenv(_ROOT / ".env", override=False)
load_dotenv(_ETL_ROOT / ".env", override=False)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("backfill_index")
# ── Import ETL components ──
# These imports rely on _ETL_ROOT having been added to sys.path above, so
# they must stay below the environment setup.
from config.settings import AppConfig
from database.connection import DatabaseConnection
from tasks.base_task import TaskContext
from tasks.dws.index.relation_index_task import RelationIndexTask
from tasks.dws.index.winback_index_task import WinbackIndexTask
from tasks.dws.index.newconv_index_task import NewconvIndexTask
# Time zone attached to the snapshot timestamps created in build_context().
TZ = ZoneInfo("Asia/Shanghai")
# Maps each task code accepted by --tasks to the index task class it runs.
TASK_MAP = {
    "RELATION": RelationIndexTask,
    "WINBACK": WinbackIndexTask,
    "NEWCONV": NewconvIndexTask,
}
class DummyAPI:
    """Empty placeholder handed to index tasks in place of an API client.

    The index tasks do not need an API, so this class carries no state and
    no behaviour of its own.
    """
def build_context(d: date, store_id: int) -> TaskContext:
    """Build the TaskContext used to compute the snapshot for day *d*.

    The reference instant is 23:59:00 on *d* in the module time zone ``TZ``.
    The window ends at that instant and starts 90 days before it.

    Args:
        d: Calendar day the snapshot is generated for.
        store_id: Store identifier passed through to the context.

    Returns:
        A TaskContext whose ``window_end`` and ``as_of_date`` are both the
        reference instant.
    """
    snapshot_at = datetime(
        year=d.year, month=d.month, day=d.day, hour=23, minute=59, tzinfo=TZ
    )
    lookback_start = snapshot_at - timedelta(days=90)
    span_seconds = (snapshot_at - lookback_start).total_seconds()
    return TaskContext(
        store_id=store_id,
        window_start=lookback_start,
        window_end=snapshot_at,
        window_minutes=int(span_seconds / 60),
        as_of_date=snapshot_at,
    )
def _open_connection(config):
    """Open a database connection from the ``db`` section of *config*."""
    db_settings = config.config["db"]
    return DatabaseConnection(
        dsn=db_settings["dsn"],
        session=db_settings.get("session"),
        connect_timeout=db_settings.get("connect_timeout_sec"),
    )


def _reconnect(config, stale_conn):
    """Replace *stale_conn* after a task failure and return the connection to use.

    Closing the old connection is best-effort: a failure there must not
    prevent the reconnect attempt. If reconnecting fails, the old connection
    is returned so the caller can retry after the next failure.
    """
    try:
        stale_conn.close()
    except Exception:
        logger.warning("关闭旧数据库连接失败", exc_info=True)
    try:
        return _open_connection(config)
    except Exception:
        logger.exception("数据库重连失败")
        return stale_conn


def run_backfill(
    start_date: date,
    end_date: date,
    task_keys: list[str],
    dry_run: bool = False,
) -> bool:
    """Run the selected index tasks for every day in the given range.

    Days are processed in ascending order. For each day, every task named in
    *task_keys* is instantiated and executed. A failing task is logged and
    counted, the database connection is replaced, and processing continues
    with the next task.

    Args:
        start_date: First day to backfill (inclusive).
        end_date: Last day to backfill (inclusive).
        task_keys: Keys of ``TASK_MAP`` selecting which tasks to run.
        dry_run: When true, tasks are instantiated but not executed; each
            skipped task is still counted as completed.

    Returns:
        True when no task raised an exception, False otherwise.
    """
    config = AppConfig.load()
    db_conn = _open_connection(config)
    try:
        store_id = config.config["app"]["store_id"]
        total_days = (end_date - start_date).days + 1
        total_tasks = total_days * len(task_keys)
        logger.info(
            "回填参数: %s ~ %s (%d天), 任务=%s, store_id=%s, dry_run=%s",
            start_date, end_date, total_days, task_keys, store_id, dry_run,
        )
        if dry_run:
            logger.info("[DRY RUN] 仅计算不写入数据库")

        completed = 0
        errors = 0
        # Monotonic clock: elapsed times must not be affected by wall-clock jumps.
        t0 = time.monotonic()
        current = start_date
        while current <= end_date:
            ctx = build_context(current, store_id)
            day_t0 = time.monotonic()
            for key in task_keys:
                # A fresh task per run, so it always holds the current
                # connection, including one replaced after a failure.
                task = TASK_MAP[key](config, db_conn, DummyAPI(), logger)
                if dry_run:
                    logger.info("[DRY RUN] 跳过 %s %s", key, current)
                    completed += 1
                    continue
                try:
                    result = task.execute(ctx)
                    records = result.get(
                        "records_inserted", result.get("member_count", "?")
                    )
                    # Separator between date and count keeps the line readable.
                    logger.info(" %s %s: %s 条记录", key, current, records)
                    completed += 1
                except Exception:
                    logger.exception(" %s %s 失败", key, current)
                    errors += 1
                    # Try to recover with a new connection for the next task.
                    db_conn = _reconnect(config, db_conn)
            elapsed = time.monotonic() - day_t0
            progress = completed / total_tasks * 100 if total_tasks else 0
            logger.info(
                "%s 完成 (%.1fs) [%d/%d %.0f%%]",
                current, elapsed, completed, total_tasks, progress,
            )
            current += timedelta(days=1)

        total_elapsed = time.monotonic() - t0
        logger.info(
            "回填完成: %d/%d 成功, %d 失败, 耗时 %.1f 秒 (%.1f 分钟)",
            completed, total_tasks, errors, total_elapsed, total_elapsed / 60,
        )
        return errors == 0
    finally:
        # Release the connection on interruption or unexpected errors as well.
        try:
            db_conn.close()
        except Exception:
            logger.warning("关闭数据库连接失败", exc_info=True)
def main():
    """Parse the command line, validate it, and run the backfill.

    Exits with status 0 when every task succeeded. Exits with status 1 when
    a date is malformed, the date range is reversed, the task list is empty
    or contains an unknown code, or at least one task failed. Usage errors
    detected by argparse itself (e.g. a missing required option) exit with
    argparse's own status.
    """
    parser = argparse.ArgumentParser(description="回填指数表历史日快照")
    parser.add_argument("--start", required=True, help="起始日期 (YYYY-MM-DD)")
    parser.add_argument("--end", required=True, help="结束日期 (YYYY-MM-DD)")
    parser.add_argument(
        "--tasks",
        default="RELATION,WINBACK,NEWCONV",
        help="任务列表,逗号分隔 (RELATION/WINBACK/NEWCONV)",
    )
    parser.add_argument("--dry-run", action="store_true", help="干跑模式,不写数据库")
    args = parser.parse_args()

    # Report malformed dates as a clean CLI error rather than a traceback.
    try:
        start_date = date.fromisoformat(args.start)
        end_date = date.fromisoformat(args.end)
    except ValueError as exc:
        print(f"错误:日期格式无效 ({exc}),应为 YYYY-MM-DD", file=sys.stderr)
        sys.exit(1)
    if start_date > end_date:
        print("错误:start 必须 <= end", file=sys.stderr)
        sys.exit(1)

    # Ignore empty entries from stray commas and drop duplicates, keeping the
    # order given, so no task runs twice for the same day.
    requested = (k.strip().upper() for k in args.tasks.split(","))
    task_keys = list(dict.fromkeys(k for k in requested if k))
    if not task_keys:
        print(f"错误:未指定任务,可选: {list(TASK_MAP.keys())}", file=sys.stderr)
        sys.exit(1)
    for k in task_keys:
        if k not in TASK_MAP:
            print(f"错误:未知任务 {k},可选: {list(TASK_MAP.keys())}", file=sys.stderr)
            sys.exit(1)

    success = run_backfill(start_date, end_date, task_keys, dry_run=args.dry_run)
    sys.exit(0 if success else 1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()