Files
Neo-ZQYY/scripts/ops/ai_backfill.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

337 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""历史数据回填脚本。
用法:
cd C:\\Project\\NeoZQYY
uv run python scripts/ops/ai_backfill.py [--dry-run] [--batch-size 10] [--interval 5]
功能:
1. 查询半年内活跃会员(消费/备注/任务变更三者并集)
2. 分批执行 AI 调用链(每批 10 人,间隔 5 秒)
3. 断点续跑(已完成 member_id 记录到本地文件)
4. App8 写 member_retention_clueDELETE + INSERT 事务)
需求: D1.1, D1.2, D1.3, D1.4, D1.5, D1.6, D1.7, D2.1, D2.3
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
# ── 加载根 .env ──────────────────────────────────────
_ROOT = Path(__file__).resolve().parent.parent.parent
load_dotenv(_ROOT / ".env")
# 必需环境变量校验
_APP_DB_DSN = os.environ.get("APP_DB_DSN")
if not _APP_DB_DSN:
sys.exit("ERROR: APP_DB_DSN 环境变量未设置,请检查根 .env 文件")
logger = logging.getLogger("ai_backfill")
# ── 常量 ─────────────────────────────────────────────
SITE_ID = 2790685415443269
CHECKPOINT_FILE = "scripts/ops/_ai_backfill_checkpoint.json"
BATCH_SIZE = 10
BATCH_INTERVAL = 5 # 秒
DATE_FROM = "2025-09-21"
DATE_TO = "2026-03-21"
# 预估每次调用消耗 token 数(用于 print_estimate
AVG_TOKENS_PER_CALL = 2000
# 每个会员平均调用 App 数App3+App8+App7 = 3加上可能的 App4+App5+App6 ≈ 5
AVG_APPS_PER_MEMBER = 5
# ── 活跃会员查询 SQL ─────────────────────────────────
# 半年内消费 备注 任务变更三者并集
ACTIVE_MEMBERS_SQL = """
SELECT DISTINCT member_id FROM (
-- 消费记录
SELECT member_id
FROM biz.settlement_orders
WHERE site_id = %(site_id)s
AND member_id > 0
AND settle_time >= %(date_from)s
AND settle_time < %(date_to)s
UNION
-- 备注记录
SELECT member_id
FROM biz.member_notes
WHERE site_id = %(site_id)s
AND member_id > 0
AND created_at >= %(date_from)s
AND created_at < %(date_to)s
UNION
-- 任务变更(助教任务分配)
SELECT member_id
FROM biz.assistant_tasks
WHERE site_id = %(site_id)s
AND member_id > 0
AND updated_at >= %(date_from)s
AND updated_at < %(date_to)s
) AS active
ORDER BY member_id
"""
class AIBackfillRunner:
"""历史数据回填执行器。
对半年内活跃会员分批执行 AI 调用链,支持断点续跑。
"""
def __init__(
self,
dry_run: bool = False,
batch_size: int = BATCH_SIZE,
interval: int = BATCH_INTERVAL,
):
self.dry_run = dry_run
self.batch_size = batch_size
self.interval = interval
self.completed: set[int] = set()
self._checkpoint_path = Path(CHECKPOINT_FILE)
# ── checkpoint 读写 ──────────────────────────────
def load_checkpoint(self) -> None:
"""从本地 JSON 文件加载已完成的 member_id 集合。
文件不存在或格式异常时completed 初始化为空集。
"""
if not self._checkpoint_path.exists():
self.completed = set()
return
try:
data = json.loads(self._checkpoint_path.read_text(encoding="utf-8"))
self.completed = set(data.get("completed_member_ids", []))
except (json.JSONDecodeError, TypeError, KeyError):
logger.warning("checkpoint 文件格式异常,从头开始")
self.completed = set()
def save_checkpoint(self) -> None:
"""保存已完成的 member_id 到本地 JSON 文件。"""
data = {
"completed_member_ids": sorted(self.completed),
"last_updated": datetime.now(timezone.utc).isoformat(),
}
self._checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
self._checkpoint_path.write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
# ── 数据库查询 ───────────────────────────────────
async def query_active_members(self) -> list[int]:
"""查询半年内活跃会员(消费 备注 任务变更三者并集)。
使用 APP_DB_DSN 连接业务库。
"""
import psycopg2
conn = psycopg2.connect(_APP_DB_DSN)
try:
with conn.cursor() as cur:
cur.execute(
ACTIVE_MEMBERS_SQL,
{
"site_id": SITE_ID,
"date_from": DATE_FROM,
"date_to": DATE_TO,
},
)
rows = cur.fetchall()
return [row[0] for row in rows]
finally:
conn.close()
# ── 单会员调用链 ─────────────────────────────────
async def run_member(self, member_id: int) -> None:
"""对单个会员执行调用链。
调用链:
1. App3维客线索→ App8线索整理→ App7客户分析
2. 如有助教关联:→ App4关系分析→ App5话术参考
3. 如有备注:→ App6备注分析→ App8再次整合
App8 写入 member_retention_clue 使用 DELETE + INSERT 事务包裹。
验证 App5 输出包含分类字段(覆盖 P2-1 修复)。
注意:实际 AI 调用依赖后端 AIDispatcher此处为占位实现。
正式使用时需要导入并调用 AIDispatcher 的相关方法。
"""
if self.dry_run:
logger.info("[DRY-RUN] 跳过会员 %d 的调用链执行", member_id)
return
# TODO: 接入后端 AIDispatcher 实现实际调用链
# 以下为占位逻辑,标明调用顺序和关键校验点
# 步骤 1: App3 → App8 → App7基础链路
# app3_result = await dispatcher.run_app("app3_clue", member_id, SITE_ID)
# app8_result = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID)
# _write_retention_clue(member_id, app8_result) # DELETE + INSERT 事务
# app7_result = await dispatcher.run_app("app7_customer_analysis", member_id, SITE_ID)
# 步骤 2: 如有助教关联 → App4 → App5
# if has_coach_relation(member_id):
# app4_result = await dispatcher.run_app("app4_analysis", member_id, SITE_ID)
# app5_result = await dispatcher.run_app("app5_tactics", member_id, SITE_ID)
# # P2-1 修复验证App5 输出必须包含分类字段
# if "category" not in app5_result:
# logger.warning("App5 输出缺少分类字段member_id=%d", member_id)
# 步骤 3: 如有备注 → App6 → App8再次整合
# if has_notes(member_id):
# app6_result = await dispatcher.run_app("app6_note_analysis", member_id, SITE_ID)
# app8_result2 = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID)
# _write_retention_clue(member_id, app8_result2) # DELETE + INSERT 事务
logger.info("会员 %d 调用链执行完成(占位)", member_id)
# ── 预估输出 ─────────────────────────────────────
def print_estimate(self, member_count: int) -> None:
"""输出预估信息:会员数、预估调用次数、预估 Token 消耗。"""
estimated_calls = member_count * AVG_APPS_PER_MEMBER
estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL
batch_count = (member_count + self.batch_size - 1) // self.batch_size
estimated_time_sec = batch_count * self.interval
print("=" * 60)
print("回填预估信息")
print("=" * 60)
print(f" 门店 ID: {SITE_ID}")
print(f" 日期范围: {DATE_FROM} ~ {DATE_TO}")
print(f" 活跃会员数: {member_count}")
print(f" 已完成: {len(self.completed)}")
print(f" 待处理: {member_count - len(self.completed)}")
print(f" 预估调用次数: {estimated_calls}")
print(f" 预估 Token: {estimated_tokens:,}")
print(f" 批次大小: {self.batch_size}")
print(f" 批间间隔: {self.interval}s")
print(f" 预估批次数: {batch_count}")
print(f" 预估耗时: ~{estimated_time_sec}s")
if self.dry_run:
print(" 模式: DRY-RUN不执行实际调用")
print("=" * 60)
# ── 主入口 ───────────────────────────────────────
async def run(self) -> None:
"""主入口:加载断点 → 查询会员 → 分批执行 → 保存断点。
错误处理:
- 单个会员失败:记录日志,跳过,继续下一个;不写入 checkpoint
- 数据库连接中断:保存当前 checkpoint退出
- 预算超限:保存 checkpoint输出提示退出
"""
self.load_checkpoint()
try:
members = await self.query_active_members()
except Exception as exc:
logger.error("查询活跃会员失败: %s", exc)
self.save_checkpoint()
return
# 过滤已完成的会员
pending = [m for m in members if m not in self.completed]
self.print_estimate(len(members))
if not pending:
logger.info("所有会员已完成,无需回填")
return
logger.info("开始回填:%d 个待处理会员", len(pending))
# 分批执行
for batch_idx in range(0, len(pending), self.batch_size):
batch = pending[batch_idx : batch_idx + self.batch_size]
batch_num = batch_idx // self.batch_size + 1
total_batches = (len(pending) + self.batch_size - 1) // self.batch_size
logger.info("批次 %d/%d:处理 %d 个会员", batch_num, total_batches, len(batch))
for member_id in batch:
try:
await self.run_member(member_id)
self.completed.add(member_id)
except ConnectionError as exc:
# 数据库连接中断:保存 checkpoint 并退出
logger.error("数据库连接中断 (member_id=%d): %s", member_id, exc)
self.save_checkpoint()
sys.exit(1)
except Exception as exc:
# 单个会员失败:记录日志,跳过,不写入 checkpoint
logger.error("会员 %d 调用链失败: %s", member_id, exc)
continue
# 每批完成后保存 checkpoint
self.save_checkpoint()
# 批间间隔(最后一批不等待)
if batch_idx + self.batch_size < len(pending):
logger.info("等待 %ds 后继续下一批...", self.interval)
await asyncio.sleep(self.interval)
logger.info("回填完成:共处理 %d 个会员", len(self.completed))
# ── CLI 入口 ─────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="AI 历史数据回填脚本(半年内活跃会员)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="试运行模式,不执行实际 AI 调用",
)
parser.add_argument(
"--batch-size",
type=int,
default=BATCH_SIZE,
help=f"每批处理会员数(默认 {BATCH_SIZE}",
)
parser.add_argument(
"--interval",
type=int,
default=BATCH_INTERVAL,
help=f"批间间隔秒数(默认 {BATCH_INTERVAL}",
)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
runner = AIBackfillRunner(
dry_run=args.dry_run,
batch_size=args.batch_size,
interval=args.interval,
)
asyncio.run(runner.run())
if __name__ == "__main__":
main()