Files
ZQYY.FQ-ETL/scripts/repair/tune_integrity_indexes.py

232 lines
7.1 KiB
Python

# -*- coding: utf-8 -*-
"""Create performance indexes for integrity verification and run ANALYZE.
Usage:
python -m scripts.tune_integrity_indexes
python -m scripts.tune_integrity_indexes --dry-run
"""
from __future__ import annotations
import argparse
import hashlib
from dataclasses import dataclass
from typing import Dict, List, Sequence, Set, Tuple
import psycopg2
from psycopg2 import sql
from config.settings import AppConfig
# Column names that, when present on a table, receive a single-column index
# (and, when a small PK exists, a composite time+PK index) to speed up the
# time-range scans done by integrity verification.
TIME_CANDIDATES = (
    "pay_time",
    "create_time",
    "start_use_time",
    "scd2_start_time",
    "calc_time",
    "order_date",
    "fetched_at",
)
@dataclass(frozen=True)
class IndexPlan:
    """Immutable description of one index to create."""

    schema: str  # PostgreSQL schema containing the table
    table: str  # unqualified table name
    index_name: str  # generated index identifier (already length-limited)
    columns: Tuple[str, ...]  # indexed columns, in order
def _short_index_name(table: str, tag: str, columns: Sequence[str]) -> str:
raw = f"idx_{table}_{tag}_{'_'.join(columns)}"
if len(raw) <= 63:
return raw
digest = hashlib.md5(raw.encode("utf-8")).hexdigest()[:8]
shortened = f"idx_{table}_{tag}_{digest}"
return shortened[:63]
def _load_table_columns(cur, schema: str, table: str) -> Set[str]:
cur.execute(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = %s AND table_name = %s
""",
(schema, table),
)
return {r[0] for r in cur.fetchall()}
def _load_pk_columns(cur, schema: str, table: str) -> List[str]:
cur.execute(
"""
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
AND tc.table_name = kcu.table_name
WHERE tc.table_schema = %s
AND tc.table_name = %s
AND tc.constraint_type = 'PRIMARY KEY'
ORDER BY kcu.ordinal_position
""",
(schema, table),
)
return [r[0] for r in cur.fetchall()]
def _load_tables(cur, schema: str) -> List[str]:
cur.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = %s
AND table_type = 'BASE TABLE'
ORDER BY table_name
""",
(schema,),
)
return [r[0] for r in cur.fetchall()]
def _plan_indexes(cur, schema: str, table: str) -> List[IndexPlan]:
    """Build the list of index plans for a single table.

    ODS tables get a ``fetched_at`` index plus a fetched_at+PK composite;
    DWD tables get a PK+``scd2_is_current`` composite.  Every table also
    gets single-column and column+PK indexes for each TIME_CANDIDATES
    column it actually has; plans that generate the same index name
    (e.g. the ODS fetched_at index vs. the generic time index) are
    collapsed by the final dedup pass.
    """
    plans: List[IndexPlan] = []
    cols = _load_table_columns(cur, schema, table)
    pk_cols = _load_pk_columns(cur, schema, table)
    if schema == "billiards_ods":
        if "fetched_at" in cols:
            plans.append(
                IndexPlan(
                    schema=schema,
                    table=table,
                    index_name=_short_index_name(table, "fetched_at", ("fetched_at",)),
                    columns=("fetched_at",),
                )
            )
            # Composite only for small PKs (<=3 cols) fully present on the table.
            if pk_cols and len(pk_cols) <= 3 and all(c in cols for c in pk_cols):
                comp_cols = ("fetched_at", *pk_cols)
                plans.append(
                    IndexPlan(
                        schema=schema,
                        table=table,
                        index_name=_short_index_name(table, "fetched_pk", comp_cols),
                        columns=comp_cols,
                    )
                )
    if schema == "billiards_dwd":
        # SCD2 current-row lookups: PK columns plus the current-flag column.
        if pk_cols and "scd2_is_current" in cols and len(pk_cols) <= 4:
            comp_cols = (*pk_cols, "scd2_is_current")
            plans.append(
                IndexPlan(
                    schema=schema,
                    table=table,
                    index_name=_short_index_name(table, "pk_current", comp_cols),
                    columns=comp_cols,
                )
            )
    # NOTE(review): reconstructed as schema-independent — TIME_CANDIDATES
    # includes "fetched_at" and the dedup below collapses the overlap with
    # the ODS-specific index above.
    for tcol in TIME_CANDIDATES:
        if tcol in cols:
            plans.append(
                IndexPlan(
                    schema=schema,
                    table=table,
                    index_name=_short_index_name(table, "time", (tcol,)),
                    columns=(tcol,),
                )
            )
            if pk_cols and len(pk_cols) <= 3 and all(c in cols for c in pk_cols):
                comp_cols = (tcol, *pk_cols)
                plans.append(
                    IndexPlan(
                        schema=schema,
                        table=table,
                        index_name=_short_index_name(table, "time_pk", comp_cols),
                        columns=comp_cols,
                    )
                )
    # Deduplicate by index name (later plans win; identical names mean
    # identical column lists here).
    dedup: Dict[str, IndexPlan] = {}
    for p in plans:
        dedup[p.index_name] = p
    return list(dedup.values())
def _create_index(cur, plan: IndexPlan) -> None:
    """Execute CREATE INDEX IF NOT EXISTS for *plan* on the given cursor."""
    column_list = sql.SQL(", ").join(sql.Identifier(name) for name in plan.columns)
    statement = sql.SQL("CREATE INDEX IF NOT EXISTS {idx} ON {sch}.{tbl} ({cols})").format(
        idx=sql.Identifier(plan.index_name),
        sch=sql.Identifier(plan.schema),
        tbl=sql.Identifier(plan.table),
        cols=column_list,
    )
    cur.execute(statement)
def _analyze_table(cur, schema: str, table: str) -> None:
    """Run ANALYZE on ``schema.table`` to refresh planner statistics."""
    cur.execute(
        sql.SQL("ANALYZE {sch}.{tbl}").format(
            sch=sql.Identifier(schema),
            tbl=sql.Identifier(table),
        )
    )
def main() -> int:
    """CLI entry point: plan indexes, optionally create them, then ANALYZE.

    ``--dry-run`` prints the plan and rolls the transaction back;
    ``--skip-analyze`` creates indexes but skips the ANALYZE pass.
    Returns 0 on success.
    """
    ap = argparse.ArgumentParser(description="Tune indexes for integrity verification.")
    ap.add_argument("--dry-run", action="store_true", help="Print planned SQL only.")
    ap.add_argument(
        "--skip-analyze",
        action="store_true",
        help="Create indexes but skip ANALYZE.",
    )
    args = ap.parse_args()

    cfg = AppConfig.load({})
    dsn = cfg.get("db.dsn")
    timeout_sec = int(cfg.get("db.connect_timeout_sec", 10) or 10)

    conn = psycopg2.connect(dsn, connect_timeout=timeout_sec)
    try:
        conn.autocommit = False
        with conn.cursor() as cur:
            # Plan everything up front so the dry-run output is complete.
            all_plans: List[IndexPlan] = []
            for schema in ("billiards_ods", "billiards_dwd"):
                for table in _load_tables(cur, schema):
                    all_plans.extend(_plan_indexes(cur, schema, table))

            touched_tables: Set[Tuple[str, str]] = set()
            print(f"planned indexes: {len(all_plans)}")
            for plan in all_plans:
                cols = ", ".join(plan.columns)
                print(f"[INDEX] {plan.schema}.{plan.table} ({cols}) -> {plan.index_name}")
                if not args.dry_run:
                    _create_index(cur, plan)
                    touched_tables.add((plan.schema, plan.table))

            if not args.skip_analyze:
                if args.dry_run:
                    # Nothing was created; derive the table list from the plan.
                    for schema, table in sorted({(p.schema, p.table) for p in all_plans}):
                        print(f"[ANALYZE] {schema}.{table}")
                else:
                    for schema, table in sorted(touched_tables):
                        _analyze_table(cur, schema, table)
                        print(f"[ANALYZE] {schema}.{table}")

        if args.dry_run:
            conn.rollback()
            print("dry-run complete; transaction rolled back")
        else:
            conn.commit()
            print("index tuning complete")
    finally:
        # psycopg2's connection context manager commits/rolls back on exit
        # but does NOT close the connection; close it explicitly instead of
        # leaking it until process exit.
        conn.close()
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())