Files
Neo-ZQYY/scripts/ops/run_migrations_2026_02_20.py

210 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
执行 2026-02-20 批次的所有迁移脚本到测试库TEST_DB_DSN
按文件名排序依次执行,每个脚本执行后运行内嵌验证 SQL。
所有脚本均为幂等设计IF NOT EXISTS / IF EXISTS
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
# Load environment variables from the `.env` file located at `parents[2]`
# of this file's resolved path.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")

# Connection string of the test database; the script refuses to start without it.
DSN = os.environ.get("TEST_DB_DSN")
if not DSN:
    print("ERROR: TEST_DB_DSN 未配置")
    sys.exit(1)

# Directory that holds the SQL migration files.
MIGRATIONS_DIR = Path(__file__).resolve().parents[2].joinpath(
    "db", "etl_feiqiu", "migrations"
)

# Migration scripts of the 2026-02-20 batch, ordered by file name.
SCRIPTS = sorted(
    MIGRATIONS_DIR.glob("2026-02-20__*.sql"),
    key=lambda p: p.name,
)
def _extract_main_sql(sql: str) -> str:
    """Return the part of a migration script that precedes its rollback/verify section.

    A section header is a line that starts with ``-- ====`` and mentions
    "回滚", "ROLLBACK" (case-insensitive) or "验证". Everything from the first
    such header to the end of the script is dropped.
    """
    kept = []
    for line in sql.split("\n"):
        stripped = line.strip()
        if stripped.startswith("-- ====") and (
            "回滚" in stripped
            or "ROLLBACK" in stripped.upper()
            or "验证" in stripped
        ):
            # The original flag was never reset, so nothing after the first
            # header is ever kept; stopping here is equivalent.
            break
        kept.append(line)
    return "\n".join(kept).strip()


def execute_migration(conn, script_path: Path) -> bool:
    """Execute a single migration script and report whether it succeeded.

    Only the main body of the script is executed; the rollback and
    verification sections are stripped first. The script is sent in one
    ``execute`` call with autocommit enabled, because the scripts carry their
    own BEGIN/COMMIT statements.

    Args:
        conn: Open database connection (``autocommit``, ``cursor()`` and
            ``rollback()`` are used).
        script_path: Path of the ``.sql`` file, read as UTF-8.

    Returns:
        True if the script ran without error or had no main body,
        False if executing it raised an exception.
    """
    name = script_path.name
    main_sql = _extract_main_sql(script_path.read_text(encoding="utf-8"))
    if not main_sql:
        print(f" ⚠️ {name}: 空脚本,跳过")
        return True

    cur = None
    old_autocommit = None
    try:
        old_autocommit = conn.autocommit
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute(main_sql)
        print(f"{name}")
        return True
    except Exception as e:
        # NOTE(review): with autocommit enabled the driver may treat
        # rollback() as a no-op; if a script opened its own BEGIN and then
        # failed, the session could stay in an aborted transaction — confirm.
        conn.rollback()
        print(f"{name}: {e}")
        return False
    finally:
        # Release the cursor and restore the caller's autocommit setting on
        # the failure path too, not only after a successful run.
        if cur is not None:
            cur.close()
        if old_autocommit is not None:
            conn.autocommit = old_autocommit
def verify_all(conn):
    """Run the consolidated post-migration checks and print a report.

    Ten queries against ``information_schema`` check that the expected
    columns and tables exist (and that one table is gone). Each result is
    printed on its own line, marked as passed or failed.

    Args:
        conn: Open database connection; only ``cursor()`` is used.

    Returns:
        True if every check passed, False otherwise.
    """
    checks = []  # (label, passed, detail) tuples in report order
    cur = conn.cursor()
    try:
        # 1. dim_assistant_ex: 4 new columns
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dim_assistant_ex'
            AND column_name IN ('system_role_id', 'job_num', 'cx_unit_price', 'pd_unit_price')
            ORDER BY column_name
        """)
        cols = [r[0] for r in cur.fetchall()]
        checks.append(("dim_assistant_ex +4列", len(cols) == 4, cols))

        # 2. dwd_assistant_service_log_ex: 2 new columns
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dwd_assistant_service_log_ex'
            AND column_name IN ('operator_id', 'operator_name')
            ORDER BY column_name
        """)
        cols = [r[0] for r in cur.fetchall()]
        checks.append(("dwd_assistant_service_log_ex +2列", len(cols) == 2, cols))

        # 3. dim_table_ex: 14 new columns
        cur.execute("""
            SELECT count(*) FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dim_table_ex'
            AND column_name IN (
                'create_time', 'light_status', 'tablestatusname', 'sitename',
                'applet_qr_code_url', 'audit_status', 'charge_free', 'delay_lights_time',
                'is_rest_area', 'only_allow_groupon', 'order_delay_time', 'self_table',
                'temporary_light_second', 'virtual_table'
            )
        """)
        cnt = cur.fetchone()[0]
        checks.append(("dim_table_ex +14列", cnt == 14, f"{cnt}/14"))

        # 4. dwd_member_balance_change_ex.relate_id exists
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dwd_member_balance_change_ex'
            AND column_name = 'relate_id'
        """)
        checks.append(("dwd_member_balance_change_ex +relate_id", cur.fetchone() is not None, ""))

        # 5. dim_store_goods_ex.batch_stock_quantity exists
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dim_store_goods_ex'
            AND column_name = 'batch_stock_quantity'
        """)
        checks.append(("dim_store_goods_ex +batch_stock_quantity", cur.fetchone() is not None, ""))

        # 6. table dwd_goods_stock_summary exists
        cur.execute("""
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_summary'
        """)
        checks.append(("dwd_goods_stock_summary 已创建", cur.fetchone() is not None, ""))

        # 7. table dwd_goods_stock_movement exists
        cur.execute("""
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_movement'
        """)
        checks.append(("dwd_goods_stock_movement 已创建", cur.fetchone() is not None, ""))

        # 8. three DWS stock summary tables exist
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'dws' AND table_name LIKE 'dws_goods_stock_%_summary'
            ORDER BY table_name
        """)
        tables = [r[0] for r in cur.fetchall()]
        checks.append(("DWS 库存汇总 3 张表", len(tables) == 3, tables))

        # 9. dwd_store_goods_sale has both discount_money and discount_price
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = 'dwd' AND table_name = 'dwd_store_goods_sale'
            AND column_name IN ('discount_money', 'discount_price')
            ORDER BY column_name
        """)
        cols = [r[0] for r in cur.fetchall()]
        checks.append(("dwd_store_goods_sale discount_money+discount_price", len(cols) == 2, cols))

        # 10. ods.settlement_ticket_details has been dropped
        cur.execute("""
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'ods' AND table_name = 'settlement_ticket_details'
        """)
        checks.append(("settlement_ticket_details 已删除", cur.fetchone() is None, ""))
    finally:
        # Close the cursor even when one of the queries raises.
        cur.close()

    print("\n" + "=" * 60)
    print("迁移验证结果")
    print("=" * 60)
    for name, ok, detail in checks:
        # Distinct markers so a failed check is visible in the report; the
        # original used the same empty string for both outcomes.
        status = "✅" if ok else "❌"
        # Separate the detail from the label instead of gluing them together.
        detail_str = f" → {detail}" if detail else ""
        print(f" {status} {name}{detail_str}")
    return all(ok for _, ok, _ in checks)
def main():
    """Run the 2026-02-20 migration batch against the test database and verify it.

    Executes every script in ``SCRIPTS`` in order, prints a success/failure
    tally, then runs ``verify_all``. Exits with status 1 when any script
    failed or any verification check did not pass.
    """
    # Split on the LAST '@' so a password containing '@' is never echoed;
    # a DSN without '@' is printed unchanged, as before.
    # NOTE(review): a keyword/value DSN without '@' is therefore printed in
    # full, including any password it carries — confirm this is acceptable.
    target = DSN.rsplit("@", 1)[-1]
    print(f"连接测试库: {target}")
    print(f"迁移目录: {MIGRATIONS_DIR}")
    print(f"发现 {len(SCRIPTS)} 个 2026-02-20 迁移脚本\n")

    conn = psycopg2.connect(DSN)
    try:
        print("执行迁移:")
        success = 0
        failed = 0
        for script in SCRIPTS:
            if execute_migration(conn, script):
                success += 1
            else:
                failed += 1
        print(f"\n执行完成: {success} 成功, {failed} 失败")

        # Consolidated verification
        all_ok = verify_all(conn)
    finally:
        # Close the connection even if a migration or the verification raises.
        conn.close()

    if failed:
        # Do not report overall success when a script failed, even if the
        # verification queries happen to pass.
        print(f"\n⚠️ {failed} 个迁移脚本执行失败,请检查")
    if not all_ok:
        print("\n⚠️ 部分验证未通过,请检查")
    if failed or not all_ok:
        sys.exit(1)
    print("\n✅ 所有迁移已成功执行并验证通过")
# Run the migration batch only when this file is executed as a script.
if __name__ == "__main__":
    main()