# -*- coding: utf-8 -*-
"""
Cross-database data migration script: LLZQ-test → etl_feiqiu.

Migrates from the old schemas (billiards_ods / billiards_dwd / billiards_dws /
etl_admin) to the new schemas (ods / dwd / dws / meta).

Strategy: table by table, SELECT → INSERT, using the binary COPY protocol
to speed up large tables. Target tables must already exist with compatible
column types (binary COPY requires matching types on both ends).
"""
import sys
import io
import os
import psycopg2
import psycopg2.extras

# Force UTF-8 output on the Windows console.
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")

DB_HOST = "100.64.0.4"
DB_PORT = 5432
DB_USER = "local-Python"
DB_PASS = "Neo-local-1991125"
OLD_DB = "LLZQ-test"
NEW_DB = "etl_feiqiu"

# Old schema → new schema mapping.
SCHEMA_MAP = {
    "billiards_ods": "ods",
    "billiards_dwd": "dwd",
    "billiards_dws": "dws",
    "etl_admin": "meta",
}

# An empty binary COPY stream is exactly 21 bytes:
# 11-byte signature + 4-byte flags + 4-byte header-extension length
# + 2-byte trailer (int16 -1). Anything at or below that holds no rows.
_BINARY_COPY_EMPTY_SIZE = 21


def get_tables(conn, schema):
    """Return all user tables in *schema* (pg_tables excludes materialized views)."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT tablename FROM pg_tables
            WHERE schemaname = %s
            ORDER BY tablename
        """, (schema,))
        return [r[0] for r in cur.fetchall()]


def get_columns(conn, schema, table):
    """Return the table's column names ordered by ordinal_position.

    An empty list means the table does not exist.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position
        """, (schema, table))
        return [r[0] for r in cur.fetchall()]


def get_row_count(conn, schema, table):
    """Exact row count (deliberately not the planner's estimate)."""
    with conn.cursor() as cur:
        cur.execute(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
        return cur.fetchone()[0]


def migrate_table(src_conn, dst_conn, old_schema, new_schema, table, dst_columns):
    """Migrate one table's data via the binary COPY protocol.

    Copies only the columns common to source and target (in target order),
    so schemas that drifted slightly still migrate. Returns the row count
    of the target table after the copy (0 if nothing was copied).
    """
    src_columns = get_columns(src_conn, old_schema, table)

    # Intersection, kept in target-column order, to tolerate old/new drift.
    common_cols = [c for c in dst_columns if c in src_columns]
    if not common_cols:
        print(f" ⚠ 无公共列,跳过")
        return 0

    cols_sql = ", ".join(f'"{c}"' for c in common_cols)

    # COPY TO / COPY FROM through an in-memory buffer.
    buf = io.BytesIO()
    with src_conn.cursor() as src_cur:
        copy_out_sql = (
            f'COPY (SELECT {cols_sql} FROM "{old_schema}"."{table}") '
            f'TO STDOUT WITH (FORMAT binary)'
        )
        src_cur.copy_expert(copy_out_sql, buf)
    buf.seek(0)

    # Skip the COPY FROM round-trip when the stream carries no rows.
    if buf.getbuffer().nbytes <= _BINARY_COPY_EMPTY_SIZE:
        return 0

    with dst_conn.cursor() as dst_cur:
        copy_in_sql = (
            f'COPY "{new_schema}"."{table}" ({cols_sql}) '
            f'FROM STDIN WITH (FORMAT binary)'
        )
        dst_cur.copy_expert(copy_in_sql, buf)
    dst_conn.commit()

    # Report the rows that actually landed in the target.
    return get_row_count(dst_conn, new_schema, table)


def migrate_indexes(src_conn, dst_conn, old_schema, new_schema):
    """Recreate user-defined indexes on the target schema.

    Indexes backing PK/UNIQUE constraints are excluded (they share the
    constraint's name and are created with the table). Returns
    (created_count, total_count).
    """
    with src_conn.cursor() as cur:
        cur.execute("""
            SELECT indexname, indexdef
            FROM pg_indexes
            WHERE schemaname = %s
              AND indexname NOT IN (
                  SELECT conname FROM pg_constraint
                  WHERE connamespace =
                      (SELECT oid FROM pg_namespace WHERE nspname = %s)
              )
            ORDER BY indexname
        """, (old_schema, old_schema))
        indexes = cur.fetchall()

    created = 0
    for idx_name, idx_def in indexes:
        # Rewrite the schema qualifier (quoted and unquoted forms).
        # NOTE(review): plain substring replace — could in theory touch an
        # expression-index body that embeds the schema name; acceptable here.
        new_def = idx_def.replace(f'"{old_schema}"', f'"{new_schema}"')
        new_def = new_def.replace(f'{old_schema}.', f'{new_schema}.')
        # Make re-runs idempotent.
        new_def = new_def.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1)
        new_def = new_def.replace("CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1)
        try:
            with dst_conn.cursor() as dst_cur:
                dst_cur.execute(new_def)
            dst_conn.commit()
            created += 1
        except Exception as e:
            # Best-effort: report and move on to the next index.
            dst_conn.rollback()
            print(f" ⚠ 索引 {idx_name} 创建失败: {e}")

    return created, len(indexes)


def main():
    print("=" * 60)
    print("数据迁移: LLZQ-test → etl_feiqiu")
    print("=" * 60)

    src_conn = psycopg2.connect(
        host=DB_HOST, port=DB_PORT, dbname=OLD_DB,
        user=DB_USER, password=DB_PASS,
        options="-c client_encoding=UTF8",
    )
    dst_conn = psycopg2.connect(
        host=DB_HOST, port=DB_PORT, dbname=NEW_DB,
        user=DB_USER, password=DB_PASS,
        options="-c client_encoding=UTF8",
    )
    src_conn.autocommit = False
    dst_conn.autocommit = False

    try:
        total_rows = 0
        total_tables = 0
        total_indexes_created = 0

        for old_schema, new_schema in SCHEMA_MAP.items():
            print(f"\n{'─' * 50}")
            print(f"Schema: {old_schema} → {new_schema}")
            print(f"{'─' * 50}")

            tables = get_tables(src_conn, old_schema)
            print(f"源表数量: {len(tables)}")

            for table in tables:
                src_count = get_row_count(src_conn, old_schema, table)
                if src_count == 0:
                    print(f" {table}: 0 行,跳过")
                    continue

                # Target table must exist (this script does not create DDL).
                dst_columns = get_columns(dst_conn, new_schema, table)
                if not dst_columns:
                    print(f" ⚠ {table}: 目标表不存在,跳过")
                    continue

                # Idempotency: skip fully-migrated tables, redo partial ones.
                dst_count = get_row_count(dst_conn, new_schema, table)
                if dst_count > 0 and dst_count >= src_count:
                    print(f" {table}: 目标已有 {dst_count} 行 (源 {src_count}),跳过")
                    total_rows += dst_count
                    total_tables += 1
                    continue
                elif dst_count > 0 and dst_count < src_count:
                    # Partial earlier run: truncate, then reload from scratch.
                    print(f" {table}: 目标有 {dst_count} 行 < 源 {src_count} 行,清空后重导...")
                    with dst_conn.cursor() as dst_cur:
                        dst_cur.execute(f'TRUNCATE "{new_schema}"."{table}" CASCADE')
                    dst_conn.commit()

                try:
                    migrated = migrate_table(
                        src_conn, dst_conn, old_schema, new_schema, table, dst_columns
                    )
                    print(f" {table}: {src_count} → {migrated} 行 ✓")
                    total_rows += migrated
                    total_tables += 1
                except Exception as e:
                    # A failed COPY OUT aborts the *source* transaction too;
                    # roll back both sides or every later source query dies
                    # with "current transaction is aborted".
                    src_conn.rollback()
                    dst_conn.rollback()
                    print(f" ✗ {table}: 迁移失败 - {e}")

            # Recreate indexes after the data for this schema is in place.
            print(f"\n 迁移索引 {old_schema} → {new_schema} ...")
            created, total_idx = migrate_indexes(src_conn, dst_conn, old_schema, new_schema)
            total_indexes_created += created
            print(f" 索引: {created}/{total_idx} 创建成功")

        # Refresh planner statistics in the new database.
        print(f"\n{'─' * 50}")
        print("执行 ANALYZE ...")
        dst_conn.autocommit = True  # ANALYZE cannot run inside a transaction block here
        with dst_conn.cursor() as cur:
            # ANALYZE targets a *table*, not a schema — `ANALYZE <schema>`
            # fails with "relation does not exist". A bare ANALYZE covers
            # every table in the database, which is exactly what we want.
            cur.execute("ANALYZE")
        print("ANALYZE 完成")

        print(f"\n{'=' * 60}")
        print(f"迁移完成: {total_tables} 表, {total_rows} 行, {total_indexes_created} 索引")
        print(f"{'=' * 60}")
    finally:
        # Always release both connections, even if migration blew up.
        src_conn.close()
        dst_conn.close()


if __name__ == "__main__":
    main()