Files
Neo-ZQYY/scripts/ops/run_migration_spi_table.py

179 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Run the SPI table-creation migration against the test database test_etl_feiqiu.

Migration script: db/etl_feiqiu/migrations/2026-02-23_create_dws_member_spending_power_index.sql
Target table: dws.dws_member_spending_power_index

Usage:
    python scripts/ops/run_migration_spi_table.py
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
# Resolve the repository root (two levels above this script's directory) and
# load its .env; variables already present in the environment are kept.
_ROOT = Path(__file__).resolve().parents[2]
load_dotenv(_ROOT / ".env", override=False)

# Connection string for the test database; the script cannot run without it.
DSN = os.environ.get("TEST_DB_DSN")
if not DSN:
    print("ERROR: TEST_DB_DSN 未配置,请在根 .env 中设置")
    sys.exit(1)

# Absolute path of the SQL migration file executed by this script.
MIGRATION_FILE = _ROOT.joinpath(
    "db",
    "etl_feiqiu",
    "migrations",
    "2026-02-23_create_dws_member_spending_power_index.sql",
)
def table_exists(conn) -> bool:
    """Report whether ``dws.dws_member_spending_power_index`` already exists.

    Args:
        conn: An open database connection whose cursors support the
            context-manager protocol (as psycopg2 cursors do).

    Returns:
        True if ``information_schema.tables`` lists the target table,
        False otherwise.
    """
    # The ``with`` block closes the cursor even if the query raises.
    with conn.cursor() as cur:
        cur.execute("""
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'dws'
              AND table_name = 'dws_member_spending_power_index'
        """)
        return cur.fetchone() is not None
def execute_migration(conn) -> bool:
    """Execute the forward part of the migration script on ``conn``.

    The script is read from ``MIGRATION_FILE``. Everything from the first
    ``-- ====`` banner line that mentions "回滚" (rollback) to the end of the
    file is discarded, so only the forward SQL is sent to the database.

    Args:
        conn: An open database connection whose cursors support the
            context-manager protocol (as psycopg2 cursors do).

    Returns:
        True if the SQL executed without raising. False if no forward SQL
        remains after stripping, or if execution raised (the error is
        printed rather than re-raised).
    """
    sql = MIGRATION_FILE.read_text(encoding="utf-8")

    # Keep lines up to, but not including, the rollback banner.
    main_lines = []
    for line in sql.split("\n"):
        stripped = line.strip()
        if stripped.startswith("-- ====") and "回滚" in stripped:
            break
        main_lines.append(line)
    main_sql = "\n".join(main_lines).strip()

    if not main_sql:
        print("⚠️ 迁移脚本为空,跳过")
        return False

    try:
        # The ``with`` block closes the cursor on both success and failure.
        with conn.cursor() as cur:
            cur.execute(main_sql)
    except Exception as e:
        # Deliberately broad: any failure is reported and signalled through
        # the return value so the caller can exit cleanly.
        print(f"❌ 迁移脚本执行失败: {e}")
        return False

    print("✅ 迁移脚本执行成功")
    return True
def verify(conn) -> bool:
    """Check the table, its columns and its indexes, then print a report.

    Four checks are run against schema ``dws``:

    1. the table ``dws_member_spending_power_index`` exists;
    2. every expected column is present (extra columns are allowed);
    3. the index ``idx_spi_site_member`` exists;
    4. the index ``idx_spi_display_score`` exists.

    Any missing column names are printed before the report. The report then
    lists each check with a pass/fail marker.

    Args:
        conn: An open database connection whose cursors support the
            context-manager protocol and ``%s`` query parameters (as
            psycopg2 cursors do).

    Returns:
        True only if all four checks passed.
    """
    expected_cols = [
        "spi_id", "site_id", "member_id",
        "spend_30", "spend_90", "recharge_90",
        "orders_30", "orders_90", "visit_days_30", "visit_days_90",
        "avg_ticket_90", "active_weeks_90", "daily_spend_ewma_90",
        "score_level_raw", "score_speed_raw", "score_stability_raw",
        "score_level_display", "score_speed_display", "score_stability_display",
        "raw_score", "display_score",
        "calc_time", "created_at", "updated_at",
    ]
    # (report label, index name) for each index that must exist.
    index_checks = [
        ("唯一索引 idx_spi_site_member 存在", "idx_spi_site_member"),
        ("查询索引 idx_spi_display_score 存在", "idx_spi_display_score"),
    ]

    checks = []
    # The ``with`` block closes the cursor even if one of the queries raises.
    with conn.cursor() as cur:
        # 1. The table exists.
        cur.execute("""
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'dws'
              AND table_name = 'dws_member_spending_power_index'
        """)
        checks.append((
            "表 dws.dws_member_spending_power_index 存在",
            cur.fetchone() is not None,
        ))

        # 2. All expected columns are present.
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dws'
              AND table_name = 'dws_member_spending_power_index'
            ORDER BY ordinal_position
        """)
        actual_cols = [row[0] for row in cur.fetchall()]
        present = set(actual_cols)
        missing = [col for col in expected_cols if col not in present]
        checks.append((f"字段完整({len(actual_cols)} 列)", not missing))
        if missing:
            print(f" 缺失字段: {missing}")

        # 3 and 4. The required indexes exist.
        for label, index_name in index_checks:
            cur.execute(
                """
                SELECT 1 FROM pg_indexes
                WHERE schemaname = 'dws'
                  AND tablename = 'dws_member_spending_power_index'
                  AND indexname = %s
                """,
                (index_name,),
            )
            checks.append((label, cur.fetchone() is not None))

    print("\n" + "=" * 50)
    print("建表验证结果")
    print("=" * 50)
    for name, ok in checks:
        # Distinct markers so a failed check stands out in the report.
        status = "✅" if ok else "❌"
        print(f" {status} {name}")
    return all(ok for _, ok in checks)
def main():
    """Create the SPI table in the test database if needed, then verify it.

    Steps:
        1. Print the connection target and the migration file name.
        2. Exit with status 1 if the migration file does not exist.
        3. Connect, and run the migration unless the table already exists;
           exit with status 1 if the migration fails.
        4. Run ``verify`` and exit with status 1 if any check fails.

    The connection is closed on every path, including early exits and
    unexpected exceptions.
    """
    # Show only what follows the LAST "@", so nothing from the credentials
    # part of a URI-style DSN is printed, even if the password contains "@".
    # NOTE(review): a DSN without "@" (e.g. keyword/value form) is printed
    # unchanged, as before, and may contain a password - confirm acceptable.
    dsn_display = DSN.rsplit("@", 1)[-1]
    print(f"连接测试库: {dsn_display}")
    print(f"迁移脚本: {MIGRATION_FILE.name}\n")

    if not MIGRATION_FILE.exists():
        print(f"ERROR: 迁移脚本不存在: {MIGRATION_FILE}")
        sys.exit(1)

    conn = psycopg2.connect(DSN)
    try:
        # No commit is issued anywhere in this script, so the DDL relies on
        # autocommit to take effect.
        conn.autocommit = True

        if table_exists(conn):
            print(" 表 dws.dws_member_spending_power_index 已存在,跳过建表")
        elif not execute_migration(conn):
            sys.exit(1)

        all_ok = verify(conn)
    finally:
        # Also runs when sys.exit() raises SystemExit above.
        conn.close()

    if all_ok:
        print("\n✅ SPI 建表迁移完成,所有验证通过")
    else:
        print("\n⚠️ 部分验证未通过,请检查")
        sys.exit(1)


if __name__ == "__main__":
    main()