# -*- coding: utf-8 -*-
"""Migration finishing script: materialized views, indexes, ANALYZE, final check.

Runs the last steps of migrating the old database ``LLZQ-test`` into the new
database ``etl_feiqiu``:

1. create the materialized views in schema ``dws``;
2. create the indexes on those materialized views;
3. ``ANALYZE`` every table / materialized view of the known schemas;
4. compare row counts between the old and the new database.

The process exit code is 0 when the final verification passes and 1 otherwise.
"""
import os
import re
import sys
from contextlib import closing

import psycopg2
from psycopg2 import sql

if sys.platform == "win32":
    # Windows consoles frequently use a legacy code page; force UTF-8 so the
    # Chinese progress messages cannot raise UnicodeEncodeError.
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")

# Connection settings.  Each value can be overridden through the environment;
# the defaults are the values that used to be hard-coded here.
# NOTE(review): a password committed to source control should be rotated and
# supplied only through MIGRATE_DB_PASS.
DB_HOST = os.environ.get("MIGRATE_DB_HOST", "100.64.0.4")
DB_PORT = int(os.environ.get("MIGRATE_DB_PORT", "5432"))
DB_USER = os.environ.get("MIGRATE_DB_USER", "local-Python")
DB_PASS = os.environ.get("MIGRATE_DB_PASS", "Neo-local-1991125")

OLD_DB = "LLZQ-test"
NEW_DB = "etl_feiqiu"

# Old-database schema name -> new-database schema name.
SCHEMA_MAP = {
    "billiards_ods": "ods",
    "billiards_dwd": "dwd",
    "billiards_dws": "dws",
    "etl_admin": "meta",
}

# Materialized view definitions (extracted from the old database, with the
# schema already replaced by ``dws``).  Each entry is (view name, DDL).
MATVIEWS = [
    (
        "mv_dws_assistant_daily_detail_l1",
        "CREATE MATERIALIZED VIEW dws.mv_dws_assistant_daily_detail_l1 AS "
        "SELECT * FROM dws.dws_assistant_daily_detail "
        "WHERE stat_date >= (CURRENT_DATE - '1 day'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_assistant_daily_detail_l2",
        "CREATE MATERIALIZED VIEW dws.mv_dws_assistant_daily_detail_l2 AS "
        "SELECT * FROM dws.dws_assistant_daily_detail "
        "WHERE stat_date >= (CURRENT_DATE - '30 days'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_assistant_daily_detail_l3",
        "CREATE MATERIALIZED VIEW dws.mv_dws_assistant_daily_detail_l3 AS "
        "SELECT * FROM dws.dws_assistant_daily_detail "
        "WHERE stat_date >= (CURRENT_DATE - '90 days'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_assistant_daily_detail_l4",
        "CREATE MATERIALIZED VIEW dws.mv_dws_assistant_daily_detail_l4 AS "
        "SELECT * FROM dws.dws_assistant_daily_detail "
        "WHERE stat_date >= (date_trunc('month', CURRENT_DATE::timestamp with time zone) - '6 mons'::interval) "
        "AND stat_date < date_trunc('month', CURRENT_DATE::timestamp with time zone) "
        "WITH DATA",
    ),
    (
        "mv_dws_finance_daily_summary_l1",
        "CREATE MATERIALIZED VIEW dws.mv_dws_finance_daily_summary_l1 AS "
        "SELECT * FROM dws.dws_finance_daily_summary "
        "WHERE stat_date >= (CURRENT_DATE - '1 day'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_finance_daily_summary_l2",
        "CREATE MATERIALIZED VIEW dws.mv_dws_finance_daily_summary_l2 AS "
        "SELECT * FROM dws.dws_finance_daily_summary "
        "WHERE stat_date >= (CURRENT_DATE - '30 days'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_finance_daily_summary_l3",
        "CREATE MATERIALIZED VIEW dws.mv_dws_finance_daily_summary_l3 AS "
        "SELECT * FROM dws.dws_finance_daily_summary "
        "WHERE stat_date >= (CURRENT_DATE - '90 days'::interval) "
        "WITH DATA",
    ),
    (
        "mv_dws_finance_daily_summary_l4",
        "CREATE MATERIALIZED VIEW dws.mv_dws_finance_daily_summary_l4 AS "
        "SELECT * FROM dws.dws_finance_daily_summary "
        "WHERE stat_date >= (date_trunc('month', CURRENT_DATE::timestamp with time zone) - '6 mons'::interval) "
        "AND stat_date < date_trunc('month', CURRENT_DATE::timestamp with time zone) "
        "WITH DATA",
    ),
]

# Indexes on the materialized views.
MV_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_mv_assistant_daily_l1 "
    "ON dws.mv_dws_assistant_daily_detail_l1 USING btree (site_id, stat_date, assistant_id)",
    "CREATE INDEX IF NOT EXISTS idx_mv_assistant_daily_l2 "
    "ON dws.mv_dws_assistant_daily_detail_l2 USING btree (site_id, stat_date, assistant_id)",
    "CREATE INDEX IF NOT EXISTS idx_mv_assistant_daily_l3 "
    "ON dws.mv_dws_assistant_daily_detail_l3 USING btree (site_id, stat_date, assistant_id)",
    "CREATE INDEX IF NOT EXISTS idx_mv_assistant_daily_l4 "
    "ON dws.mv_dws_assistant_daily_detail_l4 USING btree (site_id, stat_date, assistant_id)",
    "CREATE INDEX IF NOT EXISTS idx_mv_finance_daily_l1 "
    "ON dws.mv_dws_finance_daily_summary_l1 USING btree (site_id, stat_date)",
    "CREATE INDEX IF NOT EXISTS idx_mv_finance_daily_l2 "
    "ON dws.mv_dws_finance_daily_summary_l2 USING btree (site_id, stat_date)",
    "CREATE INDEX IF NOT EXISTS idx_mv_finance_daily_l3 "
    "ON dws.mv_dws_finance_daily_summary_l3 USING btree (site_id, stat_date)",
    "CREATE INDEX IF NOT EXISTS idx_mv_finance_daily_l4 "
    "ON dws.mv_dws_finance_daily_summary_l4 USING btree (site_id, stat_date)",
]

# Schemas of the new database whose objects get ANALYZEd in step 3.
ANALYZE_SCHEMAS = ["ods", "dwd", "dws", "meta", "core", "app"]

# Extracts the index name from a CREATE INDEX statement.
_INDEX_NAME_RE = re.compile(
    r"\bINDEX\s+(?:CONCURRENTLY\s+)?(?:IF\s+NOT\s+EXISTS\s+)?(\S+)\s+ON\s",
    re.IGNORECASE,
)


def _banner(title):
    """Print *title* framed by two 60-character ``=`` rules."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def _index_name(idx_sql):
    """Return the index name in *idx_sql*, or the statement itself if unparsable."""
    match = _INDEX_NAME_RE.search(idx_sql)
    return match.group(1) if match else idx_sql


def _connect(dbname):
    """Open a psycopg2 connection to *dbname* using the module-level settings."""
    return psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        dbname=dbname,
        user=DB_USER,
        password=DB_PASS,
        options="-c client_encoding=UTF8",
    )


def count_rows(conn, schema, table):
    """Return ``COUNT(*)`` of ``schema.table`` on connection *conn*.

    The identifiers are composed with ``psycopg2.sql.Identifier`` so that
    names containing quotes or other special characters are escaped properly.
    The call leaves the connection's transaction state to the caller (no
    commit or rollback is issued here).
    """
    query = sql.SQL("SELECT COUNT(*) FROM {}.{}").format(
        sql.Identifier(schema), sql.Identifier(table)
    )
    with conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchone()[0]


def step1_create_matviews(conn):
    """Create every materialized view listed in ``MATVIEWS``.

    Views that already exist in schema ``dws`` are skipped and counted as
    successful.  A database error on one view is reported and rolled back
    without aborting the remaining views.

    Returns:
        The number of views that exist after this step (created or skipped).
    """
    _banner("步骤 1: 创建物化视图")
    ok = 0
    for name, ddl in MATVIEWS:
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM pg_matviews "
                    "WHERE schemaname = 'dws' AND matviewname = %s",
                    (name,),
                )
                already_there = cur.fetchone() is not None
                if not already_there:
                    cur.execute(ddl)
            # Commit on both paths so no transaction is left dangling.
            conn.commit()
            if already_there:
                print(f" {name}: 已存在,跳过")
            else:
                rows = count_rows(conn, "dws", name)
                conn.commit()  # close the read transaction opened by the count
                print(f" {name}: 创建成功 ({rows} 行)")
            ok += 1
        except psycopg2.Error as e:
            conn.rollback()
            print(f" {name}: 创建失败 - {e}")
    print(f"物化视图: {ok}/{len(MATVIEWS)} 成功\n")
    return ok


def step2_create_mv_indexes(conn):
    """Create every index listed in ``MV_INDEXES``.

    Each statement is committed on its own; a failing statement is rolled
    back and reported without stopping the others.

    Returns:
        The number of statements that executed successfully.
    """
    _banner("步骤 2: 创建物化视图索引")
    ok = 0
    for idx_sql in MV_INDEXES:
        idx_name = _index_name(idx_sql)
        try:
            with conn.cursor() as cur:
                cur.execute(idx_sql)
            conn.commit()
            print(f" {idx_name}: OK")
            ok += 1
        except psycopg2.Error as e:
            conn.rollback()
            print(f" {idx_name}: 失败 - {e}")
    print(f"索引: {ok}/{len(MV_INDEXES)} 成功\n")
    return ok


def step3_analyze(conn):
    """Run ``ANALYZE`` on every table and materialized view of ``ANALYZE_SCHEMAS``.

    The statements run in autocommit mode; the connection's previous
    autocommit setting is restored afterwards, even on error.
    """
    _banner("步骤 3: ANALYZE")
    # psycopg2 rejects changing ``autocommit`` while a transaction is open,
    # so finish whatever transaction the caller may have left pending.
    conn.commit()
    old_autocommit = conn.autocommit
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            for schema in ANALYZE_SCHEMAS:
                # All tables plus materialized views of this schema.
                cur.execute(
                    "SELECT tablename FROM pg_tables WHERE schemaname = %s "
                    "UNION ALL "
                    "SELECT matviewname FROM pg_matviews WHERE schemaname = %s",
                    (schema, schema),
                )
                tables = [r[0] for r in cur.fetchall()]
                for t in tables:
                    cur.execute(
                        sql.SQL("ANALYZE {}.{}").format(
                            sql.Identifier(schema), sql.Identifier(t)
                        )
                    )
                print(f" {schema}: {len(tables)} 个对象已 ANALYZE")
        print("ANALYZE 完成\n")
    finally:
        conn.autocommit = old_autocommit


def _verify_table_counts(src_conn, dst_conn):
    """Compare row counts of all non-empty source tables with their targets.

    Returns:
        ``(all_ok, total_tables, total_rows)`` where the totals cover the
        target tables that were actually compared.
    """
    all_ok = True
    total_tables = 0
    total_rows = 0
    for old_s, new_s in SCHEMA_MAP.items():
        with src_conn.cursor() as cur:
            cur.execute(
                "SELECT tablename FROM pg_tables "
                "WHERE schemaname = %s ORDER BY tablename",
                (old_s,),
            )
            tables = [r[0] for r in cur.fetchall()]
        for t in tables:
            s_cnt = count_rows(src_conn, old_s, t)
            if s_cnt == 0:
                # Empty source tables are not part of the comparison.
                continue
            # Make sure the target table exists before counting it.
            with dst_conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM information_schema.tables "
                    "WHERE table_schema = %s AND table_name = %s",
                    (new_s, t),
                )
                target_exists = cur.fetchone() is not None
            if not target_exists:
                print(f" MISS {new_s}.{t}: 目标表不存在")
                all_ok = False
                continue
            d_cnt = count_rows(dst_conn, new_s, t)
            total_tables += 1
            total_rows += d_cnt
            if d_cnt == s_cnt:
                print(f" OK {new_s}.{t}: {s_cnt} 行")
            elif new_s == "meta" and t == "etl_task" and d_cnt > s_cnt:
                # The new database carries a few extra seed rows; expected.
                print(f" OK* {new_s}.{t}: 源={s_cnt} 目标={d_cnt} (种子数据)")
            else:
                print(f" FAIL {new_s}.{t}: 源={s_cnt} 目标={d_cnt}")
                all_ok = False
    return all_ok, total_tables, total_rows


def _verify_matviews(dst_conn):
    """Check that every view of ``MATVIEWS`` exists in ``dws``; return success."""
    print("\n 物化视图检查:")
    with dst_conn.cursor() as cur:
        cur.execute(
            "SELECT matviewname FROM pg_matviews "
            "WHERE schemaname = 'dws' ORDER BY matviewname"
        )
        existing = {r[0] for r in cur.fetchall()}
    all_ok = True
    for mv_name, _ in MATVIEWS:
        if mv_name in existing:
            rows = count_rows(dst_conn, "dws", mv_name)
            print(f" OK dws.{mv_name}: {rows} 行")
        else:
            print(f" MISS dws.{mv_name}")
            all_ok = False
    return all_ok


def _report_index_counts(dst_conn):
    """Print the number of indexes per migrated schema (informational only)."""
    print("\n 索引统计:")
    with dst_conn.cursor() as cur:
        for schema in ["ods", "dwd", "dws", "meta"]:
            cur.execute(
                "SELECT COUNT(*) FROM pg_indexes WHERE schemaname = %s",
                (schema,),
            )
            idx_cnt = cur.fetchone()[0]
            print(f" {schema}: {idx_cnt} 个索引")


def step4_verify(src_conn, dst_conn):
    """Final verification: compare old and new database contents.

    Checks that every non-empty table of the old schemas has the same row
    count in the mapped new schema (``meta.etl_task`` may have more rows),
    that all materialized views exist, and prints index counts.

    Args:
        src_conn: connection to the old database.
        dst_conn: connection to the new database.

    Returns:
        ``True`` when no FAIL/MISS item was reported, ``False`` otherwise.
    """
    _banner("步骤 4: 最终验证")
    tables_ok, total_tables, total_rows = _verify_table_counts(src_conn, dst_conn)
    matviews_ok = _verify_matviews(dst_conn)
    _report_index_counts(dst_conn)
    all_ok = tables_ok and matviews_ok

    # Only reads were issued; close the transactions they opened.
    src_conn.rollback()
    dst_conn.rollback()

    print(f"\n{'=' * 60}")
    if all_ok:
        print(f"验证通过: {total_tables} 表, {total_rows} 行全部一致")
    else:
        print("验证发现不一致,请检查上方 FAIL/MISS 项")
    print(f"{'=' * 60}")
    return all_ok


def main():
    """Run all four steps and return a process exit code (0 = verified OK).

    Both connections are closed even when a step raises.
    """
    # ``closing`` is used because psycopg2's own ``with connection`` block
    # only ends the transaction and does not close the connection.
    with closing(_connect(NEW_DB)) as dst:
        # Step 1: materialized views
        step1_create_matviews(dst)
        # Step 2: materialized view indexes
        step2_create_mv_indexes(dst)
        # Step 3: ANALYZE
        step3_analyze(dst)
        # Step 4: verification (needs the old database for comparison)
        with closing(_connect(OLD_DB)) as src:
            all_ok = step4_verify(src, dst)
    return 0 if all_ok else 1


if __name__ == "__main__":
    sys.exit(main())