Files
Neo-ZQYY/scripts/ops/simulate_historical_tasks.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

525 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Historical task simulation: replays the task_generator + recall_detector
logic over daily index snapshots.

Prerequisite: run backfill_index_snapshots.py first to generate the
historical index snapshots.

Usage:
    cd C:\\Project\\NeoZQYY
    uv run python scripts/ops/simulate_historical_tasks.py \\
        --start 2025-08-01 --end 2026-03-28

    # Dry-run mode
    uv run python scripts/ops/simulate_historical_tasks.py \\
        --start 2025-08-01 --end 2026-03-28 --dry-run

    # Clean previously simulated data, then re-run
    uv run python scripts/ops/simulate_historical_tasks.py \\
        --start 2025-08-01 --end 2026-03-28 --clean
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import time
from datetime import date, datetime, timedelta
from decimal import Decimal
from pathlib import Path
from zoneinfo import ZoneInfo

# ── Environment bootstrap ──
# Make sibling ops modules importable, then normalise the process to the
# repo root before anything reads relative paths or .env.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from _env_paths import ensure_repo_root
ensure_repo_root()

import os
from dotenv import load_dotenv
_ROOT = Path(__file__).resolve().parents[2]
load_dotenv(_ROOT / ".env", override=False)

# Import pure functions from task_generator (lives under the backend dir).
_BACKEND = _ROOT / "apps" / "backend"
sys.path.insert(0, str(_BACKEND))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("simulate_tasks")

import psycopg2
import psycopg2.extras

TZ = ZoneInfo("Asia/Shanghai")

# Pure decision functions reused from the production task generator.
from app.services.task_generator import (
    IndexData,
    determine_task_type,
    should_replace_task,
    TASK_TYPE_PRIORITY,
)

# Simulation cutoff date (real active tasks start from 03-29).
CUTOFF_DATE = date(2026, 3, 28)
# Follow-up task expiry window, in hours.
# NOTE(review): the commit log mentions a 72h follow-up grace window in
# task_generator — confirm 48h is intentional for the simulation.
FOLLOW_UP_HOURS = 48
def get_connections():
    """Open connections to the ETL database and the business database.

    Reads PG_DSN / APP_DB_DSN from the environment (loaded from .env) and
    exits with status 1 when either is missing.

    Returns:
        (etl_conn, app_conn) — two psycopg2 connections, UTF8 client encoding.
    """
    dsns = [os.environ.get(name) for name in ("PG_DSN", "APP_DB_DSN")]
    if not all(dsns):
        print("错误PG_DSN 和 APP_DB_DSN 必须在 .env 中配置", file=sys.stderr)
        sys.exit(1)
    connections = []
    for dsn in dsns:
        conn = psycopg2.connect(dsn)
        conn.set_client_encoding("UTF8")
        connections.append(conn)
    return tuple(connections)
def load_index_snapshot(etl_conn, site_id: int, stat_date: date) -> dict:
    """Load the index snapshot for a single day.

    Returns:
        {
            "relation": {(assistant_id, member_id): {rs, os_label, os_share, session_count}},
            "wbi": {member_id: Decimal},
            "nci": {member_id: Decimal},
        }
    """
    snapshot = {"relation": {}, "wbi": {}, "nci": {}}
    with etl_conn.cursor() as cur:
        # Relation index: one entry per (assistant, member) pair.
        cur.execute(
            """SELECT assistant_id, member_id, rs_display, os_label, os_share, session_count
            FROM dws.dws_member_assistant_relation_index
            WHERE site_id = %s AND stat_date = %s""",
            (site_id, stat_date),
        )
        for aid, mid, rs_display, os_label, os_share, session_count in cur.fetchall():
            snapshot["relation"][(aid, mid)] = {
                "rs": Decimal(str(rs_display)),
                "os_label": os_label,
                "os_share": Decimal(str(os_share)),
                "session_count": session_count,
            }
        # Win-back (WBI) and new-conversion (NCI) indices: member -> score.
        for bucket, sql in (
            (
                "wbi",
                """SELECT member_id, display_score
            FROM dws.dws_member_winback_index
            WHERE site_id = %s AND stat_date = %s""",
            ),
            (
                "nci",
                """SELECT member_id, display_score
            FROM dws.dws_member_newconv_index
            WHERE site_id = %s AND stat_date = %s""",
            ),
        ):
            cur.execute(sql, (site_id, stat_date))
            for member_id, score in cur.fetchall():
                # NULL/zero scores collapse to Decimal(0).
                snapshot[bucket][member_id] = Decimal(str(score)) if score else Decimal(0)
    # Close out the read transaction.
    etl_conn.commit()
    return snapshot
def load_settlements_for_day(etl_conn, site_id: int, d: date) -> dict:
    """Load settlement records paid on the given calendar day (Asia/Shanghai).

    Only settle_type 1 and 3 rows are counted, joined to the assistant
    service log; rows missing an assistant or member id are skipped.

    Returns:
        {(assistant_id, member_id): latest_pay_time}
    """
    window_start = datetime(d.year, d.month, d.day, 0, 0, 0, tzinfo=TZ)
    window_end = window_start + timedelta(days=1)
    with etl_conn.cursor() as cur:
        cur.execute(
            """
            SELECT sl.site_assistant_id AS assistant_id,
            sh.member_id,
            MAX(sh.pay_time) AS latest_pay_time
            FROM dwd.dwd_settlement_head sh
            JOIN dwd.dwd_assistant_service_log sl
            ON sl.order_settle_id = sh.order_settle_id
            AND sl.is_delete = 0
            WHERE sh.site_id = %s
            AND sh.settle_type IN (1, 3)
            AND sh.pay_time >= %s AND sh.pay_time < %s
            GROUP BY sl.site_assistant_id, sh.member_id
            """,
            (site_id, window_start, window_end),
        )
        rows = cur.fetchall()
    # Close out the read transaction.
    etl_conn.commit()
    return {
        (assistant_id, member_id): latest_pay
        for assistant_id, member_id, latest_pay in rows
        if assistant_id and member_id
    }
def simulate_day(
    app_conn,
    etl_conn,
    site_id: int,
    d: date,
    snapshot: dict,
    active_tasks: dict,
    stats: dict,
    dry_run: bool = False,
):
    """Simulate one day of task generation and recall detection.

    Mutates ``active_tasks`` and ``stats`` in place; commits on ``app_conn``
    at the end unless ``dry_run`` is set.

    active_tasks: {(assistant_id, member_id): {"id": int, "task_type": str, "created_at": datetime, "expires_at": datetime|None}}
    stats: cumulative counters
    """
    # All simulated writes are stamped at 07:00 local time on day `d`.
    day_datetime = datetime(d.year, d.month, d.day, 7, 0, 0, tzinfo=TZ)
    # ── 1. Expiry check (follow-up tasks whose expires_at < today) ──
    expired_keys = []
    for key, task in active_tasks.items():
        if task.get("expires_at") and task["expires_at"] < day_datetime:
            expired_keys.append(key)
    for key in expired_keys:
        task = active_tasks.pop(key)
        stats["expired"] += 1
        if not dry_run:
            with app_conn.cursor() as cur:
                cur.execute(
                    "UPDATE biz.coach_tasks SET status = 'expired', updated_at = %s WHERE id = %s",
                    (day_datetime, task["id"]),
                )
                _insert_history(cur, task["id"], "expired", "active", "expired",
                                task["task_type"], task["task_type"],
                                {"reason": "follow_up_expired", "simulated": True})
    # ── 2. Task generation (driven by the index snapshot) ──
    relation = snapshot["relation"]
    wbi_map = snapshot["wbi"]
    nci_map = snapshot["nci"]
    # Collect MAIN/COMANAGE ownership pairs with at least one session.
    ownership_pairs = [
        (aid, mid, info)
        for (aid, mid), info in relation.items()
        if info["os_label"] in ("MAIN", "COMANAGE") and info["session_count"] > 0
    ]
    for aid, mid, info in ownership_pairs:
        wbi = wbi_map.get(mid, Decimal(0))
        nci = nci_map.get(mid, Decimal(0))
        rs = info["rs"]
        # Recall flags are forced to False here: the simulation tracks its own
        # active-task state in memory rather than querying the live tables.
        index_data = IndexData(
            site_id=site_id,
            assistant_id=aid,
            member_id=mid,
            wbi=wbi,
            nci=nci,
            rs=rs,
            has_active_recall=False,
            has_follow_up_note=False,
        )
        new_type = determine_task_type(index_data)
        if not new_type:
            continue
        key = (aid, mid)
        existing = active_tasks.get(key)
        if existing:
            if existing["task_type"] == new_type:
                continue  # same type: nothing to do
            if not should_replace_task(existing["task_type"], new_type):
                continue
            # Close the old task before creating the replacement.
            if not dry_run:
                with app_conn.cursor() as cur:
                    cur.execute(
                        "UPDATE biz.coach_tasks SET status = 'inactive', updated_at = %s WHERE id = %s",
                        (day_datetime, existing["id"]),
                    )
                    _insert_history(cur, existing["id"], "type_change_close", "active", "inactive",
                                    existing["task_type"], new_type,
                                    {"reason": "replaced_by_simulation", "simulated": True})
        # priority_score: recall tasks rank by the stronger of WBI/NCI,
        # everything else by relation score.
        priority_score = float(max(wbi, nci)) if new_type in ("high_priority_recall", "priority_recall") else float(rs)
        if dry_run:
            active_tasks[key] = {
                "id": None,
                "task_type": new_type,
                "created_at": day_datetime,
                "expires_at": None,
            }
            stats["created"] += 1
            continue
        with app_conn.cursor() as cur:
            cur.execute(
                """INSERT INTO biz.coach_tasks
                (site_id, assistant_id, member_id, task_type, status, priority_score, created_at, updated_at)
                VALUES (%s, %s, %s, %s, 'active', %s, %s, %s)
                RETURNING id""",
                (site_id, aid, mid, new_type, priority_score, day_datetime, day_datetime),
            )
            task_id = cur.fetchone()[0]
            _insert_history(cur, task_id, "created", None, "active", None, new_type,
                            {"reason": "simulation_generated", "simulated": True})
        active_tasks[key] = {
            "id": task_id,
            "task_type": new_type,
            "created_at": day_datetime,
            "expires_at": None,
        }
        stats["created"] += 1
    # ── 3. Recall detection (driven by the day's settlement records) ──
    settlements = load_settlements_for_day(etl_conn, site_id, d)
    for (aid, mid), pay_time in settlements.items():
        key = (aid, mid)
        task = active_tasks.get(key)
        # Record the recall event (deduped per member per local day).
        if not dry_run:
            with app_conn.cursor() as cur:
                # NOTE(review): in psycopg2 a failed statement aborts the whole
                # transaction; the except below swallows that, so later writes
                # before the final commit would fail. ON CONFLICT DO NOTHING
                # should make this path unreachable in practice — confirm.
                try:
                    cur.execute(
                        """INSERT INTO biz.recall_events
                        (site_id, assistant_id, member_id, pay_time, task_id, task_type, created_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (site_id, assistant_id, member_id,
                        (date_trunc('day', pay_time AT TIME ZONE 'Asia/Shanghai')))
                        DO NOTHING RETURNING id""",
                        (site_id, aid, mid, pay_time,
                         task["id"] if task else None,
                         task["task_type"] if task else None,
                         day_datetime),
                    )
                    inserted = cur.fetchone()
                    if inserted:
                        stats["recall_events"] += 1
                except Exception:
                    pass  # dedup conflict or other
        if not task:
            continue
        # Complete the recall task when a payment landed after it was created.
        if task["task_type"] in ("high_priority_recall", "priority_recall"):
            if pay_time > task["created_at"]:
                stats["completed"] += 1
                if not dry_run:
                    with app_conn.cursor() as cur:
                        cur.execute(
                            """UPDATE biz.coach_tasks
                            SET status = 'completed', completed_at = %s, completed_task_type = %s,
                            completion_type = 'auto', updated_at = %s
                            WHERE id = %s AND status = 'active'""",
                            (pay_time, task["task_type"], day_datetime, task["id"]),
                        )
                        _insert_history(cur, task["id"], "completed", "active", "completed",
                                        task["task_type"], task["task_type"],
                                        {"service_time": str(pay_time), "simulated": True})
                # Spawn the follow-up visit task with a FOLLOW_UP_HOURS window.
                expires_at = pay_time + timedelta(hours=FOLLOW_UP_HOURS)
                if not dry_run:
                    with app_conn.cursor() as cur:
                        cur.execute(
                            """INSERT INTO biz.coach_tasks
                            (site_id, assistant_id, member_id, task_type, status, expires_at, created_at, updated_at)
                            VALUES (%s, %s, %s, 'follow_up_visit', 'active', %s, %s, %s)
                            RETURNING id""",
                            (site_id, aid, mid, expires_at, day_datetime, day_datetime),
                        )
                        fu_id = cur.fetchone()[0]
                        _insert_history(cur, fu_id, "created", None, "active", None, "follow_up_visit",
                                        {"reason": "recall_completed", "simulated": True})
                    active_tasks[key] = {
                        "id": fu_id,
                        "task_type": "follow_up_visit",
                        "created_at": day_datetime,
                        "expires_at": expires_at,
                    }
                else:
                    active_tasks[key] = {
                        "id": None,
                        "task_type": "follow_up_visit",
                        "created_at": day_datetime,
                        "expires_at": expires_at,
                    }
                stats["follow_up_created"] += 1
    # One commit per simulated day.
    if not dry_run:
        app_conn.commit()
def _insert_history(cur, task_id, action, old_status, new_status, old_task_type, new_task_type, detail=None):
"""在 coach_task_history 中记录变更。"""
if task_id is None:
return
cur.execute(
"""INSERT INTO biz.coach_task_history
(task_id, action, old_status, new_status, old_task_type, new_task_type, detail)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(task_id, action, old_status, new_status, old_task_type, new_task_type,
json.dumps(detail) if detail else None),
)
def get_site_id(etl_conn) -> int:
    """Fetch the single site_id present in the relation index table.

    Returns:
        The site_id of the first row found.

    Raises:
        RuntimeError: when the table is empty (snapshots not backfilled yet).
    """
    with etl_conn.cursor() as cur:
        cur.execute("SELECT DISTINCT site_id FROM dws.dws_member_assistant_relation_index LIMIT 1")
        row = cur.fetchone()
    if not row:
        raise RuntimeError("relation_index 表为空,请先运行 backfill_index_snapshots.py")
    # Bug fix: the original placed etl_conn.commit() AFTER the return, making
    # it unreachable and leaving the read transaction open. Commit before
    # returning, consistent with the other loader functions.
    etl_conn.commit()
    return row[0]
def clean_simulated_data(app_conn, cutoff: date):
    """Delete simulation-produced rows created before the end of the cutoff day.

    Deletes coach_task_history first (FK dependency on coach_tasks), then
    recall_events, then coach_tasks, committing once at the end.

    NOTE(review): the filter is only ``created_at < cutoff`` — any real
    pre-cutoff rows would be removed too; confirm nothing but simulated
    data can exist before the cutoff.
    """
    cutoff_dt = datetime(cutoff.year, cutoff.month, cutoff.day, 23, 59, 59, tzinfo=TZ)
    statements = (
        # history first (FK dependency)
        """DELETE FROM biz.coach_task_history
        WHERE task_id IN (SELECT id FROM biz.coach_tasks WHERE created_at < %s)""",
        "DELETE FROM biz.recall_events WHERE created_at < %s",
        "DELETE FROM biz.coach_tasks WHERE created_at < %s",
    )
    counts = []
    with app_conn.cursor() as cur:
        for sql in statements:
            cur.execute(sql, (cutoff_dt,))
            counts.append(cur.rowcount)
    app_conn.commit()
    history_count, events_count, tasks_count = counts
    logger.info(
        "清理完成: %d 条 history, %d 条 recall_events, %d 条 coach_tasks",
        history_count, events_count, tasks_count,
    )
def run_simulation(start_date: date, end_date: date, dry_run: bool = False, clean: bool = False):
    """Main driver: replay task generation day by day over [start_date, end_date].

    Caps end_date at CUTOFF_DATE, optionally wipes prior simulated data,
    then iterates one day at a time, skipping days with no index snapshot.
    Progress is logged every 30 simulated days and a summary at the end.
    """
    if end_date > CUTOFF_DATE:
        logger.warning("end_date %s 超过截止日期 %s,自动截断", end_date, CUTOFF_DATE)
        end_date = CUTOFF_DATE
    etl_conn, app_conn = get_connections()
    site_id = get_site_id(etl_conn)
    if clean:
        logger.info("清理 %s 之前的模拟数据...", CUTOFF_DATE)
        clean_simulated_data(app_conn, CUTOFF_DATE)
    total_days = (end_date - start_date).days + 1
    logger.info(
        "推演参数: %s ~ %s (%d天), site_id=%s, dry_run=%s",
        start_date, end_date, total_days, site_id, dry_run,
    )
    # In-memory active task set, carried across days.
    active_tasks: dict[tuple[int, int], dict] = {}
    stats = {
        "created": 0, "completed": 0, "expired": 0,
        "follow_up_created": 0, "recall_events": 0, "skipped_no_snapshot": 0,
    }
    t0 = time.time()
    current = start_date
    while current <= end_date:
        snapshot = load_index_snapshot(etl_conn, site_id, current)
        # No snapshot at all for this day: count it and move on.
        if not snapshot["relation"] and not snapshot["wbi"] and not snapshot["nci"]:
            stats["skipped_no_snapshot"] += 1
            current += timedelta(days=1)
            continue
        simulate_day(app_conn, etl_conn, site_id, current, snapshot, active_tasks, stats, dry_run)
        day_num = (current - start_date).days + 1
        if day_num % 30 == 0 or current == end_date:
            elapsed = time.time() - t0
            logger.info(
                "进度: %s (%d/%d) | 已创建=%d 已完成=%d 已过期=%d 回访=%d 事件=%d | %.0fs",
                current, day_num, total_days,
                stats["created"], stats["completed"], stats["expired"],
                stats["follow_up_created"], stats["recall_events"], elapsed,
            )
        current += timedelta(days=1)
    total_elapsed = time.time() - t0
    logger.info("=" * 60)
    logger.info("推演完成: %.1f 秒 (%.1f 分钟)", total_elapsed, total_elapsed / 60)
    logger.info(" 任务创建: %d", stats["created"])
    logger.info(" 任务完成: %d", stats["completed"])
    logger.info(" 任务过期: %d", stats["expired"])
    logger.info(" 回访生成: %d", stats["follow_up_created"])
    logger.info(" 召回事件: %d", stats["recall_events"])
    logger.info(" 跳过(无快照): %d", stats["skipped_no_snapshot"])
    logger.info(" 推演结束时 active 任务数: %d", len(active_tasks))
    # Distribution of task types still active at the end of the run.
    type_dist = {}
    for task in active_tasks.values():
        tt = task["task_type"]
        type_dist[tt] = type_dist.get(tt, 0) + 1
    for tt, cnt in sorted(type_dist.items()):
        logger.info(" %s: %d", tt, cnt)
    etl_conn.close()
    app_conn.close()
def main():
    """Command-line entry point for the historical task simulation."""
    parser = argparse.ArgumentParser(description="历史任务推演(基于指数日快照)")
    parser.add_argument("--start", required=True, help="起始日期 (YYYY-MM-DD)")
    parser.add_argument("--end", required=True, help="结束日期 (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true", help="干跑模式")
    parser.add_argument("--clean", action="store_true", help="清理之前的模拟数据后重跑")
    opts = parser.parse_args()
    window = (date.fromisoformat(opts.start), date.fromisoformat(opts.end))
    if window[0] > window[1]:
        print("错误start 必须 <= end", file=sys.stderr)
        sys.exit(1)
    run_simulation(window[0], window[1], dry_run=opts.dry_run, clean=opts.clean)


if __name__ == "__main__":
    main()