Files
2026-03-15 10:15:02 +08:00

360 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 技术设计文档P4 前置依赖修复
## 概述
本设计覆盖 6 个定点修复,修正 P4 核心业务层与 Spec 的实现偏差,为 P6 前端任务模块扫清障碍。
修复范围:
- T1任务列表返回已放弃任务GAP-3**已实现,需验证**
- T2召回完成检测器过滤任务类型GAP-6**已实现,需验证**
- T3备注回溯重分类器冲突处理GAP-7— 需修改 `note_reclassifier.py`
- T4回访完成条件改为「有备注即完成」— 需修改 `note_service.py` + `note_reclassifier.py`
- T5trigger_scheduler last_run_at 事务安全GAP-9— 需修改 `trigger_scheduler.py`
- T6cron 默认值改为 07:00 — 需修改 `trigger_scheduler.py` 默认值
**关键发现**:代码审查显示 T1 和 T2 已在之前的修复中完成T6 的种子数据也已是 `0 7 * * *`,仅 `_calculate_next_run()` 的默认值仍为 `"0 4 * * *"`
## 架构
本次修复不引入新组件,仅修改现有服务层内部逻辑。涉及的调用链:
```mermaid
sequenceDiagram
participant ETL as ETL Pipeline
participant TS as TriggerScheduler
participant RD as RecallDetector
participant NR as NoteReclassifier
participant NS as NoteService
participant DB as PostgreSQL (biz schema)
ETL->>TS: fire_event("etl_data_updated")
TS->>RD: run(payload)
RD->>DB: 查询 active 召回任务 (T2: 仅 recall 类型)
RD->>DB: 标记 completed
RD->>TS: fire_event("recall_completed")
TS->>NR: run(payload)
NR->>DB: 查找 normal 备注 → 重分类为 follow_up
NR->>DB: 冲突检查 (T3: 查询已有 follow_up_visit)
NR->>DB: 创建/跳过/顶替回访任务 (T4: 有备注→completed)
Note-->>NS: 助教提交备注
NS->>DB: 创建备注
NS->>DB: 查询 active follow_up_visit 任务 (T4)
NS->>DB: 标记 completed不依赖 AI 评分)
```
事务边界变更T5
```mermaid
graph LR
subgraph "修复前:两个独立事务"
A[handler 事务] --> B[last_run_at 事务]
end
subgraph "修复后:合并为单一事务"
C[handler + last_run_at 同一事务]
end
```
## 组件与接口
本次修复不新增接口,仅修改现有服务层内部方法。以下按修复点列出变更:
### T1task_manager.get_task_list()(已实现 ✅)
代码审查确认 `get_task_list()` 已包含:
- `WHERE status IN ('active', 'abandoned')`
- `ORDER BY CASE WHEN status = 'abandoned' THEN 1 ELSE 0 END ASC, is_pinned DESC, ...`
- SELECT 和返回结构包含 `abandon_reason` 字段
**本次无需代码变更,仅需验证测试覆盖。**
### T2recall_detector._process_service_record()(已实现 ✅)
代码审查确认 `_process_service_record()` 已包含:
- `AND task_type IN ('high_priority_recall', 'priority_recall')`
**本次无需代码变更,仅需验证测试覆盖。**
### T3note_reclassifier.run() — 冲突处理
**当前问题**`run()` 在步骤 4/5 直接 INSERT 回访任务,无冲突检查。当 AI 占位返回 None 时跳过任务创建,但修复 T4 后(有备注即完成)将不再依赖 AI 返回值。
**变更方案**
在创建 follow_up_visit 任务前,增加冲突检查逻辑:
```python
# 冲突检查:查询同 (site_id, assistant_id, member_id) 的 follow_up_visit 任务
cur.execute("""
SELECT id, status
FROM biz.coach_tasks
WHERE site_id = %s AND assistant_id = %s AND member_id = %s
AND task_type = 'follow_up_visit'
AND status IN ('active', 'completed')
ORDER BY CASE WHEN status = 'completed' THEN 0 ELSE 1 END
LIMIT 1
""", (site_id, assistant_id, member_id))
existing = cur.fetchone()
if existing:
existing_id, existing_status = existing
if existing_status == 'completed':
# 已完成 → 跳过创建
return
elif existing_status == 'active':
# 顶替:旧任务 → inactive创建新任务
cur.execute("""
UPDATE biz.coach_tasks
SET status = 'inactive', updated_at = NOW()
WHERE id = %s AND status = 'active'
""", (existing_id,))
_insert_history(cur, existing_id, action="superseded", ...)
```
**设计决策**
- 查询 `completed``active` 两种状态,优先检查 `completed`
- `inactive``abandoned` 状态的旧任务不阻止新建(语义上已失效)
- 顶替操作记录 `superseded` 历史,保留审计链
### T4回访完成条件 — note_service.create_note() + note_reclassifier.run()
**当前问题**
- `note_service.create_note()` 依赖 `ai_score >= 6` 判定回访完成,但 `ai_analyze_note()` 返回 None → 永远不触发完成
- `note_reclassifier.run()` 同样依赖 AI 返回值决定任务状态
**变更方案 — note_service.create_note()**
```python
# 修改后:有备注即完成,不依赖 AI 评分
if note_type == "follow_up" and task_id is not None:
# 保留 AI 占位调用P5 接入时调用链不变)
ai_score = ai_analyze_note(note["id"])
if ai_score is not None:
cur.execute("UPDATE biz.notes SET ai_score = %s ...", (ai_score, note["id"]))
note["ai_score"] = ai_score
# 不论 ai_score 如何,有备注即标记回访任务完成
if task_info and task_info["status"] == "active":
cur.execute("""
UPDATE biz.coach_tasks
SET status = 'completed', completed_at = NOW(),
completed_task_type = task_type, updated_at = NOW()
WHERE id = %s AND status = 'active'
""", (task_id,))
_record_history(cur, task_id, action="completed_by_note", ...)
```
**变更方案 — note_reclassifier.run()**
```python
# 修改后:不依赖 AI 返回值
# 保留 ai_analyze_note() 占位调用
ai_score = ai_analyze_note(note_id)
# 判定任务状态:有备注 → completed无备注 → active
# 此处 note_id 非 None 意味着已找到备注 → 直接 completed
task_status = "completed" # 回溯场景:已有备注 = 已完成
# 若未找到备注note_id is None创建 active 任务
# (此分支在上方 note_id is None 时已 return
```
**设计决策**
- `ai_analyze_note()` 调用保留,返回值仅用于更新 `ai_score` 字段,不影响完成判定
- P5 接入后AI 评分仍会写入 `notes.ai_score`,但不改变完成逻辑
- `note_reclassifier` 中:找到备注 = 回溯完成(`completed`),未找到备注 = 等待(`active`
### T5trigger_scheduler — last_run_at 事务安全
**当前问题**`fire_event()``check_scheduled_jobs()`handler 执行和 `last_run_at` 更新在不同事务中。handler 成功但 `last_run_at` commit 失败时,下次重跑会重复处理。
**变更方案 — 方案 Ahandler 内更新 last_run_at**
`last_run_at` 更新的 connection 传入 handler由 handler 在其事务内执行更新。但这要求修改所有 handler 签名,侵入性大。
**变更方案 — 方案 B传入 connhandler 结束后同事务更新)**
修改 `fire_event()``check_scheduled_jobs()`,将 `last_run_at` 更新纳入 handler 的事务范围:
```python
# fire_event() 修改后
def fire_event(event_name: str, payload: dict | None = None) -> int:
conn = _get_connection()
try:
# ... 查询 enabled event jobs ...
for job_id, job_type, job_name in rows:
handler = _JOB_REGISTRY.get(job_type)
if not handler:
continue
try:
handler(payload=payload)
# handler 成功后,在同一连接上更新 last_run_at
with conn.cursor() as cur:
cur.execute(
"UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE id = %s",
(job_id,),
)
conn.commit() # handler 数据变更 + last_run_at 一起提交
except Exception:
conn.rollback() # 一起回滚
finally:
conn.close()
```
**关键约束**handler`recall_detector.run()``note_reclassifier.run()`)各自管理独立连接和事务。`trigger_scheduler``last_run_at` 更新使用调度器自己的连接。这意味着 handler 事务和 `last_run_at` 事务仍然是独立的。
**实际可行方案**:由于 handler 使用独立连接,真正的"同一事务"需要 handler 接受外部 conn 参数。考虑到改动范围,采用折中方案:
1. handler 执行完毕后立即更新 `last_run_at`(当前已是如此)
2. 依赖 handler 的幂等性保证重复执行安全(`recall_detector` 只匹配 `status='active'`,已完成的不会重复处理)
3.`last_run_at` 更新从 handler 成功后的独立 commit 改为与查询 jobs 的同一连接上的事务
**设计决策**:采用用户确认的方案 A — 将 `last_run_at` 更新纳入 handler 同一事务。具体实现handler 接受可选的 `conn` 参数,在 handler 的最后一个事务中附带更新 `last_run_at`。对于 event 类型触发器(`recall_detector``note_reclassifier`),在 handler 的最终 commit 前插入 `last_run_at` 更新。
### T6cron 默认值 07:00
**当前问题**:种子数据已是 `"0 7 * * *"`,但 `_calculate_next_run()` 的 fallback 默认值仍为 `"0 4 * * *"`
**变更方案**
1. `_calculate_next_run()``trigger_config.get("cron_expression", "0 4 * * *")``"0 7 * * *"`
2. 新建迁移脚本 `db/zqyy_app/migrations/2026-03-15__p52_update_cron_0700.sql` 作为幂等更新(确保生产环境一致)
## 数据模型
本次修复不新增表或字段。涉及的现有表:
| 表 | 修改内容 | 修复点 |
|---|---|---|
| `biz.coach_tasks` | 查询条件变更(无 DDL 变更) | T1, T2, T3, T4 |
| `biz.coach_task_history` | 新增 `superseded` action 类型记录 | T3 |
| `biz.trigger_jobs` | `last_run_at` 更新时机变更cron_expression 幂等更新 | T5, T6 |
| `biz.notes` | 查询条件变更(无 DDL 变更) | T4 |
`coach_tasks.status` 状态转换新增路径:
- `active → inactive`T3 顶替场景,由 `note_reclassifier` 触发)
此路径在原 Spec 状态机中已定义("新回访顶替旧回访"),本次为首次实现。
## 正确性属性Correctness Properties
*属性是系统在所有合法执行路径上都应保持为真的特征或行为——本质上是对系统行为的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。*
### Property 1: 任务列表状态过滤
*For any* 任务集合(包含 active、abandoned、completed、inactive 四种状态的任务),`get_task_list` 的过滤逻辑应仅返回 status 为 `active``abandoned` 的任务,不包含 `completed``inactive` 状态的任务。
**Validates: Requirements 1.1**
### Property 2: 任务列表排序正确性
*For any* 包含 active 和 abandoned 任务的列表,排序后应满足:(1) 所有 abandoned 任务排在所有 active 任务之后;(2) active 任务内部按 `is_pinned DESC, priority_score DESC NULLS LAST, created_at ASC` 排序。
**Validates: Requirements 1.2**
### Property 3: abandon_reason 与 status 一致性不变量
*For any* 返回的任务记录,若 `status = 'active'``abandon_reason` 为 null`status = 'abandoned'``abandon_reason` 为非空字符串。
**Validates: Requirements 1.3**
### Property 4: 召回检测器仅完成 recall 类型任务
*For any* 服务记录和任意任务集合(包含四种 task_type`_process_service_record` 仅将 `task_type IN ('high_priority_recall', 'priority_recall')` 的 active 任务标记为 completed`follow_up_visit``relationship_building` 类型的任务状态不变。
**Validates: Requirements 2.1, 2.2, 2.3**
### Property 5: 冲突处理 — 已完成回访任务阻止新建
*For any* (site_id, assistant_id, member_id) 组合,若已存在 `status = 'completed'``follow_up_visit` 任务,则 `note_reclassifier` 不创建新的回访任务,且重复触发相同 payload 不产生唯一约束冲突。
**Validates: Requirements 3.1, 3.4**
### Property 6: 冲突处理 — active 回访任务被顶替
*For any* (site_id, assistant_id, member_id) 组合,若已存在 `status = 'active'``follow_up_visit` 任务,则 `note_reclassifier` 将旧任务标记为 `inactive` 并创建新的回访任务,旧任务的变更记录包含 `superseded` action。
**Validates: Requirements 3.2**
### Property 7: 无冲突时正常创建回访任务
*For any* (site_id, assistant_id, member_id) 组合,若不存在任何 `follow_up_visit` 任务(或仅存在 `inactive`/`abandoned` 状态),则 `note_reclassifier` 正常创建新的回访任务。
**Validates: Requirements 3.3**
### Property 8: 有备注即完成回访任务,不依赖 AI 评分
*For any* `ai_analyze_note()` 的返回值None、0-5、6-10当助教为关联 `follow_up_visit` 任务的客户提交备注时,该 active 回访任务都应被标记为 `completed`。AI 评分仅更新 `notes.ai_score` 字段,不影响任务完成判定。
**Validates: Requirements 4.2, 4.3**
### Property 9: 回溯有备注时创建 completed 回访任务
*For any* 召回完成事件,若 `note_reclassifier` 在 service_time 之后找到了 normal 备注,则创建的回访任务 `status = 'completed'`
**Validates: Requirements 4.4**
### Property 10: 回溯无备注时创建 active 回访任务
*For any* 召回完成事件,若 `note_reclassifier` 在 service_time 之后未找到 normal 备注,则创建的回访任务 `status = 'active'`(等待助教提交备注)。
**Validates: Requirements 4.5**
### Property 11: 触发器 last_run_at 事务一致性
*For any* 触发器执行event/cron/interval 类型handler 成功时 `last_run_at` 应被更新handler 抛出异常时 `last_run_at` 应保持不变(整个事务回滚)。
**Validates: Requirements 5.1, 5.2**
## 错误处理
| 场景 | 处理方式 | 修复点 |
|------|---------|--------|
| T3 冲突查询失败 | 捕获异常rollback记录日志返回 `tasks_created: 0` | T3 |
| T3 顶替 UPDATE 失败 | 捕获异常rollback不创建新任务 | T3 |
| T4 备注创建成功但任务完成 UPDATE 失败 | 整个事务 rollback备注也不创建返回 500 | T4 |
| T4 ai_analyze_note() 抛出异常 | 捕获异常记录日志继续执行任务完成逻辑AI 失败不阻塞业务) | T4 |
| T5 handler 成功但 commit 失败 | 整个事务回滚handler 数据变更 + last_run_at下次重跑依赖幂等性 | T5 |
| T5 handler 抛出异常 | rollbacklast_run_at 不更新,下次重跑 | T5 |
## 测试策略
### 属性测试Property-Based Testing
使用 **Hypothesis** 库(项目已有依赖),每个属性测试最少 100 次迭代。
每个测试用 comment 标注对应的设计属性:
```python
# Feature: p4-prerequisite-fixes, Property 1: 任务列表状态过滤
```
属性测试重点覆盖:
- Property 1-3任务列表过滤、排序、字段不变量纯函数可提取测试
- Property 4召回检测器类型过滤SQL 条件验证)
- Property 5-7冲突处理三分支状态机测试
- Property 8AI 评分不影响完成判定(参数化 ai_score 返回值)
- Property 9-10回溯场景任务状态有/无备注两分支)
- Property 11事务一致性mock handler 成功/失败)
### 单元测试
单元测试聚焦于:
- T1 边界:全部 active无 abandoned、全部 abandoned无 active、混合场景
- T2 边界:仅有 follow_up_visit 任务时返回 0 completed
- T3 边界:同时存在 completed 和 active 的 follow_up_visit优先检查 completed → 跳过)
- T4 边界task_id 为 None 时不触发完成逻辑;非 follow_up_visit 任务不触发
- T5 边界handler 抛出特定异常类型时的回滚行为
- T6 具体值:`_calculate_next_run("cron", {})` 默认使用 `"0 7 * * *"`;迁移脚本 SQL 正确性
### 测试配置
```python
from hypothesis import given, settings, strategies as st
@settings(max_examples=100)
@given(...)
def test_property_name(...):
# Feature: p4-prerequisite-fixes, Property N: property_text
...
```
测试文件位置:`tests/` 目录Monorepo 级属性测试,与现有 P4 属性测试同级)。