Files
Neo-ZQYY/scripts/ops/run_migration_c1.py

155 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
在 test_etl_feiqiu 上执行迁移脚本 C1ODS/DWD 层会员表新增 birthday 列。
执行后自动运行验证 SQL 确认列已添加。
用法python scripts/ops/run_migration_c1.py
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
# 加载根 .env
ROOT_DIR = Path(__file__).resolve().parents[2]
load_dotenv(ROOT_DIR / ".env")
TEST_DB_DSN = os.environ.get("TEST_DB_DSN")
if not TEST_DB_DSN:
print("❌ 错误TEST_DB_DSN 环境变量未定义,请检查根 .env 文件")
sys.exit(1)
# 确认连接的是测试库
if "test_etl_feiqiu" not in TEST_DB_DSN:
print(f"❌ 安全检查失败TEST_DB_DSN 未指向 test_etl_feiqiu\n 当前值: {TEST_DB_DSN}")
sys.exit(1)
import psycopg2
MIGRATION_FILE = ROOT_DIR / "db" / "etl_feiqiu" / "migrations" / "2026-02-22__C1_dim_member_add_birthday.sql"
def run_migration(conn):
"""执行迁移脚本"""
sql = MIGRATION_FILE.read_text(encoding="utf-8")
# 提取 BEGIN...COMMIT 之间的 DDL 语句(跳过注释中的回滚和验证部分)
# 迁移脚本本身包含 BEGIN/COMMITpsycopg2 默认 autocommit=False 会冲突
# 所以用 autocommit 模式,让脚本自己管理事务
conn.autocommit = True
cur = conn.cursor()
# 逐条执行 DDL跳过纯注释行和空行提取有效 SQL
statements = []
current = []
in_block = False
for line in sql.splitlines():
stripped = line.strip()
# 跳过回滚和验证部分(它们被注释掉了)
if stripped.startswith("--"):
# BEGIN 标记进入有效区域
if "回滚" in stripped or "验证 SQL" in stripped:
break
continue
if not stripped:
continue
# 跳过 BEGIN/COMMIT我们用 autocommit
if stripped.upper() in ("BEGIN;", "COMMIT;"):
continue
current.append(line)
if stripped.endswith(";"):
statements.append("\n".join(current))
current = []
print(f"📄 迁移文件: {MIGRATION_FILE.name}")
print(f"🔗 目标库: test_etl_feiqiu")
print(f"📝 待执行语句: {len(statements)}\n")
for i, stmt in enumerate(statements, 1):
print(f" [{i}] {stmt.strip()[:80]}...")
cur.execute(stmt)
print(f" ✅ 执行成功")
cur.close()
print(f"\n✅ 迁移脚本执行完成")
def run_verification(conn):
"""执行验证 SQL确认列已添加"""
conn.autocommit = True
cur = conn.cursor()
print("\n" + "=" * 60)
print("🔍 验证结果")
print("=" * 60)
# 验证 1ods.member_profiles.birthday 列存在
cur.execute("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'ods'
AND table_name = 'member_profiles'
AND column_name = 'birthday'
""")
row = cur.fetchone()
if row:
print(f" ✅ ods.member_profiles.birthday 存在 (类型: {row[1]})")
else:
print(f" ❌ ods.member_profiles.birthday 不存在!")
# 验证 2dwd.dim_member.birthday 列存在
cur.execute("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'dwd'
AND table_name = 'dim_member'
AND column_name = 'birthday'
""")
row = cur.fetchone()
if row:
print(f" ✅ dwd.dim_member.birthday 存在 (类型: {row[1]})")
else:
print(f" ❌ dwd.dim_member.birthday 不存在!")
# 验证 3列注释已设置
cur.execute("""
SELECT col_description(c.oid, a.attnum)
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = c.oid
WHERE n.nspname = 'dwd'
AND c.relname = 'dim_member'
AND a.attname = 'birthday'
""")
row = cur.fetchone()
if row and row[0]:
print(f" ✅ dwd.dim_member.birthday 注释: {row[0]}")
else:
print(f" ⚠️ dwd.dim_member.birthday 注释未设置")
cur.close()
print("\n🏁 验证完成")
def main():
print("=" * 60)
print("迁移脚本 C1ODS/DWD 层会员表新增 birthday 列")
print("=" * 60 + "\n")
conn = psycopg2.connect(TEST_DB_DSN)
try:
run_migration(conn)
run_verification(conn)
finally:
conn.close()
if __name__ == "__main__":
main()