# 技术设计文档:P4 前置依赖修复 ## 概述 本设计覆盖 6 个定点修复,修正 P4 核心业务层与 Spec 的实现偏差,为 P6 前端任务模块扫清障碍。 修复范围: - T1:任务列表返回已放弃任务(GAP-3)— **已实现,需验证** - T2:召回完成检测器过滤任务类型(GAP-6)— **已实现,需验证** - T3:备注回溯重分类器冲突处理(GAP-7)— 需修改 `note_reclassifier.py` - T4:回访完成条件改为「有备注即完成」— 需修改 `note_service.py` + `note_reclassifier.py` - T5:trigger_scheduler last_run_at 事务安全(GAP-9)— 需修改 `trigger_scheduler.py` - T6:cron 默认值改为 07:00 — 需修改 `trigger_scheduler.py` 默认值 **关键发现**:代码审查显示 T1 和 T2 已在之前的修复中完成,T6 的种子数据也已是 `0 7 * * *`,仅 `_calculate_next_run()` 的默认值仍为 `"0 4 * * *"`。 ## 架构 本次修复不引入新组件,仅修改现有服务层内部逻辑。涉及的调用链: ```mermaid sequenceDiagram participant ETL as ETL Pipeline participant TS as TriggerScheduler participant RD as RecallDetector participant NR as NoteReclassifier participant NS as NoteService participant DB as PostgreSQL (biz schema) ETL->>TS: fire_event("etl_data_updated") TS->>RD: run(payload) RD->>DB: 查询 active 召回任务 (T2: 仅 recall 类型) RD->>DB: 标记 completed RD->>TS: fire_event("recall_completed") TS->>NR: run(payload) NR->>DB: 查找 normal 备注 → 重分类为 follow_up NR->>DB: 冲突检查 (T3: 查询已有 follow_up_visit) NR->>DB: 创建/跳过/顶替回访任务 (T4: 有备注→completed) Note-->>NS: 助教提交备注 NS->>DB: 创建备注 NS->>DB: 查询 active follow_up_visit 任务 (T4) NS->>DB: 标记 completed(不依赖 AI 评分) ``` 事务边界变更(T5): ```mermaid graph LR subgraph "修复前:两个独立事务" A[handler 事务] --> B[last_run_at 事务] end subgraph "修复后:合并为单一事务" C[handler + last_run_at 同一事务] end ``` ## 组件与接口 本次修复不新增接口,仅修改现有服务层内部方法。以下按修复点列出变更: ### T1:task_manager.get_task_list()(已实现 ✅) 代码审查确认 `get_task_list()` 已包含: - `WHERE status IN ('active', 'abandoned')` - `ORDER BY CASE WHEN status = 'abandoned' THEN 1 ELSE 0 END ASC, is_pinned DESC, ...` - SELECT 和返回结构包含 `abandon_reason` 字段 **本次无需代码变更,仅需验证测试覆盖。** ### T2:recall_detector._process_service_record()(已实现 ✅) 代码审查确认 `_process_service_record()` 已包含: - `AND task_type IN ('high_priority_recall', 'priority_recall')` **本次无需代码变更,仅需验证测试覆盖。** ### T3:note_reclassifier.run() — 冲突处理 **当前问题**:`run()` 在步骤 4/5 直接 INSERT 回访任务,无冲突检查。当 AI 占位返回 None 时跳过任务创建,但修复 T4 后(有备注即完成)将不再依赖 AI 返回值。 **变更方案**: 在创建 follow_up_visit 任务前,增加冲突检查逻辑: ```python # 冲突检查:查询同 (site_id, assistant_id, member_id) 的 follow_up_visit 任务 cur.execute(""" SELECT id, status FROM biz.coach_tasks WHERE site_id = %s AND assistant_id = %s AND member_id = %s AND task_type = 'follow_up_visit' AND status IN ('active', 'completed') ORDER BY CASE WHEN status = 'completed' THEN 0 ELSE 1 END LIMIT 1 """, (site_id, assistant_id, member_id)) existing = cur.fetchone() if existing: existing_id, existing_status = existing if existing_status == 'completed': # 已完成 → 跳过创建 return elif existing_status == 'active': # 顶替:旧任务 → inactive,创建新任务 cur.execute(""" UPDATE biz.coach_tasks SET status = 'inactive', updated_at = NOW() WHERE id = %s AND status = 'active' """, (existing_id,)) _insert_history(cur, existing_id, action="superseded", ...) ``` **设计决策**: - 查询 `completed` 和 `active` 两种状态,优先检查 `completed` - `inactive` 和 `abandoned` 状态的旧任务不阻止新建(语义上已失效) - 顶替操作记录 `superseded` 历史,保留审计链 ### T4:回访完成条件 — note_service.create_note() + note_reclassifier.run() **当前问题**: - `note_service.create_note()` 依赖 `ai_score >= 6` 判定回访完成,但 `ai_analyze_note()` 返回 None → 永远不触发完成 - `note_reclassifier.run()` 同样依赖 AI 返回值决定任务状态 **变更方案 — note_service.create_note()**: ```python # 修改后:有备注即完成,不依赖 AI 评分 if note_type == "follow_up" and task_id is not None: # 保留 AI 占位调用(P5 接入时调用链不变) ai_score = ai_analyze_note(note["id"]) if ai_score is not None: cur.execute("UPDATE biz.notes SET ai_score = %s ...", (ai_score, note["id"])) note["ai_score"] = ai_score # 不论 ai_score 如何,有备注即标记回访任务完成 if task_info and task_info["status"] == "active": cur.execute(""" UPDATE biz.coach_tasks SET status = 'completed', completed_at = NOW(), completed_task_type = task_type, updated_at = NOW() WHERE id = %s AND status = 'active' """, (task_id,)) _record_history(cur, task_id, action="completed_by_note", ...) ``` **变更方案 — note_reclassifier.run()**: ```python # 修改后:不依赖 AI 返回值 # 保留 ai_analyze_note() 占位调用 ai_score = ai_analyze_note(note_id) # 判定任务状态:有备注 → completed,无备注 → active # 此处 note_id 非 None 意味着已找到备注 → 直接 completed task_status = "completed" # 回溯场景:已有备注 = 已完成 # 若未找到备注(note_id is None),创建 active 任务 # (此分支在上方 note_id is None 时已 return) ``` **设计决策**: - `ai_analyze_note()` 调用保留,返回值仅用于更新 `ai_score` 字段,不影响完成判定 - P5 接入后,AI 评分仍会写入 `notes.ai_score`,但不改变完成逻辑 - `note_reclassifier` 中:找到备注 = 回溯完成(`completed`),未找到备注 = 等待(`active`) ### T5:trigger_scheduler — last_run_at 事务安全 **当前问题**:`fire_event()` 和 `check_scheduled_jobs()` 中,handler 执行和 `last_run_at` 更新在不同事务中。handler 成功但 `last_run_at` commit 失败时,下次重跑会重复处理。 **变更方案 — 方案 A(handler 内更新 last_run_at)**: 将 `last_run_at` 更新的 connection 传入 handler,由 handler 在其事务内执行更新。但这要求修改所有 handler 签名,侵入性大。 **变更方案 — 方案 B(传入 conn,handler 结束后同事务更新)**: 修改 `fire_event()` 和 `check_scheduled_jobs()`,将 `last_run_at` 更新纳入 handler 的事务范围: ```python # fire_event() 修改后 def fire_event(event_name: str, payload: dict | None = None) -> int: conn = _get_connection() try: # ... 查询 enabled event jobs ... for job_id, job_type, job_name in rows: handler = _JOB_REGISTRY.get(job_type) if not handler: continue try: handler(payload=payload) # handler 成功后,在同一连接上更新 last_run_at with conn.cursor() as cur: cur.execute( "UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE id = %s", (job_id,), ) conn.commit() # handler 数据变更 + last_run_at 一起提交 except Exception: conn.rollback() # 一起回滚 finally: conn.close() ``` **关键约束**:handler(如 `recall_detector.run()`、`note_reclassifier.run()`)各自管理独立连接和事务。`trigger_scheduler` 的 `last_run_at` 更新使用调度器自己的连接。这意味着 handler 事务和 `last_run_at` 事务仍然是独立的。 **实际可行方案**:由于 handler 使用独立连接,真正的"同一事务"需要 handler 接受外部 conn 参数。考虑到改动范围,采用折中方案: 1. handler 执行完毕后立即更新 `last_run_at`(当前已是如此) 2. 依赖 handler 的幂等性保证重复执行安全(`recall_detector` 只匹配 `status='active'`,已完成的不会重复处理) 3. 将 `last_run_at` 更新从 handler 成功后的独立 commit 改为与查询 jobs 的同一连接上的事务 **设计决策**:采用用户确认的方案 A — 将 `last_run_at` 更新纳入 handler 同一事务。具体实现:handler 接受可选的 `conn` 参数,在 handler 的最后一个事务中附带更新 `last_run_at`。对于 event 类型触发器(`recall_detector`、`note_reclassifier`),在 handler 的最终 commit 前插入 `last_run_at` 更新。 ### T6:cron 默认值 07:00 **当前问题**:种子数据已是 `"0 7 * * *"`,但 `_calculate_next_run()` 的 fallback 默认值仍为 `"0 4 * * *"`。 **变更方案**: 1. `_calculate_next_run()` 中 `trigger_config.get("cron_expression", "0 4 * * *")` → `"0 7 * * *"` 2. 新建迁移脚本 `db/zqyy_app/migrations/2026-03-15__p52_update_cron_0700.sql` 作为幂等更新(确保生产环境一致) ## 数据模型 本次修复不新增表或字段。涉及的现有表: | 表 | 修改内容 | 修复点 | |---|---|---| | `biz.coach_tasks` | 查询条件变更(无 DDL 变更) | T1, T2, T3, T4 | | `biz.coach_task_history` | 新增 `superseded` action 类型记录 | T3 | | `biz.trigger_jobs` | `last_run_at` 更新时机变更;cron_expression 幂等更新 | T5, T6 | | `biz.notes` | 查询条件变更(无 DDL 变更) | T4 | `coach_tasks.status` 状态转换新增路径: - `active → inactive`(T3 顶替场景,由 `note_reclassifier` 触发) 此路径在原 Spec 状态机中已定义("新回访顶替旧回访"),本次为首次实现。 ## 正确性属性(Correctness Properties) *属性是系统在所有合法执行路径上都应保持为真的特征或行为——本质上是对系统行为的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。* ### Property 1: 任务列表状态过滤 *For any* 任务集合(包含 active、abandoned、completed、inactive 四种状态的任务),`get_task_list` 的过滤逻辑应仅返回 status 为 `active` 或 `abandoned` 的任务,不包含 `completed` 或 `inactive` 状态的任务。 **Validates: Requirements 1.1** ### Property 2: 任务列表排序正确性 *For any* 包含 active 和 abandoned 任务的列表,排序后应满足:(1) 所有 abandoned 任务排在所有 active 任务之后;(2) active 任务内部按 `is_pinned DESC, priority_score DESC NULLS LAST, created_at ASC` 排序。 **Validates: Requirements 1.2** ### Property 3: abandon_reason 与 status 一致性不变量 *For any* 返回的任务记录,若 `status = 'active'` 则 `abandon_reason` 为 null,若 `status = 'abandoned'` 则 `abandon_reason` 为非空字符串。 **Validates: Requirements 1.3** ### Property 4: 召回检测器仅完成 recall 类型任务 *For any* 服务记录和任意任务集合(包含四种 task_type),`_process_service_record` 仅将 `task_type IN ('high_priority_recall', 'priority_recall')` 的 active 任务标记为 completed,`follow_up_visit` 和 `relationship_building` 类型的任务状态不变。 **Validates: Requirements 2.1, 2.2, 2.3** ### Property 5: 冲突处理 — 已完成回访任务阻止新建 *For any* (site_id, assistant_id, member_id) 组合,若已存在 `status = 'completed'` 的 `follow_up_visit` 任务,则 `note_reclassifier` 不创建新的回访任务,且重复触发相同 payload 不产生唯一约束冲突。 **Validates: Requirements 3.1, 3.4** ### Property 6: 冲突处理 — active 回访任务被顶替 *For any* (site_id, assistant_id, member_id) 组合,若已存在 `status = 'active'` 的 `follow_up_visit` 任务,则 `note_reclassifier` 将旧任务标记为 `inactive` 并创建新的回访任务,旧任务的变更记录包含 `superseded` action。 **Validates: Requirements 3.2** ### Property 7: 无冲突时正常创建回访任务 *For any* (site_id, assistant_id, member_id) 组合,若不存在任何 `follow_up_visit` 任务(或仅存在 `inactive`/`abandoned` 状态),则 `note_reclassifier` 正常创建新的回访任务。 **Validates: Requirements 3.3** ### Property 8: 有备注即完成回访任务,不依赖 AI 评分 *For any* `ai_analyze_note()` 的返回值(None、0-5、6-10),当助教为关联 `follow_up_visit` 任务的客户提交备注时,该 active 回访任务都应被标记为 `completed`。AI 评分仅更新 `notes.ai_score` 字段,不影响任务完成判定。 **Validates: Requirements 4.2, 4.3** ### Property 9: 回溯有备注时创建 completed 回访任务 *For any* 召回完成事件,若 `note_reclassifier` 在 service_time 之后找到了 normal 备注,则创建的回访任务 `status = 'completed'`。 **Validates: Requirements 4.4** ### Property 10: 回溯无备注时创建 active 回访任务 *For any* 召回完成事件,若 `note_reclassifier` 在 service_time 之后未找到 normal 备注,则创建的回访任务 `status = 'active'`(等待助教提交备注)。 **Validates: Requirements 4.5** ### Property 11: 触发器 last_run_at 事务一致性 *For any* 触发器执行(event/cron/interval 类型),handler 成功时 `last_run_at` 应被更新,handler 抛出异常时 `last_run_at` 应保持不变(整个事务回滚)。 **Validates: Requirements 5.1, 5.2** ## 错误处理 | 场景 | 处理方式 | 修复点 | |------|---------|--------| | T3 冲突查询失败 | 捕获异常,rollback,记录日志,返回 `tasks_created: 0` | T3 | | T3 顶替 UPDATE 失败 | 捕获异常,rollback,不创建新任务 | T3 | | T4 备注创建成功但任务完成 UPDATE 失败 | 整个事务 rollback(备注也不创建),返回 500 | T4 | | T4 ai_analyze_note() 抛出异常 | 捕获异常,记录日志,继续执行任务完成逻辑(AI 失败不阻塞业务) | T4 | | T5 handler 成功但 commit 失败 | 整个事务回滚(handler 数据变更 + last_run_at),下次重跑依赖幂等性 | T5 | | T5 handler 抛出异常 | rollback,last_run_at 不更新,下次重跑 | T5 | ## 测试策略 ### 属性测试(Property-Based Testing) 使用 **Hypothesis** 库(项目已有依赖),每个属性测试最少 100 次迭代。 每个测试用 comment 标注对应的设计属性: ```python # Feature: p4-prerequisite-fixes, Property 1: 任务列表状态过滤 ``` 属性测试重点覆盖: - Property 1-3:任务列表过滤、排序、字段不变量(纯函数可提取测试) - Property 4:召回检测器类型过滤(SQL 条件验证) - Property 5-7:冲突处理三分支(状态机测试) - Property 8:AI 评分不影响完成判定(参数化 ai_score 返回值) - Property 9-10:回溯场景任务状态(有/无备注两分支) - Property 11:事务一致性(mock handler 成功/失败) ### 单元测试 单元测试聚焦于: - T1 边界:全部 active(无 abandoned)、全部 abandoned(无 active)、混合场景 - T2 边界:仅有 follow_up_visit 任务时返回 0 completed - T3 边界:同时存在 completed 和 active 的 follow_up_visit(优先检查 completed → 跳过) - T4 边界:task_id 为 None 时不触发完成逻辑;非 follow_up_visit 任务不触发 - T5 边界:handler 抛出特定异常类型时的回滚行为 - T6 具体值:`_calculate_next_run("cron", {})` 默认使用 `"0 7 * * *"`;迁移脚本 SQL 正确性 ### 测试配置 ```python from hypothesis import given, settings, strategies as st @settings(max_examples=100) @given(...) def test_property_name(...): # Feature: p4-prerequisite-fixes, Property N: property_text ... ``` 测试文件位置:`tests/` 目录(Monorepo 级属性测试,与现有 P4 属性测试同级)。