Files
Neo-ZQYY/scripts/ops/fix_test_db.py

175 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
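Repair the test_etl_feiqiu database: backfill meta data, create materialized
views and their indexes, run ANALYZE, then verify row counts against etl_feiqiu.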
"""
import io
import os
import sys

import psycopg2
# On Windows, re-encode both standard streams as UTF-8 so the non-ASCII status
# messages printed below can be written; characters that still cannot be
# encoded are replaced instead of raising.
if sys.platform == "win32":
    for _stream in (sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8", errors="replace")
# Connection settings passed to psycopg2.connect() by conn().
# Each credential can be overridden through an ETL_DB_* environment variable;
# the literals are fallbacks that keep existing invocations working unchanged.
# NOTE(review): a real password is committed here -- rotate it and supply
# ETL_DB_PASSWORD from the environment so the fallback can be removed.
DB = dict(
    host=os.environ.get("ETL_DB_HOST", "100.64.0.4"),
    port=int(os.environ.get("ETL_DB_PORT", "5432")),
    user=os.environ.get("ETL_DB_USER", "local-Python"),
    password=os.environ.get("ETL_DB_PASSWORD", "Neo-local-1991125"),
    # Force the session encoding so text round-trips as UTF-8.
    options="-c client_encoding=UTF8",
)
def conn(dbname):
    """Open and return a psycopg2 connection to database *dbname*.

    The remaining connection parameters come from the module-level ``DB``
    mapping.
    """
    return psycopg2.connect(**DB, dbname=dbname)
def count(c, schema, table):
    """Return the number of rows in ``schema.table``.

    Args:
        c: Open DB-API connection whose ``cursor()`` is a context manager.
        schema: Schema name, used verbatim as a quoted identifier.
        table: Table or materialized-view name, quoted the same way.

    Returns:
        The value of ``SELECT COUNT(*)`` for the relation.
    """
    # Identifiers cannot be bound as query parameters, so quote them by hand:
    # wrap in double quotes and double any embedded quote so an unusual name
    # cannot break out of the identifier.
    qualified = ".".join(
        '"' + part.replace('"', '""') + '"' for part in (schema, table)
    )
    with c.cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {qualified}")
        return cur.fetchone()[0]
def get_columns(c, schema, table):
    """List the column names of ``schema.table`` in ordinal order.

    The names are read from ``information_schema.columns`` over connection
    *c*; the result is an empty list when no matching column rows are found.
    """
    query = (
        "SELECT column_name FROM information_schema.columns\n"
        "WHERE table_schema=%s AND table_name=%s\n"
        "ORDER BY ordinal_position"
    )
    with c.cursor() as cursor:
        cursor.execute(query, (schema, table))
        rows = cursor.fetchall()
    return [row[0] for row in rows]
def copy_table(src, dst, schema, table):
    """Replace the contents of ``schema.table`` in *dst* with the rows from *src*.

    Only columns present on both sides are transferred, in the destination's
    column order. The rows are exported from *src* with a binary ``COPY`` into
    an in-memory buffer, then the destination table is truncated and reloaded.

    The export happens first, and TRUNCATE and the import share a single
    transaction, so a failure at any step leaves the destination table as it
    was instead of empty.

    Args:
        src: Open psycopg2 connection to the source database.
        dst: Open psycopg2 connection to the destination database.
        schema: Schema that holds the table on both sides.
        table: Table name.

    Returns:
        The destination row count after the copy, or 0 when the two tables
        share no column or the source produced no rows.

    Raises:
        Exception: Any database error is re-raised after rolling back *dst*.
    """
    def quote(ident):
        # Double embedded quotes so any catalog name is a valid identifier.
        return '"' + ident.replace('"', '""') + '"'

    src_cols = set(get_columns(src, schema, table))
    common = [col for col in get_columns(dst, schema, table) if col in src_cols]
    if not common:
        return 0

    cols = ", ".join(quote(col) for col in common)
    target = f"{quote(schema)}.{quote(table)}"

    # Export before touching the destination, so an export error is harmless.
    buf = io.BytesIO()
    with src.cursor() as cur:
        cur.copy_expert(
            f"COPY (SELECT {cols} FROM {target}) TO STDOUT WITH (FORMAT binary)",
            buf,
        )
    buf.seek(0)

    # A binary COPY stream with no tuples is the 19-byte header plus the
    # 2-byte trailer; anything longer carries at least one row.
    empty_stream_size = 19 + 2
    has_rows = buf.getbuffer().nbytes > empty_stream_size

    try:
        with dst.cursor() as cur:
            cur.execute(f"TRUNCATE {target} CASCADE")
            if has_rows:
                cur.copy_expert(
                    f"COPY {target} ({cols}) FROM STDIN WITH (FORMAT binary)",
                    buf,
                )
        dst.commit()
    except Exception:
        # Undo the TRUNCATE as well, so the old rows survive a failed load.
        dst.rollback()
        raise

    if not has_rows:
        return 0
    return count(dst, schema, table)
def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def _sync_meta(src, dst):
    """Copy each meta table from *src* to *dst* when *dst* holds fewer rows."""
    print("=== 补齐 meta 数据 ===")
    for t in ("etl_cursor", "etl_run", "etl_task"):
        s = count(src, "meta", t)
        d = count(dst, "meta", t)
        if s == 0:
            print(f" {t}: 源为空,跳过")
            continue
        if d >= s:
            print(f" {t}: 已一致 ({d} 行)")
            continue
        rows = copy_table(src, dst, "meta", t)
        # Separate the two counts; printed back to back they read as one number.
        print(f" {t}: {s} -> {rows}")


def _create_matviews(src, dst):
    """Create in *dst* every ``dws`` materialized view that *src* has and *dst* lacks."""
    print("\n=== 创建物化视图 ===")
    with src.cursor() as cur:
        cur.execute("SELECT matviewname, definition FROM pg_matviews WHERE schemaname='dws' ORDER BY 1")
        mvs = cur.fetchall()
    for name, defn in mvs:
        with dst.cursor() as cur:
            cur.execute("SELECT 1 FROM pg_matviews WHERE schemaname='dws' AND matviewname=%s", (name,))
            exists = cur.fetchone() is not None
        if exists:
            print(f" {name}: 已存在")
            continue
        # Strip the trailing semicolon so the definition can be embedded in
        # a CREATE ... AS <query> WITH DATA statement.
        clean = defn.rstrip().rstrip(";").rstrip()
        try:
            with dst.cursor() as cur:
                cur.execute(f'CREATE MATERIALIZED VIEW dws.{_quote_ident(name)} AS {clean} WITH DATA')
            dst.commit()
            rows = count(dst, "dws", name)
            print(f" {name}: 创建成功 ({rows} 行)")
        except Exception as e:
            # Best effort: report the failure and carry on with the next view.
            dst.rollback()
            print(f" {name}: 失败 - {e}")


def _create_matview_indexes(dst):
    """Create the l1..l4 indexes on the assistant and finance materialized views."""
    print("\n=== 物化视图索引 ===")
    specs = (
        ("idx_mv_assistant_daily", "mv_dws_assistant_daily_detail", "site_id, stat_date, assistant_id"),
        ("idx_mv_finance_daily", "mv_dws_finance_daily_summary", "site_id, stat_date"),
    )
    for idx_prefix, mv_prefix, cols in specs:
        for level in range(1, 5):
            idx = f"{idx_prefix}_l{level}"
            sql = f"CREATE INDEX IF NOT EXISTS {idx} ON dws.{mv_prefix}_l{level} ({cols})"
            try:
                with dst.cursor() as cur:
                    cur.execute(sql)
                dst.commit()
                print(f" {idx}: OK")
            except Exception as e:
                # Best effort: a missing view must not stop the other indexes.
                dst.rollback()
                print(f" {idx}: {e}")


def _analyze(dst):
    """Run ANALYZE on every table and materialized view of the listed schemas."""
    print("\n=== ANALYZE ===")
    # psycopg2 rejects an autocommit change while a transaction is open, so
    # make sure the connection is idle first; nothing is pending at this point.
    dst.rollback()
    dst.autocommit = True
    try:
        with dst.cursor() as cur:
            for schema in ("ods", "dwd", "dws", "meta", "core", "app"):
                cur.execute(
                    "SELECT tablename FROM pg_tables WHERE schemaname=%s "
                    "UNION ALL "
                    "SELECT matviewname FROM pg_matviews WHERE schemaname=%s",
                    (schema, schema),
                )
                objs = [r[0] for r in cur.fetchall()]
                for o in objs:
                    cur.execute(f"ANALYZE {_quote_ident(schema)}.{_quote_ident(o)}")
                print(f" {schema}: {len(objs)} 个对象")
    finally:
        # Restore transactional mode even when an ANALYZE fails.
        dst.autocommit = False


def _verify(src, dst):
    """Compare row counts of non-empty source tables; return True when all match."""
    print("\n=== 最终验证 ===")
    ok = True
    for schema in ("ods", "dwd", "dws", "meta"):
        with src.cursor() as cur:
            cur.execute("SELECT tablename FROM pg_tables WHERE schemaname=%s ORDER BY 1", (schema,))
            tables = [r[0] for r in cur.fetchall()]
        for t in tables:
            s = count(src, schema, t)
            if s == 0:
                # Empty source tables are not compared.
                continue
            d = count(dst, schema, t)
            tag = "OK" if d == s else "FAIL"
            if tag == "FAIL":
                ok = False
            print(f" {tag:4s} {schema}.{t}: src={s} dst={d}")
    # Materialized views are only listed with their row counts, not compared.
    with dst.cursor() as cur:
        cur.execute("SELECT matviewname FROM pg_matviews WHERE schemaname='dws' ORDER BY 1")
        mv_names = [r[0] for r in cur.fetchall()]
    print(f"\n 物化视图: {len(mv_names)}")
    for n in mv_names:
        r = count(dst, "dws", n)
        print(f" {n}: {r}")
    return ok


def main():
    """Bring ``test_etl_feiqiu`` in line with ``etl_feiqiu`` and report the result.

    Steps, in order: backfill the meta tables, create missing ``dws``
    materialized views, create their indexes, ANALYZE the destination schemas,
    then compare per-table row counts and print a pass/fail summary.

    Both connections are closed even when a step raises.
    """
    src = conn("etl_feiqiu")
    try:
        dst = conn("test_etl_feiqiu")
        try:
            _sync_meta(src, dst)
            _create_matviews(src, dst)
            _create_matview_indexes(dst)
            _analyze(dst)
            ok = _verify(src, dst)
            print(f"\n{'='*50}")
            print("全部通过" if ok else "存在不一致")
        finally:
            dst.close()
    finally:
        src.close()
# Run the repair only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()