主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
- 新增 GET /xcx/coaches/{id}/banner 轻量接口
- performance/records 加 coach_id 参数 + view_board_coach 权限分流
- coach/customer/performance/board/task 服务层重构
- fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
- task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
- recall_detector settle_type=3 双重限制 + 门店级 resolved
主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
- perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
- isScattered 散客标记端到端
- foodDetail/phoneFull/creator* 字段透传
主线 3: P19 指数回测框架 Phase 1+2
- 3 个指数表 stat_date 日快照模式
- 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
- task_engine 升级 HTTP 实时 + 推演回测双模式
主线 4: Core 维度层启用
- 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
- 修复 app 视图空查询问题
主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口
主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
- schema 基线与 DDL 快照同步
主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)
附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具
合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
337 lines
13 KiB
Python
337 lines
13 KiB
Python
# -*- coding: utf-8 -*-
"""Historical data backfill script.

Usage:
    cd C:\\Project\\NeoZQYY
    uv run python scripts/ops/ai_backfill.py [--dry-run] [--batch-size 10] [--interval 5]

Features:
    1. Query members active within the last half year (union of three
       signals: consumption / notes / task changes)
    2. Run the AI call chain in batches (10 members per batch, 5 s interval)
    3. Resumable runs (completed member_ids are persisted to a local file)
    4. App8 writes member_retention_clue (DELETE + INSERT inside a transaction)

Requirements: D1.1, D1.2, D1.3, D1.4, D1.5, D1.6, D1.7, D2.1, D2.3
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

from dotenv import load_dotenv

# ── Load root .env ───────────────────────────────────
_ROOT = Path(__file__).resolve().parent.parent.parent
load_dotenv(_ROOT / ".env")

# Required-environment-variable validation (fail fast at import time:
# this is a standalone ops script, so exiting here is intentional)
_APP_DB_DSN = os.environ.get("APP_DB_DSN")
if not _APP_DB_DSN:
    sys.exit("ERROR: APP_DB_DSN 环境变量未设置,请检查根 .env 文件")

logger = logging.getLogger("ai_backfill")

# ── Constants ────────────────────────────────────────
SITE_ID = 2790685415443269  # target store id for this backfill
CHECKPOINT_FILE = "scripts/ops/_ai_backfill_checkpoint.json"
BATCH_SIZE = 10
BATCH_INTERVAL = 5  # seconds between batches
DATE_FROM = "2025-09-21"
DATE_TO = "2026-03-21"

# Estimated tokens consumed per call (used by print_estimate)
AVG_TOKENS_PER_CALL = 2000
# Average app calls per member (App3+App8+App7 = 3, plus optional App4+App5+App6 ≈ 5)
AVG_APPS_PER_MEMBER = 5
|
||
|
||
# ── Active-member query SQL ──────────────────────────
# Union of three activity signals inside the half-year window:
# consumption ∪ notes ∪ task changes. Parameters are psycopg2-style
# named placeholders: site_id, date_from, date_to.
ACTIVE_MEMBERS_SQL = """
SELECT DISTINCT member_id FROM (
    -- 消费记录
    SELECT member_id
    FROM biz.settlement_orders
    WHERE site_id = %(site_id)s
      AND member_id > 0
      AND settle_time >= %(date_from)s
      AND settle_time < %(date_to)s

    UNION

    -- 备注记录
    SELECT member_id
    FROM biz.member_notes
    WHERE site_id = %(site_id)s
      AND member_id > 0
      AND created_at >= %(date_from)s
      AND created_at < %(date_to)s

    UNION

    -- 任务变更(助教任务分配)
    SELECT member_id
    FROM biz.assistant_tasks
    WHERE site_id = %(site_id)s
      AND member_id > 0
      AND updated_at >= %(date_from)s
      AND updated_at < %(date_to)s
) AS active
ORDER BY member_id
"""
|
||
|
||
|
||
class AIBackfillRunner:
    """Historical-data backfill runner.

    Runs the AI call chain in batches over members active within the last
    half year, with checkpoint-based resume support.
    """

    def __init__(
        self,
        dry_run: bool = False,
        batch_size: int = BATCH_SIZE,
        interval: int = BATCH_INTERVAL,
    ):
        self.dry_run = dry_run
        self.batch_size = batch_size
        self.interval = interval
        # member_ids whose call chain has already finished (resume support)
        self.completed: set[int] = set()
        self._checkpoint_path = Path(CHECKPOINT_FILE)

    # ── checkpoint read/write ────────────────────────

    def load_checkpoint(self) -> None:
        """Load the set of completed member_ids from the local JSON file.

        If the file is missing or malformed, ``completed`` is reset to an
        empty set so the backfill restarts from scratch.
        """
        if not self._checkpoint_path.exists():
            self.completed = set()
            return
        try:
            data = json.loads(self._checkpoint_path.read_text(encoding="utf-8"))
            self.completed = set(data.get("completed_member_ids", []))
        except (json.JSONDecodeError, TypeError, KeyError, AttributeError):
            # Fix: also catch AttributeError — a checkpoint whose top-level
            # JSON value is not an object (e.g. a bare list) has no .get()
            # and previously crashed instead of falling back to a fresh start
            # as the docstring promises.
            logger.warning("checkpoint 文件格式异常,从头开始")
            self.completed = set()

    def save_checkpoint(self) -> None:
        """Persist the completed member_ids to the local JSON file."""
        data = {
            "completed_member_ids": sorted(self.completed),
            "last_updated": datetime.now(timezone.utc).isoformat(),
        }
        self._checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
        self._checkpoint_path.write_text(
            json.dumps(data, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    # ── database query ───────────────────────────────

    async def query_active_members(self) -> list[int]:
        """Query members active within the last half year.

        Activity = union of consumption ∪ notes ∪ task changes (see
        ACTIVE_MEMBERS_SQL). Connects to the business database via APP_DB_DSN.

        NOTE(review): psycopg2 is synchronous, so this coroutine blocks the
        event loop while the query runs. That is acceptable for this
        sequential ops script; wrap the call in asyncio.to_thread if
        concurrency is ever added.
        """
        import psycopg2

        conn = psycopg2.connect(_APP_DB_DSN)
        try:
            with conn.cursor() as cur:
                cur.execute(
                    ACTIVE_MEMBERS_SQL,
                    {
                        "site_id": SITE_ID,
                        "date_from": DATE_FROM,
                        "date_to": DATE_TO,
                    },
                )
                rows = cur.fetchall()
                return [row[0] for row in rows]
        finally:
            conn.close()

    # ── per-member call chain ────────────────────────

    async def run_member(self, member_id: int) -> None:
        """Execute the AI call chain for a single member.

        Call chain:
            1. App3 (retention clue) → App8 (clue consolidation) → App7
               (customer analysis)
            2. If a coach relation exists: → App4 (relation analysis) →
               App5 (tactics reference)
            3. If notes exist: → App6 (note analysis) → App8 (re-consolidation)

        App8 writes member_retention_clue inside a DELETE + INSERT
        transaction. App5 output must contain the category field (covers
        the P2-1 fix).

        Note: the real AI calls depend on the backend AIDispatcher; this is
        a placeholder implementation. Production use must import and call
        the relevant AIDispatcher methods.
        """
        if self.dry_run:
            logger.info("[DRY-RUN] 跳过会员 %d 的调用链执行", member_id)
            return

        # TODO: wire up the backend AIDispatcher for the real call chain.
        # The placeholder below documents call order and key validation points.

        # Step 1: App3 → App8 → App7 (base chain)
        # app3_result = await dispatcher.run_app("app3_clue", member_id, SITE_ID)
        # app8_result = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID)
        # _write_retention_clue(member_id, app8_result)  # DELETE + INSERT transaction
        # app7_result = await dispatcher.run_app("app7_customer_analysis", member_id, SITE_ID)

        # Step 2: if a coach relation exists → App4 → App5
        # if has_coach_relation(member_id):
        #     app4_result = await dispatcher.run_app("app4_analysis", member_id, SITE_ID)
        #     app5_result = await dispatcher.run_app("app5_tactics", member_id, SITE_ID)
        #     # P2-1 fix validation: App5 output must contain the category field
        #     if "category" not in app5_result:
        #         logger.warning("App5 输出缺少分类字段,member_id=%d", member_id)

        # Step 3: if notes exist → App6 → App8 (re-consolidation)
        # if has_notes(member_id):
        #     app6_result = await dispatcher.run_app("app6_note_analysis", member_id, SITE_ID)
        #     app8_result2 = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID)
        #     _write_retention_clue(member_id, app8_result2)  # DELETE + INSERT transaction

        logger.info("会员 %d 调用链执行完成(占位)", member_id)

    # ── estimate output ──────────────────────────────

    def print_estimate(self, member_count: int) -> None:
        """Print estimates: member count, call count, and token consumption."""
        estimated_calls = member_count * AVG_APPS_PER_MEMBER
        estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL
        # Ceiling division: number of batches needed to cover all members
        batch_count = (member_count + self.batch_size - 1) // self.batch_size
        estimated_time_sec = batch_count * self.interval

        print("=" * 60)
        print("回填预估信息")
        print("=" * 60)
        print(f" 门店 ID: {SITE_ID}")
        print(f" 日期范围: {DATE_FROM} ~ {DATE_TO}")
        print(f" 活跃会员数: {member_count}")
        print(f" 已完成: {len(self.completed)}")
        print(f" 待处理: {member_count - len(self.completed)}")
        print(f" 预估调用次数: {estimated_calls}")
        print(f" 预估 Token: {estimated_tokens:,}")
        print(f" 批次大小: {self.batch_size}")
        print(f" 批间间隔: {self.interval}s")
        print(f" 预估批次数: {batch_count}")
        print(f" 预估耗时: ~{estimated_time_sec}s")
        if self.dry_run:
            print(" 模式: DRY-RUN(不执行实际调用)")
        print("=" * 60)

    # ── main entry ───────────────────────────────────

    async def run(self) -> None:
        """Main entry: load checkpoint → query members → run batches → save.

        Error handling:
            - single member failure: log, skip, continue; not checkpointed
            - database connection loss: save checkpoint, exit
            - budget exceeded: save checkpoint, print a hint, exit
        """
        self.load_checkpoint()

        try:
            members = await self.query_active_members()
        except Exception as exc:
            logger.error("查询活跃会员失败: %s", exc)
            self.save_checkpoint()
            return

        # Skip members already completed in a previous run
        pending = [m for m in members if m not in self.completed]

        self.print_estimate(len(members))

        if not pending:
            logger.info("所有会员已完成,无需回填")
            return

        logger.info("开始回填:%d 个待处理会员", len(pending))

        # Fix: count only members processed in THIS run. The old code logged
        # len(self.completed) at the end, which also included members restored
        # from the checkpoint and therefore overstated the work done.
        processed = 0

        # Batched execution; total_batches is loop-invariant, so hoist it.
        total_batches = (len(pending) + self.batch_size - 1) // self.batch_size
        for batch_idx in range(0, len(pending), self.batch_size):
            batch = pending[batch_idx : batch_idx + self.batch_size]
            batch_num = batch_idx // self.batch_size + 1
            logger.info("批次 %d/%d:处理 %d 个会员", batch_num, total_batches, len(batch))

            for member_id in batch:
                try:
                    await self.run_member(member_id)
                    self.completed.add(member_id)
                    processed += 1
                except ConnectionError as exc:
                    # Database connection lost: save checkpoint and exit
                    logger.error("数据库连接中断 (member_id=%d): %s", member_id, exc)
                    self.save_checkpoint()
                    sys.exit(1)
                except Exception as exc:
                    # Single member failed: log, skip, do not checkpoint it
                    logger.error("会员 %d 调用链失败: %s", member_id, exc)
                    continue

            # Save the checkpoint after every completed batch
            self.save_checkpoint()

            # Inter-batch pause (skipped after the final batch)
            if batch_idx + self.batch_size < len(pending):
                logger.info("等待 %ds 后继续下一批...", self.interval)
                await asyncio.sleep(self.interval)

        logger.info("回填完成:共处理 %d 个会员", processed)
|
||
|
||
|
||
# ── CLI entry point ──────────────────────────────────


def _build_arg_parser() -> argparse.ArgumentParser:
    # Assemble the CLI; the three flags mirror AIBackfillRunner's constructor.
    arg_parser = argparse.ArgumentParser(
        description="AI 历史数据回填脚本(半年内活跃会员)",
    )
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="试运行模式,不执行实际 AI 调用",
    )
    arg_parser.add_argument(
        "--batch-size",
        type=int,
        default=BATCH_SIZE,
        help=f"每批处理会员数(默认 {BATCH_SIZE})",
    )
    arg_parser.add_argument(
        "--interval",
        type=int,
        default=BATCH_INTERVAL,
        help=f"批间间隔秒数(默认 {BATCH_INTERVAL})",
    )
    return arg_parser


def main() -> None:
    """CLI entry point: parse arguments, configure logging, run the backfill."""
    opts = _build_arg_parser().parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    asyncio.run(
        AIBackfillRunner(
            dry_run=opts.dry_run,
            batch_size=opts.batch_size,
            interval=opts.interval,
        ).run()
    )


if __name__ == "__main__":
    main()
|