Files
Neo-ZQYY/scripts/migrate/migrate_data.py

223 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Cross-database data migration script: LLZQ-test → etl_feiqiu.

Migrates from the old schemas (billiards_ods/billiards_dwd/billiards_dws/etl_admin)
to the new schemas (ods/dwd/dws/meta).

Strategy: table by table, SELECT → INSERT; the COPY protocol is used to
speed up large tables.
"""
import sys
import io
import os
import psycopg2
import psycopg2.extras

# Force UTF-8 on the Windows console so the CJK log output below can't crash
# with an encoding error (errors="replace" keeps going even for odd glyphs).
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")

# Connection settings.
# NOTE(review): credentials are hard-coded in source — consider moving them
# to environment variables or a config file outside version control.
DB_HOST = "100.64.0.4"
DB_PORT = 5432
DB_USER = "local-Python"
DB_PASS = "Neo-local-1991125"
OLD_DB = "LLZQ-test"
NEW_DB = "etl_feiqiu"

# Old schema → new schema mapping; iteration order drives the migration order.
SCHEMA_MAP = {
    "billiards_ods": "ods",
    "billiards_dwd": "dwd",
    "billiards_dws": "dws",
    "etl_admin": "meta",
}
def get_tables(conn, schema):
    """Return the names of all user tables in *schema*, sorted alphabetically.

    Queries pg_tables, which lists plain tables only — materialized views
    are excluded by construction.
    """
    query = """
        SELECT tablename FROM pg_tables
        WHERE schemaname = %s
        ORDER BY tablename
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema,))
        rows = cur.fetchall()
    return [name for (name,) in rows]
def get_columns(conn, schema, table):
    """Return the table's column names in declaration order (ordinal_position).

    An empty list means the table does not exist in that schema.
    """
    query = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        return [name for (name,) in cur.fetchall()]
def get_row_count(conn, schema, table):
    """Return the exact row count of "schema"."table" (COUNT(*), no estimate).

    Identifiers are double-quoted; schema/table names come from the catalog,
    not user input.
    """
    statement = 'SELECT COUNT(*) FROM "{}"."{}"'.format(schema, table)
    with conn.cursor() as cur:
        cur.execute(statement)
        (count,) = cur.fetchone()
    return count
def migrate_table(src_conn, dst_conn, old_schema, new_schema, table, dst_columns):
    """Copy one table's rows from the source DB to the destination DB.

    Uses the binary COPY protocol through an in-memory buffer: COPY TO
    STDOUT on the source, then COPY FROM STDIN on the destination.  Only
    the columns present in BOTH tables are transferred (in the destination
    table's column order), so minor schema drift between old and new
    tables is tolerated.

    NOTE: binary COPY requires the common columns to have identical types
    on both sides; a type mismatch makes the COPY FROM fail (the caller
    catches the exception and rolls back).

    The entire table is buffered in RAM — acceptable at this data volume,
    but would need chunking for very large tables.

    :param src_conn: open connection to the source database
    :param dst_conn: open connection to the destination database
    :param old_schema: schema holding the table in the source database
    :param new_schema: schema holding the table in the destination database
    :param table: table name (identical on both sides)
    :param dst_columns: destination table's column names, in order
    :return: destination row count after the copy (0 if nothing migrated)
    """
    src_columns = get_columns(src_conn, old_schema, table)
    # Intersection, ordered by the destination table's columns.
    common_cols = [c for c in dst_columns if c in src_columns]
    if not common_cols:
        print(f" ⚠ 无公共列,跳过")
        return 0
    cols_sql = ", ".join(f'"{c}"' for c in common_cols)
    # Dump the source rows into an in-memory buffer via binary COPY.
    buf = io.BytesIO()
    with src_conn.cursor() as src_cur:
        copy_out_sql = f'COPY (SELECT {cols_sql} FROM "{old_schema}"."{table}") TO STDOUT WITH (FORMAT binary)'
        src_cur.copy_expert(copy_out_sql, buf)
    buf.seek(0)
    data_size = buf.getbuffer().nbytes
    # An empty binary COPY stream is exactly 21 bytes: a 19-byte header
    # (11-byte "PGCOPY\n\377\r\n\0" signature + 4-byte flags + 4-byte
    # extension length) plus the 2-byte end-of-data trailer.  The previous
    # threshold (<= 11) could never match, so empty results went through a
    # pointless COPY FROM round-trip.
    if data_size <= 21:
        return 0
    with dst_conn.cursor() as dst_cur:
        copy_in_sql = f'COPY "{new_schema}"."{table}" ({cols_sql}) FROM STDIN WITH (FORMAT binary)'
        dst_cur.copy_expert(copy_in_sql, buf)
    dst_conn.commit()
    # Report the row count actually present in the destination.
    return get_row_count(dst_conn, new_schema, table)
def migrate_indexes(src_conn, dst_conn, old_schema, new_schema):
    """Recreate the source schema's user-defined indexes in the destination.

    Indexes that back PK/UNIQUE constraints are skipped — those are created
    automatically with the constraint.  Each index definition is rewritten
    to point at the new schema and made idempotent with IF NOT EXISTS.
    Failures are logged and rolled back individually; the rest proceed.

    :return: (created, total) tuple of counts.
    """
    lookup_sql = """
    SELECT indexname, indexdef
    FROM pg_indexes
    WHERE schemaname = %s
    AND indexname NOT IN (
    SELECT conname FROM pg_constraint
    WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname = %s)
    )
    ORDER BY indexname
    """
    with src_conn.cursor() as src_cur:
        src_cur.execute(lookup_sql, (old_schema, old_schema))
        index_rows = src_cur.fetchall()

    success_count = 0
    for name, definition in index_rows:
        # Point the DDL at the new schema — handle both the quoted and the
        # unquoted spelling that pg_indexes may emit.
        ddl = definition.replace(f'"{old_schema}"', f'"{new_schema}"')
        ddl = ddl.replace(f'{old_schema}.', f'{new_schema}.')
        # Make re-runs idempotent.  ("CREATE UNIQUE INDEX" does not contain
        # the substring "CREATE INDEX", so the two replaces cannot collide.)
        ddl = ddl.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1)
        ddl = ddl.replace("CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1)
        try:
            with dst_conn.cursor() as dst_cur:
                dst_cur.execute(ddl)
            dst_conn.commit()
            success_count += 1
        except Exception as e:
            dst_conn.rollback()
            print(f" ⚠ 索引 {name} 创建失败: {e}")
    return success_count, len(index_rows)
def main():
    """Run the full migration: per-schema table data, then indexes, then ANALYZE.

    Fixes vs. the previous revision:
    - `ANALYZE <schema>` is invalid SQL — ANALYZE takes a (schema-qualified)
      table name, and a bare schema name raises "relation does not exist".
      Now every table of each new schema is analyzed individually.
    - Connections are closed in a finally block so a mid-run failure no
      longer leaks them.
    """
    print("=" * 60)
    print("数据迁移: LLZQ-test → etl_feiqiu")
    print("=" * 60)
    # Two independent connections — one per database on the same host.
    src_conn = psycopg2.connect(
        host=DB_HOST, port=DB_PORT, dbname=OLD_DB,
        user=DB_USER, password=DB_PASS,
        options="-c client_encoding=UTF8"
    )
    dst_conn = psycopg2.connect(
        host=DB_HOST, port=DB_PORT, dbname=NEW_DB,
        user=DB_USER, password=DB_PASS,
        options="-c client_encoding=UTF8"
    )
    src_conn.autocommit = False
    dst_conn.autocommit = False
    total_rows = 0
    total_tables = 0
    total_indexes_created = 0
    try:
        for old_schema, new_schema in SCHEMA_MAP.items():
            # NOTE(review): '' * 50 prints an empty separator — the original
            # character (presumably '─') looks lost in transit; same for the
            # missing arrow between {old_schema}{new_schema}.  Left as-is to
            # avoid guessing at the intended glyphs.
            print(f"\n{'' * 50}")
            print(f"Schema: {old_schema}{new_schema}")
            print(f"{'' * 50}")
            tables = get_tables(src_conn, old_schema)
            print(f"源表数量: {len(tables)}")
            for table in tables:
                src_count = get_row_count(src_conn, old_schema, table)
                if src_count == 0:
                    print(f" {table}: 0 行,跳过")
                    continue
                # Skip when the destination table does not exist.
                dst_columns = get_columns(dst_conn, new_schema, table)
                if not dst_columns:
                    print(f"{table}: 目标表不存在,跳过")
                    continue
                # Skip tables already fully migrated; re-do partial ones.
                dst_count = get_row_count(dst_conn, new_schema, table)
                if dst_count > 0 and dst_count >= src_count:
                    print(f" {table}: 目标已有 {dst_count} 行 (源 {src_count}),跳过")
                    total_rows += dst_count
                    total_tables += 1
                    continue
                elif dst_count > 0 and dst_count < src_count:
                    # Partial earlier run: truncate, then re-import from scratch.
                    print(f" {table}: 目标有 {dst_count} 行 < 源 {src_count} 行,清空后重导...")
                    with dst_conn.cursor() as dst_cur:
                        dst_cur.execute(f'TRUNCATE "{new_schema}"."{table}" CASCADE')
                    dst_conn.commit()
                try:
                    migrated = migrate_table(src_conn, dst_conn, old_schema, new_schema, table, dst_columns)
                    print(f" {table}: {src_count}{migrated} 行 ✓")
                    total_rows += migrated
                    total_tables += 1
                except Exception as e:
                    # One bad table must not abort the whole run.
                    dst_conn.rollback()
                    print(f"{table}: 迁移失败 - {e}")
            # Recreate user-defined indexes for this schema.
            print(f"\n 迁移索引 {old_schema}{new_schema} ...")
            created, total_idx = migrate_indexes(src_conn, dst_conn, old_schema, new_schema)
            total_indexes_created += created
            print(f" 索引: {created}/{total_idx} 创建成功")
        # Refresh planner statistics in the new database.
        print(f"\n{'' * 50}")
        print("执行 ANALYZE ...")
        dst_conn.autocommit = True  # ANALYZE must not run inside an explicit transaction
        with dst_conn.cursor() as cur:
            for schema in SCHEMA_MAP.values():
                # ANALYZE accepts table names, not schema names — analyze
                # each migrated table individually.
                for table in get_tables(dst_conn, schema):
                    cur.execute(f'ANALYZE "{schema}"."{table}"')
        print("ANALYZE 完成")
        print(f"\n{'=' * 60}")
        print(f"迁移完成: {total_tables} 表, {total_rows} 行, {total_indexes_created} 索引")
        print(f"{'=' * 60}")
    finally:
        # Close both connections even if the migration failed part-way.
        src_conn.close()
        dst_conn.close()


if __name__ == "__main__":
    main()