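"""
Migration execution and verification for the P4 business tables
(coach_tasks / coach_task_history / notes / trigger_jobs).

- Runs the DDL table-creation script and the seed-data script against test_zqyy_app
- Verifies idempotency: two consecutive runs complete without errors
- Verifies that table structure, constraints and indexes are correct
- Verifies that the seed data is complete (4 trigger configurations)

Requirements: 11.1-11.5, 12.1
"""
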
import os
import re
import sys
from pathlib import Path

# Repository root: two directory levels above the directory holding this script.
# It is put on sys.path and is also where the root .env file is read from.
root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(root))

from dotenv import load_dotenv

load_dotenv(root / ".env")


def _redact_dsn(value: str) -> str:
    """Return *value* with any password masked, so it is safe to show in errors.

    Handles both DSN spellings: the URI form (``scheme://user:secret@host/db``)
    and the key/value form (``password=secret`` or ``password='se cret'``).
    """
    value = re.sub(r"(://[^:/@\s]+:)[^@\s]*@", r"\1***@", value)
    return re.sub(
        r"(password\s*=\s*)('(?:[^'\\]|\\.)*'|\S+)",
        r"\1***",
        value,
        flags=re.IGNORECASE,
    )


# Prefer the dedicated test DSN; fall back to the general application DSN.
dsn = os.environ.get("TEST_APP_DB_DSN") or os.environ.get("APP_DB_DSN")
if not dsn:
    raise RuntimeError("TEST_APP_DB_DSN / APP_DB_DSN 未配置")
# Safety guard: refuse to run unless the DSN mentions the test database name.
if "test_zqyy_app" not in dsn:
    # The DSN may embed credentials, so only a redacted copy goes into the error.
    raise RuntimeError(f"DSN 不是测试库: {_redact_dsn(dsn)}")

import psycopg2

# Migration script paths
DDL_SCRIPT = root / "db" / "zqyy_app" / "migrations" / "2026-02-27__p4_create_biz_tables.sql"
SEED_SCRIPT = root / "db" / "zqyy_app" / "migrations" / "2026-02-27__p4_seed_trigger_jobs.sql"

# Status icons used in console output.
PASS = "✅"
FAIL = "❌"

# One (name, ok, detail) entry per verification recorded through check().
results: list[tuple[str, bool, str]] = []


def run_sql_file(conn, path: Path, label: str) -> None:
    """Execute the SQL file at *path* on *conn* and commit.

    The file content is sent verbatim in a single ``execute`` call; this
    function does no filtering of its own, so statements that are commented
    out inside the SQL file (e.g. rollback statements) are simply not run.

    Args:
        conn: Open DB-API connection (psycopg2 in this script).
        path: UTF-8 encoded SQL file to run.
        label: Human-readable name printed on success.

    Raises:
        Exception: Whatever reading the file, executing or committing raises.
            The transaction is rolled back first so that the connection stays
            usable for the caller.
    """
    sql = path.read_text(encoding="utf-8")
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
        conn.commit()
    except Exception:
        # Without a rollback the connection would stay in an aborted
        # transaction and every later statement on it would fail as well.
        conn.rollback()
        raise
    print(f" {PASS} {label} 执行成功")


def check(name: str, ok: bool, detail: str = "") -> None:
    """Record one verification outcome and print it.

    Appends ``(name, ok, detail)`` to the module-level ``results`` list and
    prints a single line made of the pass/fail icon, the check name and, when
    *detail* is non-empty, the detail after a dash.

    Args:
        name: Short description of what was verified.
        ok: True when the verification passed.
        detail: Optional extra information shown after the name.
    """
    results.append((name, ok, detail))
    icon = PASS if ok else FAIL
    msg = f" {icon} {name}"
    if detail:
        msg += f" — {detail}"
    print(msg)


_RULE = "=" * 60


def _print_phase_header(title: str) -> None:
    """Print a phase title framed by two horizontal rules."""
    print("\n" + _RULE)
    print(title)
    print(_RULE)


def _config_as_dict(raw) -> dict:
    """Normalise a trigger_config value to a dict; anything unusable becomes {}.

    psycopg2 decodes json/jsonb columns to Python objects, but a NULL arrives
    as None and a text column as str, neither of which has ``.get``.
    """
    import json

    if isinstance(raw, dict):
        return raw
    if isinstance(raw, (str, bytes, bytearray)):
        try:
            parsed = json.loads(raw)
        except ValueError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def _run_migrations_first_pass(conn) -> None:
    """Phase 1: run the DDL and seed scripts once; any error propagates."""
    _print_phase_header("阶段 1:首次执行迁移脚本")
    run_sql_file(conn, DDL_SCRIPT, "DDL 建表脚本(第 1 次)")
    run_sql_file(conn, SEED_SCRIPT, "种子数据脚本(第 1 次)")


def _verify_idempotency(conn) -> None:
    """Phase 2: run both scripts a second time and record whether they succeed."""
    _print_phase_header("阶段 2:幂等性验证(第 2 次执行)")
    reruns = (
        (DDL_SCRIPT, "DDL 建表脚本(第 2 次 — 幂等)", "DDL 幂等性"),
        (SEED_SCRIPT, "种子数据脚本(第 2 次 — 幂等)", "种子数据幂等性"),
    )
    for script, label, check_name in reruns:
        try:
            run_sql_file(conn, script, label)
        except Exception as exc:
            # Deliberately broad: any failure of the second run counts as a
            # failed idempotency check and must not stop the later phases.
            check(check_name, False, str(exc))
            conn.rollback()
        else:
            check(check_name, True)


def _verify_schema(conn) -> None:
    """Phase 3: verify tables, key columns, a default, CHECK and FK constraints."""
    _print_phase_header("阶段 3:表结构验证")
    with conn.cursor() as cur:
        # 3.1 All four tables exist in the biz schema.
        expected_tables = ["coach_tasks", "coach_task_history", "notes", "trigger_jobs"]
        cur.execute(
            """
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'biz' AND table_name = ANY(%s)
            ORDER BY table_name
            """,
            (expected_tables,),
        )
        found_tables = [row[0] for row in cur.fetchall()]
        check(
            "4 张表存在",
            set(expected_tables) == set(found_tables),
            f"期望 {sorted(expected_tables)},实际 {sorted(found_tables)}",
        )

        # 3.2 coach_tasks has every key column.
        cur.execute(
            """
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = 'biz' AND table_name = 'coach_tasks'
            ORDER BY ordinal_position
            """
        )
        ct_cols = {row[0]: row for row in cur.fetchall()}
        required_cols = [
            "id", "site_id", "assistant_id", "member_id", "task_type",
            "status", "priority_score", "expires_at", "is_pinned",
            "abandon_reason", "completed_at", "completed_task_type",
            "parent_task_id", "created_at", "updated_at",
        ]
        missing = [col for col in required_cols if col not in ct_cols]
        check(
            "coach_tasks 字段完整",
            not missing,
            f"缺失: {missing}" if missing else f"{len(required_cols)} 个字段",
        )

        # 3.3 coach_tasks.status defaults to 'active'. A missing column is
        # recorded as a failure instead of silently skipping the check.
        status_col = ct_cols.get("status")
        if status_col:
            default = status_col[3]
            check(
                "coach_tasks.status 默认 'active'",
                default is not None and "active" in str(default),
                f"default={default}",
            )
        else:
            check("coach_tasks.status 默认 'active'", False, "缺少 status 列")

        # 3.4 notes carries its CHECK constraints. to_regclass() yields NULL
        # (hence zero rows) when the table is absent, whereas a ::regclass
        # cast would raise and abort the whole verification run.
        cur.execute(
            """
            SELECT conname, pg_get_constraintdef(oid)
            FROM pg_constraint
            WHERE conrelid = to_regclass('biz.notes') AND contype = 'c'
            """
        )
        check_constraints = cur.fetchall()
        check(
            "notes CHECK 约束(评分 1-5)",
            len(check_constraints) >= 2,
            f"找到 {len(check_constraints)} 个 CHECK 约束: {[c[0] for c in check_constraints]}",
        )

        # 3.5 Foreign keys owned by tables in the biz schema. The schema is
        # matched through pg_namespace: the text form of a regclass drops the
        # schema prefix whenever that schema is on the search_path, so a
        # LIKE 'biz.%' filter on it can miss every row.
        cur.execute(
            """
            SELECT c.conname, c.conrelid::regclass, c.confrelid::regclass
            FROM pg_constraint c
            JOIN pg_class t ON t.oid = c.conrelid
            JOIN pg_namespace n ON n.oid = t.relnamespace
            WHERE c.contype = 'f' AND n.nspname = 'biz'
            ORDER BY c.conname
            """
        )
        fks = cur.fetchall()
        fk_names = [row[0] for row in fks]
        check(
            "外键约束存在",
            len(fks) >= 3,
            f"找到 {len(fks)} 个外键: {fk_names}",
        )


def _verify_indexes(conn) -> None:
    """Phase 4: verify the expected indexes and the uniqueness of job_name."""
    _print_phase_header("阶段 4:索引验证")
    with conn.cursor() as cur:
        expected_indexes = [
            "idx_coach_tasks_site_assistant_member_type",
            "idx_coach_tasks_assistant_status",
            "idx_notes_target",
        ]
        cur.execute(
            """
            SELECT indexname, indexdef FROM pg_indexes
            WHERE schemaname = 'biz' AND indexname = ANY(%s)
            """,
            (expected_indexes,),
        )
        found_idx = {row[0]: row[1] for row in cur.fetchall()}
        for idx_name in expected_indexes:
            present = idx_name in found_idx
            check(
                f"索引 {idx_name}",
                present,
                found_idx[idx_name][:80] if present else "未找到",
            )

        # The partial unique index must be UNIQUE and carry a WHERE clause
        # that mentions 'active'.
        partial_idx = found_idx.get("idx_coach_tasks_site_assistant_member_type", "")
        partial_lower = partial_idx.lower()
        check(
            "部分唯一索引含 WHERE status='active'",
            "unique" in partial_lower
            and "where" in partial_lower
            and "active" in partial_lower,
            partial_idx[:100] if partial_idx else "未找到",
        )

        # trigger_jobs.job_name must be backed by a unique index on exactly
        # that column. Counting any unique index would always pass, because
        # the primary-key index is itself unique.
        cur.execute(
            """
            SELECT indexname, indexdef FROM pg_indexes
            WHERE schemaname = 'biz' AND tablename = 'trigger_jobs'
            """
        )
        unique_idx = [
            name
            for name, definition in cur.fetchall()
            if definition.upper().startswith("CREATE UNIQUE INDEX")
            and "(job_name)" in definition.lower()
        ]
        check(
            "trigger_jobs.job_name UNIQUE",
            len(unique_idx) >= 1,
            f"找到 {len(unique_idx)} 个唯一索引",
        )


def _verify_seed_data(conn) -> None:
    """Phase 5: verify the four seeded trigger_jobs rows and their configs."""
    _print_phase_header("阶段 5:种子数据验证")
    with conn.cursor() as cur:
        cur.execute("SELECT job_name, trigger_condition, trigger_config, status FROM biz.trigger_jobs ORDER BY job_name")
        rows = cur.fetchall()
    check("trigger_jobs 记录数 = 4", len(rows) == 4, f"实际 {len(rows)} 条")

    # job_name -> (expected trigger_condition, expected event name or None)
    expected_jobs = {
        "note_reclassify_backfill": ("event", "recall_completed"),
        "recall_completion_check": ("event", "etl_data_updated"),
        "task_expiry_check": ("interval", None),
        "task_generator": ("cron", None),
    }
    for job_name, trigger_condition, trigger_config, status in rows:
        expected = expected_jobs.get(job_name)
        if expected is None:
            check(f" {job_name}", False, "未预期的 job")
            continue
        expected_condition, expected_event = expected
        config = _config_as_dict(trigger_config)
        cond_ok = trigger_condition == expected_condition
        if expected_condition == "event":
            event_ok = config.get("event_name") == expected_event
            check(f" {job_name}", cond_ok and event_ok,
                  f"condition={trigger_condition}, event={config.get('event_name')}, status={status}")
        elif expected_condition == "cron":
            cron_ok = "0 4 * * *" in str(config.get("cron_expression", ""))
            check(f" {job_name}", cond_ok and cron_ok,
                  f"condition={trigger_condition}, cron={config.get('cron_expression')}, status={status}")
        elif expected_condition == "interval":
            interval_ok = config.get("interval_seconds") == 3600
            check(f" {job_name}", cond_ok and interval_ok,
                  f"condition={trigger_condition}, interval={config.get('interval_seconds')}s, status={status}")

    # Name each expected job that is absent, so a wrong row count is
    # immediately attributable.
    seen = {row[0] for row in rows}
    for job_name in sorted(set(expected_jobs) - seen):
        check(f" {job_name}", False, "未找到")


def _print_summary() -> int:
    """Print the pass/fail totals plus failed items; return the failure count."""
    print("\n" + _RULE)
    total = len(results)
    passed = sum(1 for _, ok, _ in results if ok)
    failed = total - passed
    print(f"验证完成:{passed}/{total} 通过,{failed} 失败")
    if failed:
        print("\n失败项:")
        for name, ok, detail in results:
            if not ok:
                print(f" {FAIL} {name}: {detail}")
    else:
        print(f"{PASS} 全部验证通过!")
        print(_RULE)
    return failed


def main() -> None:
    """Run the migrations twice, verify schema, indexes and seed data, report.

    Connects with the module-level ``dsn``, runs the five phases in order and
    prints a summary. The connection is always closed.

    Raises:
        SystemExit: With exit status 1 when at least one check failed.
    """
    conn = psycopg2.connect(dsn)
    try:
        _run_migrations_first_pass(conn)
        _verify_idempotency(conn)
        _verify_schema(conn)
        _verify_indexes(conn)
        _verify_seed_data(conn)
        failed = _print_summary()
    finally:
        conn.close()
    if failed:
        sys.exit(1)


# Run the verification only when executed as a script, not when imported.
if __name__ == "__main__":
    main()