Files
Neo-ZQYY/scripts/ops/_migrate_p4_biz_tables.py

268 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
迁移脚本执行与验证:P4 业务表(coach_tasks / coach_task_history / notes / trigger_jobs)
- 在 test_zqyy_app 中执行 DDL 建表脚本和种子数据脚本
- 验证幂等性:连续执行两次无错误
- 验证表结构、约束、索引正确
- 验证种子数据完整(4 条触发器配置)
Requirements: 11.1-11.5, 12.1
"""
import os
import sys
from pathlib import Path
# Load the repository-root .env; this script lives at <root>/scripts/ops/.
root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(root))
from dotenv import load_dotenv

load_dotenv(root / ".env")

import psycopg2
from psycopg2.extensions import parse_dsn

dsn = os.environ.get("TEST_APP_DB_DSN") or os.environ.get("APP_DB_DSN")
if not dsn:
    raise RuntimeError("TEST_APP_DB_DSN / APP_DB_DSN 未配置")

# Safety guard: this script executes DDL, so it must only ever target the test
# database. Inspect the parsed dbname rather than the raw DSN string (otherwise
# a match in the user, host or password part would pass the guard), and never
# echo the raw DSN in an error because it may carry a password.
try:
    _dbname = parse_dsn(dsn).get("dbname", "")
except psycopg2.ProgrammingError:
    # libpq's parse error text may quote fragments of the DSN; don't chain it.
    raise RuntimeError("DSN 格式无效,无法解析 dbname") from None
if "test_zqyy_app" not in _dbname:
    raise RuntimeError(f"DSN 不是测试库: dbname={_dbname!r}")

# Migration scripts under test.
DDL_SCRIPT = root / "db" / "zqyy_app" / "migrations" / "2026-02-27__p4_create_biz_tables.sql"
SEED_SCRIPT = root / "db" / "zqyy_app" / "migrations" / "2026-02-27__p4_seed_trigger_jobs.sql"

# Markers printed in front of each check line. They must differ so passes and
# failures are distinguishable; ASCII keeps them printable on any console encoding.
PASS = "[PASS]"
FAIL = "[FAIL]"

# Every check() call appends (name, ok, detail) here; the final summary in
# main() reads it.
results: list[tuple[str, bool, str]] = []
def run_sql_file(conn, path: Path, label: str) -> None:
    """Execute a whole SQL file on ``conn`` and commit it.

    The file is sent in a single ``cursor.execute`` call without parameters, so
    psycopg2 passes the text through unchanged (``%`` needs no escaping) and
    PostgreSQL runs the statements it contains. Commented-out SQL (the original
    note suggests the migration files keep their rollback statements in
    comments) is ignored by the server like any other comment; no client-side
    filtering happens here.

    On failure the transaction is rolled back before the exception is
    re-raised, so ``conn`` is usable again afterwards.

    Args:
        conn: an open psycopg2 connection.
        path: SQL file to execute, read as UTF-8.
        label: human-readable name printed on success.

    Raises:
        Whatever ``path.read_text``, ``cur.execute`` or ``conn.commit`` raise.
    """
    sql = path.read_text(encoding="utf-8")
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
        conn.commit()
    except Exception:
        # Clear the aborted-transaction state before propagating; callers that
        # roll back again themselves are unaffected.
        conn.rollback()
        raise
    print(f" {PASS} {label} 执行成功")
def check(name: str, ok: bool, detail: str = "") -> None:
    """Record one verification result and print it.

    Appends ``(name, ok, detail)`` to the module-level ``results`` list (read by
    the final summary) and prints ``<marker> <name>[: <detail>]`` using the
    module-level PASS / FAIL markers.

    Args:
        name: short label of the check.
        ok: whether the check passed.
        detail: optional extra context (expected/actual values, error text).
    """
    results.append((name, ok, detail))
    marker = PASS if ok else FAIL
    line = f" {marker} {name}"
    if detail:
        # Same "name: detail" separator as the failure summary; without it the
        # detail ran straight into the check name.
        line += f": {detail}"
    print(line)
def _print_phase_header(title: str) -> None:
    """Print ``title`` after a blank line, framed by 60-character '=' rules."""
    print("\n" + "=" * 60)
    print(title)
    print("=" * 60)


def _apply_migrations(conn) -> None:
    """Phase 1: apply the DDL script, then the seed script, once.

    Errors are not caught: nothing later can be verified if the first run fails.
    """
    _print_phase_header("阶段 1:首次执行迁移脚本")
    run_sql_file(conn, DDL_SCRIPT, "DDL 建表脚本(第 1 次)")
    run_sql_file(conn, SEED_SCRIPT, "种子数据脚本(第 1 次)")


def _verify_idempotency(conn) -> None:
    """Phase 2: re-run both scripts and record whether each succeeds again."""
    _print_phase_header("阶段 2:幂等性验证(第 2 次执行)")
    reruns = (
        (DDL_SCRIPT, "DDL 建表脚本(第 2 次 — 幂等)", "DDL 幂等性"),
        (SEED_SCRIPT, "种子数据脚本(第 2 次 — 幂等)", "种子数据幂等性"),
    )
    for script, label, check_name in reruns:
        try:
            run_sql_file(conn, script, label)
        except Exception as e:  # any error on re-run means the script is not idempotent
            check(check_name, False, str(e))
            # Clear the aborted transaction so the remaining phases can query.
            conn.rollback()
        else:
            check(check_name, True)


def _verify_structure(conn) -> None:
    """Phase 3: check tables, key columns, the status default, CHECKs and FKs in schema biz."""
    _print_phase_header("阶段 3:表结构验证")
    with conn.cursor() as cur:
        # 3.1 All four tables exist in schema biz.
        expected_tables = ["coach_tasks", "coach_task_history", "notes", "trigger_jobs"]
        cur.execute(
            """
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'biz' AND table_name = ANY(%s)
            ORDER BY table_name
            """,
            (expected_tables,),
        )
        found_tables = [r[0] for r in cur.fetchall()]
        check(
            "4 张表存在",
            set(expected_tables) == set(found_tables),
            f"期望 {sorted(expected_tables)},实际 {sorted(found_tables)}",
        )

        # 3.2 coach_tasks has every required column.
        cur.execute(
            """
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = 'biz' AND table_name = 'coach_tasks'
            ORDER BY ordinal_position
            """
        )
        ct_cols = {r[0]: r for r in cur.fetchall()}
        required_cols = [
            "id", "site_id", "assistant_id", "member_id", "task_type",
            "status", "priority_score", "expires_at", "is_pinned",
            "abandon_reason", "completed_at", "completed_task_type",
            "parent_task_id", "created_at", "updated_at",
        ]
        missing = [c for c in required_cols if c not in ct_cols]
        check(
            "coach_tasks 字段完整",
            not missing,
            f"缺失: {missing}" if missing else f"{len(required_cols)} 个字段",
        )

        # 3.3 coach_tasks.status defaults to 'active'. Skipped when the column
        # is absent, which 3.2 already reports.
        status_col = ct_cols.get("status")
        if status_col:
            default = status_col[3]
            check(
                "coach_tasks.status 默认 'active'",
                default is not None and "active" in str(default),
                f"default={default}",
            )

        # 3.4 notes has at least two CHECK constraints (the label refers to the
        # 1-5 rating range; only the count is verified). to_regclass() yields
        # NULL for a missing table instead of raising, so a missing table fails
        # this check rather than aborting the transaction for later phases.
        cur.execute(
            """
            SELECT conname, pg_get_constraintdef(oid)
            FROM pg_constraint
            WHERE conrelid = to_regclass('biz.notes') AND contype = 'c'
            """
        )
        check_constraints = cur.fetchall()
        check(
            "notes CHECK 约束(评分 1-5)",
            len(check_constraints) >= 2,
            f"找到 {len(check_constraints)} 个 CHECK 约束: {[c[0] for c in check_constraints]}",
        )

        # 3.5 Foreign keys defined in schema biz. Filter on the constraint's
        # namespace: regclass::text omits the schema prefix when biz is on the
        # search_path, so matching it against 'biz.%' could find nothing.
        cur.execute(
            """
            SELECT conname, conrelid::regclass, confrelid::regclass
            FROM pg_constraint
            WHERE contype = 'f'
              AND connamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'biz')
            ORDER BY conname
            """
        )
        fks = cur.fetchall()
        check(
            "外键约束存在",
            len(fks) >= 3,
            f"找到 {len(fks)} 个外键: {[r[0] for r in fks]}",
        )


def _verify_indexes(conn) -> None:
    """Phase 4: check the expected biz indexes and trigger_jobs.job_name uniqueness."""
    _print_phase_header("阶段 4:索引验证")
    with conn.cursor() as cur:
        expected_indexes = [
            "idx_coach_tasks_site_assistant_member_type",
            "idx_coach_tasks_assistant_status",
            "idx_notes_target",
        ]
        cur.execute(
            """
            SELECT indexname, indexdef FROM pg_indexes
            WHERE schemaname = 'biz' AND indexname = ANY(%s)
            """,
            (expected_indexes,),
        )
        found_idx = dict(cur.fetchall())
        for idx_name in expected_indexes:
            idx_def = found_idx.get(idx_name)
            check(
                f"索引 {idx_name}",
                idx_def is not None,
                idx_def[:80] if idx_def is not None else "未找到",
            )

        # The partial unique index must be UNIQUE and carry a WHERE predicate on
        # status = 'active'. Search only the predicate, so a column or index
        # name that happens to contain "active" cannot satisfy the check.
        partial_def = found_idx.get("idx_coach_tasks_site_assistant_member_type", "")
        head, sep, predicate = partial_def.lower().partition(" where ")
        check(
            "部分唯一索引含 WHERE status='active'",
            "unique" in head and bool(sep) and "status" in predicate and "active" in predicate,
            partial_def[:100] if partial_def else "未找到",
        )

        # job_name itself must be covered by a unique index. "Any unique index
        # on trigger_jobs" is not enough: a primary-key index, if present, is
        # unique too and would make the check pass trivially.
        cur.execute(
            """
            SELECT indexname, indexdef FROM pg_indexes
            WHERE schemaname = 'biz' AND tablename = 'trigger_jobs'
              AND indexdef ILIKE '%unique%'
            """
        )
        job_name_unique = [name for name, idx_def in cur.fetchall() if "(job_name)" in idx_def]
        check(
            "trigger_jobs.job_name UNIQUE",
            len(job_name_unique) >= 1,
            f"找到 {len(job_name_unique)} 个 job_name 唯一索引: {job_name_unique}",
        )


def _verify_seed_data(conn) -> None:
    """Phase 5: check that the seeded trigger_jobs rows carry the expected config."""
    _print_phase_header("阶段 5:种子数据验证")
    # job_name -> (expected trigger_condition, expected event_name for "event" jobs)
    expected_jobs = {
        "note_reclassify_backfill": ("event", "recall_completed"),
        "recall_completion_check": ("event", "etl_data_updated"),
        "task_expiry_check": ("interval", None),
        "task_generator": ("cron", None),
    }
    with conn.cursor() as cur:
        cur.execute(
            "SELECT job_name, trigger_condition, trigger_config, status "
            "FROM biz.trigger_jobs ORDER BY job_name"
        )
        rows = cur.fetchall()

    check("trigger_jobs 记录数 = 4", len(rows) == 4, f"实际 {len(rows)}")
    # The row count alone does not say which job is absent; name it.
    missing_jobs = sorted(expected_jobs.keys() - {row[0] for row in rows})
    if missing_jobs:
        check("trigger_jobs 预期 job 缺失", False, f"缺失: {missing_jobs}")

    for job_name, trigger_condition, trigger_config, status in rows:
        label = f" {job_name}"
        expected = expected_jobs.get(job_name)
        if expected is None:
            check(label, False, "未预期的 job")
            continue
        expected_condition, expected_event = expected
        cond_ok = trigger_condition == expected_condition
        # A NULL trigger_config would crash on .get(). Assumes the column is
        # json/jsonb, which psycopg2 decodes into a dict — confirm in the DDL.
        cfg = trigger_config or {}
        if expected_condition == "event":
            event_name = cfg.get("event_name")
            check(
                label,
                cond_ok and event_name == expected_event,
                f"condition={trigger_condition}, event={event_name}, status={status}",
            )
        elif expected_condition == "cron":
            cron_expr = cfg.get("cron_expression")
            check(
                label,
                cond_ok and "0 4 * * *" in str(cron_expr or ""),
                f"condition={trigger_condition}, cron={cron_expr}, status={status}",
            )
        elif expected_condition == "interval":
            interval = cfg.get("interval_seconds")
            check(
                label,
                cond_ok and interval == 3600,
                f"condition={trigger_condition}, interval={interval}s, status={status}",
            )


def _print_summary() -> int:
    """Print pass/fail totals (listing each failed check) and return the failure count."""
    print("\n" + "=" * 60)
    total = len(results)
    passed = sum(1 for _, ok, _ in results if ok)
    failed = total - passed
    print(f"验证完成:{passed}/{total} 通过,{failed} 失败")
    if failed:
        print("\n失败项:")
        for name, ok, detail in results:
            if not ok:
                print(f" {FAIL} {name}: {detail}")
    else:
        print(f"{PASS} 全部验证通过!")
    print("=" * 60)
    return failed


def main() -> None:
    """Apply the P4 business-table migrations to the test database and verify them.

    Phases: (1) apply the DDL and seed scripts, (2) re-apply both to prove
    idempotency, (3) verify table structure, (4) verify indexes, (5) verify the
    seeded trigger_jobs rows; then print a summary. The connection is closed
    in every case, and the process exits with status 1 if any check failed.
    """
    conn = psycopg2.connect(dsn)
    try:
        _apply_migrations(conn)
        _verify_idempotency(conn)
        _verify_structure(conn)
        _verify_indexes(conn)
        _verify_seed_data(conn)
        failed = _print_summary()
    finally:
        conn.close()
    if failed:
        sys.exit(1)
if __name__ == "__main__":
    # Run the migration and verification only when executed as a script,
    # never as a side effect of importing this module.
    main()