# -*- coding: utf-8 -*- """历史数据回填脚本。 用法: cd C:\\Project\\NeoZQYY uv run python scripts/ops/ai_backfill.py [--dry-run] [--batch-size 10] [--interval 5] 功能: 1. 查询半年内活跃会员(消费/备注/任务变更三者并集) 2. 分批执行 AI 调用链(每批 10 人,间隔 5 秒) 3. 断点续跑(已完成 member_id 记录到本地文件) 4. App8 写 member_retention_clue(DELETE + INSERT 事务) 需求: D1.1, D1.2, D1.3, D1.4, D1.5, D1.6, D1.7, D2.1, D2.3 """ from __future__ import annotations import argparse import asyncio import json import logging import os import sys from datetime import datetime, timezone from pathlib import Path from dotenv import load_dotenv # ── 加载根 .env ────────────────────────────────────── _ROOT = Path(__file__).resolve().parent.parent.parent load_dotenv(_ROOT / ".env") # 必需环境变量校验 _APP_DB_DSN = os.environ.get("APP_DB_DSN") if not _APP_DB_DSN: sys.exit("ERROR: APP_DB_DSN 环境变量未设置,请检查根 .env 文件") logger = logging.getLogger("ai_backfill") # ── 常量 ───────────────────────────────────────────── SITE_ID = 2790685415443269 CHECKPOINT_FILE = "scripts/ops/_ai_backfill_checkpoint.json" BATCH_SIZE = 10 BATCH_INTERVAL = 5 # 秒 DATE_FROM = "2025-09-21" DATE_TO = "2026-03-21" # 预估每次调用消耗 token 数(用于 print_estimate) AVG_TOKENS_PER_CALL = 2000 # 每个会员平均调用 App 数(App3+App8+App7 = 3,加上可能的 App4+App5+App6 ≈ 5) AVG_APPS_PER_MEMBER = 5 # ── 活跃会员查询 SQL ───────────────────────────────── # 半年内消费 ∪ 备注 ∪ 任务变更三者并集 ACTIVE_MEMBERS_SQL = """ SELECT DISTINCT member_id FROM ( -- 消费记录 SELECT member_id FROM biz.settlement_orders WHERE site_id = %(site_id)s AND member_id > 0 AND settle_time >= %(date_from)s AND settle_time < %(date_to)s UNION -- 备注记录 SELECT member_id FROM biz.member_notes WHERE site_id = %(site_id)s AND member_id > 0 AND created_at >= %(date_from)s AND created_at < %(date_to)s UNION -- 任务变更(助教任务分配) SELECT member_id FROM biz.assistant_tasks WHERE site_id = %(site_id)s AND member_id > 0 AND updated_at >= %(date_from)s AND updated_at < %(date_to)s ) AS active ORDER BY member_id """ class AIBackfillRunner: """历史数据回填执行器。 对半年内活跃会员分批执行 AI 调用链,支持断点续跑。 """ def __init__( self, dry_run: bool = False, batch_size: int = BATCH_SIZE, interval: int = BATCH_INTERVAL, ): self.dry_run = dry_run self.batch_size = batch_size self.interval = interval self.completed: set[int] = set() self._checkpoint_path = Path(CHECKPOINT_FILE) # ── checkpoint 读写 ────────────────────────────── def load_checkpoint(self) -> None: """从本地 JSON 文件加载已完成的 member_id 集合。 文件不存在或格式异常时,completed 初始化为空集。 """ if not self._checkpoint_path.exists(): self.completed = set() return try: data = json.loads(self._checkpoint_path.read_text(encoding="utf-8")) self.completed = set(data.get("completed_member_ids", [])) except (json.JSONDecodeError, TypeError, KeyError): logger.warning("checkpoint 文件格式异常,从头开始") self.completed = set() def save_checkpoint(self) -> None: """保存已完成的 member_id 到本地 JSON 文件。""" data = { "completed_member_ids": sorted(self.completed), "last_updated": datetime.now(timezone.utc).isoformat(), } self._checkpoint_path.parent.mkdir(parents=True, exist_ok=True) self._checkpoint_path.write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8", ) # ── 数据库查询 ─────────────────────────────────── async def query_active_members(self) -> list[int]: """查询半年内活跃会员(消费 ∪ 备注 ∪ 任务变更三者并集)。 使用 APP_DB_DSN 连接业务库。 """ import psycopg2 conn = psycopg2.connect(_APP_DB_DSN) try: with conn.cursor() as cur: cur.execute( ACTIVE_MEMBERS_SQL, { "site_id": SITE_ID, "date_from": DATE_FROM, "date_to": DATE_TO, }, ) rows = cur.fetchall() return [row[0] for row in rows] finally: conn.close() # ── 单会员调用链 ───────────────────────────────── async def run_member(self, member_id: int) -> None: """对单个会员执行调用链。 调用链: 1. App3(维客线索)→ App8(线索整理)→ App7(客户分析) 2. 如有助教关联:→ App4(关系分析)→ App5(话术参考) 3. 如有备注:→ App6(备注分析)→ App8(再次整合) App8 写入 member_retention_clue 使用 DELETE + INSERT 事务包裹。 验证 App5 输出包含分类字段(覆盖 P2-1 修复)。 注意:实际 AI 调用依赖后端 AIDispatcher,此处为占位实现。 正式使用时需要导入并调用 AIDispatcher 的相关方法。 """ if self.dry_run: logger.info("[DRY-RUN] 跳过会员 %d 的调用链执行", member_id) return # TODO: 接入后端 AIDispatcher 实现实际调用链 # 以下为占位逻辑,标明调用顺序和关键校验点 # 步骤 1: App3 → App8 → App7(基础链路) # app3_result = await dispatcher.run_app("app3_clue", member_id, SITE_ID) # app8_result = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID) # _write_retention_clue(member_id, app8_result) # DELETE + INSERT 事务 # app7_result = await dispatcher.run_app("app7_customer_analysis", member_id, SITE_ID) # 步骤 2: 如有助教关联 → App4 → App5 # if has_coach_relation(member_id): # app4_result = await dispatcher.run_app("app4_analysis", member_id, SITE_ID) # app5_result = await dispatcher.run_app("app5_tactics", member_id, SITE_ID) # # P2-1 修复验证:App5 输出必须包含分类字段 # if "category" not in app5_result: # logger.warning("App5 输出缺少分类字段,member_id=%d", member_id) # 步骤 3: 如有备注 → App6 → App8(再次整合) # if has_notes(member_id): # app6_result = await dispatcher.run_app("app6_note_analysis", member_id, SITE_ID) # app8_result2 = await dispatcher.run_app("app8_clue_consolidated", member_id, SITE_ID) # _write_retention_clue(member_id, app8_result2) # DELETE + INSERT 事务 logger.info("会员 %d 调用链执行完成(占位)", member_id) # ── 预估输出 ───────────────────────────────────── def print_estimate(self, member_count: int) -> None: """输出预估信息:会员数、预估调用次数、预估 Token 消耗。""" estimated_calls = member_count * AVG_APPS_PER_MEMBER estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL batch_count = (member_count + self.batch_size - 1) // self.batch_size estimated_time_sec = batch_count * self.interval print("=" * 60) print("回填预估信息") print("=" * 60) print(f" 门店 ID: {SITE_ID}") print(f" 日期范围: {DATE_FROM} ~ {DATE_TO}") print(f" 活跃会员数: {member_count}") print(f" 已完成: {len(self.completed)}") print(f" 待处理: {member_count - len(self.completed)}") print(f" 预估调用次数: {estimated_calls}") print(f" 预估 Token: {estimated_tokens:,}") print(f" 批次大小: {self.batch_size}") print(f" 批间间隔: {self.interval}s") print(f" 预估批次数: {batch_count}") print(f" 预估耗时: ~{estimated_time_sec}s") if self.dry_run: print(" 模式: DRY-RUN(不执行实际调用)") print("=" * 60) # ── 主入口 ─────────────────────────────────────── async def run(self) -> None: """主入口:加载断点 → 查询会员 → 分批执行 → 保存断点。 错误处理: - 单个会员失败:记录日志,跳过,继续下一个;不写入 checkpoint - 数据库连接中断:保存当前 checkpoint,退出 - 预算超限:保存 checkpoint,输出提示,退出 """ self.load_checkpoint() try: members = await self.query_active_members() except Exception as exc: logger.error("查询活跃会员失败: %s", exc) self.save_checkpoint() return # 过滤已完成的会员 pending = [m for m in members if m not in self.completed] self.print_estimate(len(members)) if not pending: logger.info("所有会员已完成,无需回填") return logger.info("开始回填:%d 个待处理会员", len(pending)) # 分批执行 for batch_idx in range(0, len(pending), self.batch_size): batch = pending[batch_idx : batch_idx + self.batch_size] batch_num = batch_idx // self.batch_size + 1 total_batches = (len(pending) + self.batch_size - 1) // self.batch_size logger.info("批次 %d/%d:处理 %d 个会员", batch_num, total_batches, len(batch)) for member_id in batch: try: await self.run_member(member_id) self.completed.add(member_id) except ConnectionError as exc: # 数据库连接中断:保存 checkpoint 并退出 logger.error("数据库连接中断 (member_id=%d): %s", member_id, exc) self.save_checkpoint() sys.exit(1) except Exception as exc: # 单个会员失败:记录日志,跳过,不写入 checkpoint logger.error("会员 %d 调用链失败: %s", member_id, exc) continue # 每批完成后保存 checkpoint self.save_checkpoint() # 批间间隔(最后一批不等待) if batch_idx + self.batch_size < len(pending): logger.info("等待 %ds 后继续下一批...", self.interval) await asyncio.sleep(self.interval) logger.info("回填完成:共处理 %d 个会员", len(self.completed)) # ── CLI 入口 ───────────────────────────────────────── def main() -> None: parser = argparse.ArgumentParser( description="AI 历史数据回填脚本(半年内活跃会员)", ) parser.add_argument( "--dry-run", action="store_true", help="试运行模式,不执行实际 AI 调用", ) parser.add_argument( "--batch-size", type=int, default=BATCH_SIZE, help=f"每批处理会员数(默认 {BATCH_SIZE})", ) parser.add_argument( "--interval", type=int, default=BATCH_INTERVAL, help=f"批间间隔秒数(默认 {BATCH_INTERVAL})", ) args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) runner = AIBackfillRunner( dry_run=args.dry_run, batch_size=args.batch_size, interval=args.interval, ) asyncio.run(runner.run()) if __name__ == "__main__": main()