"""排查并修复所有 DWS 表中 site_id 仍为 integer 的列 → bigint""" import os from pathlib import Path from dotenv import load_dotenv load_dotenv(Path(__file__).resolve().parents[2] / ".env") import psycopg2 import psycopg2.extras dsn = os.environ.get("PG_DSN") if not dsn: raise RuntimeError("PG_DSN 未设置") conn = psycopg2.connect(dsn) conn.autocommit = False cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # 1. 查找所有 schema 中 site_id 为 integer 的 **基表**(排除视图) cur.execute(""" SELECT c.table_schema, c.table_name, c.data_type FROM information_schema.columns c JOIN information_schema.tables t ON c.table_schema = t.table_schema AND c.table_name = t.table_name WHERE c.column_name = 'site_id' AND c.data_type = 'integer' AND c.table_schema IN ('dws', 'dwd', 'ods', 'dim', 'quality', 'staging', 'app') AND t.table_type = 'BASE TABLE' ORDER BY c.table_schema, c.table_name """) rows = cur.fetchall() print(f"=== site_id 为 integer 的表: {len(rows)} 个 ===") for r in rows: print(f" {r['table_schema']}.{r['table_name']} ({r['data_type']})") # 2. 查找依赖这些表的视图 tables_to_fix = [(r['table_schema'], r['table_name']) for r in rows] if not tables_to_fix: print("\n无需修复") cur.close() conn.close() exit(0) # 查找视图依赖 cur.execute(""" SELECT DISTINCT v.table_schema AS view_schema, v.table_name AS view_name, t.table_schema AS dep_schema, t.table_name AS dep_table FROM information_schema.view_column_usage t JOIN information_schema.views v ON v.table_schema = t.view_schema AND v.table_name = t.view_name WHERE (t.table_schema, t.table_name) IN %s ORDER BY v.table_schema, v.table_name """, (tuple(tables_to_fix),)) view_deps = cur.fetchall() print(f"\n=== 视图依赖: {len(view_deps)} 个 ===") for v in view_deps: print(f" {v['view_schema']}.{v['view_name']} → {v['dep_schema']}.{v['dep_table']}") # 3. 收集需要 DROP/RECREATE 的视图定义 views_to_recreate = {} for v in view_deps: vkey = f"{v['view_schema']}.{v['view_name']}" if vkey not in views_to_recreate: cur.execute(""" SELECT definition FROM pg_views WHERE schemaname = %s AND viewname = %s """, (v['view_schema'], v['view_name'])) vdef = cur.fetchone() if vdef: views_to_recreate[vkey] = { 'schema': v['view_schema'], 'name': v['view_name'], 'definition': vdef['definition'] } # 4. 执行修复 print(f"\n=== 开始修复 ===") try: # 先 DROP 视图 for vkey, vinfo in views_to_recreate.items(): drop_sql = f"DROP VIEW IF EXISTS {vinfo['schema']}.{vinfo['name']} CASCADE" print(f" DROP VIEW {vkey}") cur.execute(drop_sql) # ALTER 表 for schema, table in tables_to_fix: alter_sql = f"ALTER TABLE {schema}.{table} ALTER COLUMN site_id TYPE bigint" print(f" ALTER {schema}.{table}.site_id → bigint") cur.execute(alter_sql) # 重建视图 for vkey, vinfo in views_to_recreate.items(): create_sql = f"CREATE OR REPLACE VIEW {vinfo['schema']}.{vinfo['name']} AS {vinfo['definition']}" print(f" RECREATE VIEW {vkey}") cur.execute(create_sql) conn.commit() print("\n✅ 全部修复完成") except Exception as e: conn.rollback() print(f"\n❌ 修复失败,已回滚: {e}") cur.close() conn.close()