# -*- coding: utf-8 -*-
"""
Fully mirror the production databases into their test counterparts:

    etl_feiqiu -> test_etl_feiqiu (six-layer schema + data + indexes + materialized views)
    zqyy_app   -> test_zqyy_app   (all tables + data + indexes)

Strategy: first create the tables with the DDL used by init_databases.py,
then migrate the data over the COPY protocol, and finally migrate the
custom indexes and materialized views.
"""
import sys
import os
import io
import psycopg2

# Windows consoles may not default to UTF-8; reconfigure both standard
# streams so the Chinese log messages below print without UnicodeEncodeError.
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")

# Shared connection settings for every database touched by this script.
DB_HOST = "100.64.0.4"
DB_PORT = 5432
DB_USER = "local-Python"
DB_PASS = "Neo-local-1991125"
DB_OPTS = "-c client_encoding=UTF8"

# (source database, test database) pairs to mirror
CLONE_PAIRS = [
    ("etl_feiqiu", "test_etl_feiqiu"),
    ("zqyy_app", "test_zqyy_app"),
]

# Repository root: DDL file paths passed to execute_sql_file() are
# resolved relative to this directory.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def conn_to(dbname):
    """Open a psycopg2 connection to *dbname* using the shared settings."""
    return psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        dbname=dbname,
        user=DB_USER,
        password=DB_PASS,
        options=DB_OPTS,
    )
def execute_sql_file(conn, filepath, label=""):
    """Run one SQL file (path relative to BASE_DIR) in a single transaction.

    Returns True on success; False when the file is missing, empty, or the
    SQL fails (in which case the transaction is rolled back).
    """
    full = os.path.join(BASE_DIR, filepath)
    if not os.path.exists(full):
        print(f" [SKIP] 不存在: {filepath}")
        return False
    with open(full, "r", encoding="utf-8") as fh:
        script = fh.read()
    if not script.strip():
        return False
    tag = label or filepath
    try:
        with conn.cursor() as cur:
            cur.execute(script)
        conn.commit()
        print(f" [OK] {tag}")
        return True
    except Exception as exc:
        conn.rollback()
        print(f" [FAIL] {tag}: {exc}")
        return False
def get_schemas(conn):
    """Return the names of all user-defined (non-system) schemas, sorted."""
    query = """
        SELECT nspname FROM pg_namespace
        WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'
        ORDER BY nspname
    """
    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return [name for (name,) in rows]
def get_tables(conn, schema):
    """Return the ordinary table names in *schema*, sorted."""
    query = "SELECT tablename FROM pg_tables WHERE schemaname = %s ORDER BY tablename"
    with conn.cursor() as cur:
        cur.execute(query, (schema,))
        return [name for (name,) in cur.fetchall()]
def get_columns(conn, schema, table):
    """Return the column names of schema.table in ordinal order.

    Empty list means the table does not exist on this connection's database.
    """
    query = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        return [name for (name,) in cur.fetchall()]
def count_rows(conn, schema, table):
    """Return the row count of schema.table (identifiers double-quoted)."""
    with conn.cursor() as cur:
        cur.execute(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
        (total,) = cur.fetchone()
        return total
def copy_table(src, dst, schema, table):
    """Mirror one table's data from *src* to *dst* via the binary COPY protocol.

    Only columns present in BOTH databases are copied, so minor schema drift
    does not break the transfer. Returns the destination row count after the
    copy, or 0 when the table is missing, has no common columns, or the
    source result set is empty.
    """
    src_cols = get_columns(src, schema, table)
    dst_cols = get_columns(dst, schema, table)
    if not src_cols or not dst_cols:
        return 0
    common = [c for c in dst_cols if c in src_cols]
    if not common:
        return 0
    cols_sql = ", ".join(f'"{c}"' for c in common)

    buf = io.BytesIO()
    with src.cursor() as cur:
        cur.copy_expert(f'COPY (SELECT {cols_sql} FROM "{schema}"."{table}") TO STDOUT WITH (FORMAT binary)', buf)
    buf.seek(0)
    # A zero-row binary COPY stream is exactly 21 bytes: the 19-byte header
    # (11-byte "PGCOPY\n\377\r\n\0" signature + 4-byte flags + 4-byte header
    # extension length) plus the 2-byte end-of-data trailer. The previous
    # threshold of 11 could never fire, so empty tables always went through
    # a useless COPY FROM round-trip.
    if buf.getbuffer().nbytes <= 21:
        return 0
    with dst.cursor() as cur:
        cur.copy_expert(f'COPY "{schema}"."{table}" ({cols_sql}) FROM STDIN WITH (FORMAT binary)', buf)
    dst.commit()
    return count_rows(dst, schema, table)
def migrate_indexes(src, dst, schema):
    """Recreate the schema's user-defined indexes on *dst*.

    Constraint-backed indexes (primary keys, UNIQUE constraints) are
    excluded, since the DDL step already created them.  Returns a tuple
    ``(created, total)``.
    """
    catalog_query = """
        SELECT indexname, indexdef FROM pg_indexes
        WHERE schemaname = %s
        AND indexname NOT IN (
            SELECT conname FROM pg_constraint
            WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname = %s))
        ORDER BY indexname
    """
    with src.cursor() as cur:
        cur.execute(catalog_query, (schema, schema))
        index_rows = cur.fetchall()

    created = 0
    for idx_name, idx_def in index_rows:
        # Make the DDL idempotent; the two replacements are mutually
        # exclusive ("CREATE INDEX" is not a substring of
        # "CREATE UNIQUE INDEX"), so at most one fires per definition.
        stmt = idx_def.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1)
        stmt = stmt.replace("CREATE UNIQUE INDEX", "CREATE UNIQUE INDEX IF NOT EXISTS", 1)
        try:
            with dst.cursor() as cur:
                cur.execute(stmt)
            dst.commit()
            created += 1
        except Exception as exc:
            dst.rollback()
            # Indexes on materialized views can fail while the view does not
            # exist yet; those are handled after the views are created.
            message = str(exc)
            if "不存在" not in message and "does not exist" not in message:
                print(f" 索引 {idx_name}: {exc}")
    return created, len(index_rows)
def migrate_matviews(src, dst, schema):
    """Recreate the schema's materialized views on *dst* (populated WITH DATA).

    Views already present on the destination count as successes and are
    skipped.  Returns a tuple ``(ok, total)``.
    """
    with src.cursor() as cur:
        cur.execute("SELECT matviewname, definition FROM pg_matviews WHERE schemaname = %s ORDER BY matviewname", (schema,))
        views = cur.fetchall()
    if not views:
        return 0, 0

    done = 0
    for mv_name, mv_def in views:
        # Skip views that already exist on the destination.
        with dst.cursor() as cur:
            cur.execute("SELECT 1 FROM pg_matviews WHERE schemaname = %s AND matviewname = %s", (schema, mv_name))
            already_there = cur.fetchone() is not None
        if already_there:
            done += 1
            continue
        # pg_matviews.definition may end with a ';' that must be stripped
        # before appending WITH DATA.
        body = mv_def.rstrip().rstrip(";").rstrip()
        try:
            with dst.cursor() as cur:
                cur.execute(f'CREATE MATERIALIZED VIEW "{schema}"."{mv_name}" AS {body} WITH DATA')
            dst.commit()
            done += 1
        except Exception as exc:
            dst.rollback()
            print(f" 物化视图 {mv_name}: {exc}")
    return done, len(views)
def init_test_etl_feiqiu(conn):
    """Create the six-layer schema of test_etl_feiqiu from the DDL files.

    Seed data is intentionally NOT loaded here — the full data set is
    COPY-ed from the production database afterwards.
    """
    print(" 初始化 DDL...")
    for layer in ("meta", "ods", "dwd", "core", "dws", "app"):
        execute_sql_file(conn, f"db/etl_feiqiu/schemas/{layer}.sql", layer)
def init_test_zqyy_app(conn):
    """Create the test_zqyy_app schema and admin_web tables from DDL files."""
    print(" 初始化 DDL...")
    ddl_files = (
        ("db/zqyy_app/schemas/init.sql", "zqyy_app schema"),
        ("db/zqyy_app/migrations/20250715_create_admin_web_tables.sql", "admin_web 迁移"),
    )
    for path, label in ddl_files:
        execute_sql_file(conn, path, label)
def _ensure_schema(dst, schema):
    """Create *schema* on the destination database if it is missing."""
    with dst.cursor() as cur:
        cur.execute(f'CREATE SCHEMA IF NOT EXISTS "{schema}"')
    dst.commit()


def _mirror_schema_data(src, dst, schema, tables):
    """COPY every non-empty table of *schema*; return (tables_done, rows_done)."""
    tables_done = 0
    rows_done = 0
    for t in tables:
        s_cnt = count_rows(src, schema, t)
        if s_cnt == 0:
            continue
        # The target table must exist (the DDL step may not cover every table).
        if not get_columns(dst, schema, t):
            continue
        d_cnt = count_rows(dst, schema, t)
        if d_cnt >= s_cnt:
            # Already mirrored (or ahead) — leave it untouched.
            rows_done += d_cnt
            tables_done += 1
            continue
        if d_cnt > 0:
            # Partial data: wipe it before re-copying to avoid duplicates.
            with dst.cursor() as cur:
                cur.execute(f'TRUNCATE "{schema}"."{t}" CASCADE')
            dst.commit()
        try:
            migrated = copy_table(src, dst, schema, t)
            rows_done += migrated
            tables_done += 1
            if migrated != s_cnt:
                print(f" ⚠ {schema}.{t}: src={s_cnt} dst={migrated}")
        except Exception as e:
            dst.rollback()
            print(f" ✗ {schema}.{t}: {e}")
    return tables_done, rows_done


def _analyze_all(dst, schemas):
    """ANALYZE every table and materialized view so the planner has fresh stats.

    autocommit is restored even if an ANALYZE fails (the original code left
    the connection in autocommit mode on error).
    """
    dst.autocommit = True
    try:
        with dst.cursor() as cur:
            for schema in schemas:
                # Parameterized, consistent with the rest of the catalog
                # queries (the original interpolated schema via f-string).
                cur.execute(
                    """
                    SELECT tablename FROM pg_tables WHERE schemaname = %s
                    UNION ALL
                    SELECT matviewname FROM pg_matviews WHERE schemaname = %s
                    """,
                    (schema, schema),
                )
                for (obj,) in cur.fetchall():
                    cur.execute(f'ANALYZE "{schema}"."{obj}"')
    finally:
        dst.autocommit = False


def _verify_clone(src, dst, schemas):
    """Compare per-table row counts; return True when everything matches."""
    all_ok = True
    for schema in schemas:
        for t in get_tables(src, schema):
            s = count_rows(src, schema, t)
            if s == 0:
                continue
            if not get_columns(dst, schema, t):
                print(f" MISS {schema}.{t}")
                all_ok = False
                continue
            d = count_rows(dst, schema, t)
            if d != s:
                print(f" FAIL {schema}.{t}: src={s} dst={d}")
                all_ok = False
    return all_ok


def clone_database(src_name, dst_name):
    """Fully mirror *src_name* into *dst_name*: DDL, data, matviews, indexes.

    Returns True when every non-empty source table ends up with an equal
    row count on the destination.  Both connections are always closed,
    even on error (the original leaked them when any step raised).
    """
    print(f"\n{'='*60}")
    print(f"镜像: {src_name} → {dst_name}")
    print(f"{'='*60}")

    src = conn_to(src_name)
    try:
        dst = conn_to(dst_name)
        try:
            # Step 1: create the target structure from the checked-in DDL.
            if dst_name == "test_etl_feiqiu":
                init_test_etl_feiqiu(dst)
            elif dst_name == "test_zqyy_app":
                init_test_zqyy_app(dst)

            # Step 2: copy the data, schema by schema (only schemas that
            # actually contain tables on the source).
            print("\n 迁移数据...")
            schemas = get_schemas(src)
            total_rows = 0
            total_tables = 0
            for schema in schemas:
                tables = get_tables(src, schema)
                if not tables:
                    continue
                _ensure_schema(dst, schema)
                n_tables, n_rows = _mirror_schema_data(src, dst, schema, tables)
                total_tables += n_tables
                total_rows += n_rows
            print(f" 数据: {total_tables} 表, {total_rows} 行")

            # Step 3: materialized views (must precede their indexes).
            print("\n 迁移物化视图...")
            for schema in schemas:
                ok, total = migrate_matviews(src, dst, schema)
                if total > 0:
                    print(f" {schema}: {ok}/{total}")

            # Step 4: user-defined indexes.
            print("\n 迁移索引...")
            total_idx = 0
            for schema in schemas:
                ok, total = migrate_indexes(src, dst, schema)
                total_idx += ok
                if total > 0:
                    print(f" {schema}: {ok}/{total}")
            print(f" 索引: {total_idx} 个")

            # Step 5: refresh planner statistics.
            print("\n ANALYZE...")
            _analyze_all(dst, schemas)
            print(" ANALYZE 完成")

            # Step 6: row-count verification.
            print("\n 验证...")
            all_ok = _verify_clone(src, dst, schemas)
            if all_ok:
                print(" ✓ 全部一致")
            else:
                print(" ✗ 存在不一致")
            return all_ok
        finally:
            dst.close()
    finally:
        src.close()
def main():
    """Mirror every configured pair and print a summary of the results."""
    results = {dst_name: clone_database(src_name, dst_name)
               for src_name, dst_name in CLONE_PAIRS}

    print(f"\n{'='*60}")
    for db, ok in results.items():
        print(f" {db}: {'OK' if ok else 'FAIL'}")
    print(f"{'='*60}")
# Script entry point: mirror all configured databases when run directly.
if __name__ == "__main__":
    main()