232 lines
7.1 KiB
Python
232 lines
7.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Create performance indexes for integrity verification and run ANALYZE.
|
|
|
|
Usage:
|
|
python -m scripts.tune_integrity_indexes
|
|
python -m scripts.tune_integrity_indexes --dry-run
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Sequence, Set, Tuple
|
|
|
|
import psycopg2
|
|
from psycopg2 import sql
|
|
|
|
from config.settings import AppConfig
|
|
|
|
|
|
# Candidate time columns. _plan_indexes gives each one a table actually has a
# single-column index and, when the primary key is small enough, a
# (time_col, *pk) composite. The order here fixes the order plans are emitted.
TIME_CANDIDATES = ("pay_time", "create_time", "start_use_time", "scd2_start_time", "calc_time", "order_date", "fetched_at")
|
|
|
|
|
|
@dataclass(frozen=True)
class IndexPlan:
    """One index to create: the target table plus the ordered key columns.

    Instances are immutable (``frozen=True``), so they compare and hash by value.
    """

    schema: str  # schema that owns the target table
    table: str  # table the index is built on
    index_name: str  # identifier passed to CREATE INDEX IF NOT EXISTS
    columns: Tuple[str, ...]  # key columns, in index order
|
|
|
|
|
|
def _short_index_name(table: str, tag: str, columns: Sequence[str]) -> str:
    """Build a deterministic index name that fits PostgreSQL's identifier limit.

    The natural name is ``idx_{table}_{tag}_{col1_col2...}``. If that exceeds
    63 bytes (PostgreSQL's default maximum identifier length, NAMEDATALEN - 1),
    the column list is replaced by the first 8 hex digits of an md5 of the
    natural name. The ``idx_{table}_{tag}`` prefix is trimmed if necessary.

    The digest is always kept intact. Previously, for long table names the
    final ``[:63]`` slice cut the digest off. Different column sets then
    collapsed to the same name, so one plan was dropped by name-based dedup or
    silently skipped by ``CREATE INDEX IF NOT EXISTS``. Names that already fit
    are produced exactly as before.

    Args:
        table: Table the index belongs to.
        tag: Short label describing the index rule (e.g. ``"time_pk"``).
        columns: Index key columns, in order.

    Returns:
        An index name at most 63 bytes long when UTF-8 encoded.
    """
    max_bytes = 63  # PostgreSQL truncates identifiers longer than this (in bytes)

    raw = f"idx_{table}_{tag}_{'_'.join(columns)}"
    raw_bytes = raw.encode("utf-8")
    if len(raw_bytes) <= max_bytes:
        return raw

    # Hash the full natural name so distinct column lists stay distinct.
    suffix = "_" + hashlib.md5(raw_bytes).hexdigest()[:8]
    budget = max_bytes - len(suffix)
    # Trim the prefix in bytes. Dropping a partially cut multibyte character
    # keeps the result valid UTF-8.
    prefix = f"idx_{table}_{tag}".encode("utf-8")[:budget].decode("utf-8", errors="ignore")
    return prefix + suffix
|
|
|
|
|
|
def _load_table_columns(cur, schema: str, table: str) -> Set[str]:
    """Return the set of column names of ``schema.table``.

    The names come from ``information_schema.columns``. Schema and table are
    passed as bound query parameters through the given cursor.

    NOTE(review): information_schema only lists columns the current role has
    some privilege on.
    """
    column_query = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
    """
    cur.execute(column_query, (schema, table))

    names: Set[str] = set()
    for row in cur.fetchall():
        names.add(row[0])
    return names
|
|
|
|
|
|
def _load_pk_columns(cur, schema: str, table: str) -> List[str]:
    """Return the primary-key columns of ``schema.table`` in key order.

    The PRIMARY KEY constraint rows from ``information_schema.table_constraints``
    are joined to ``key_column_usage`` and sorted by ``ordinal_position``.
    An empty list means no primary key was found.
    """
    pk_query = """
        SELECT kcu.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON tc.constraint_name = kcu.constraint_name
         AND tc.table_schema = kcu.table_schema
         AND tc.table_name = kcu.table_name
        WHERE tc.table_schema = %s
          AND tc.table_name = %s
          AND tc.constraint_type = 'PRIMARY KEY'
        ORDER BY kcu.ordinal_position
    """
    bind = (schema, table)
    cur.execute(pk_query, bind)
    ordered: List[str] = []
    for row in cur.fetchall():
        ordered.append(row[0])
    return ordered
|
|
|
|
|
|
def _load_tables(cur, schema: str) -> List[str]:
    """Return the names of the base tables in *schema*, sorted by name.

    Only ``table_type = 'BASE TABLE'`` rows of ``information_schema.tables`` are
    returned, so views are excluded. Sorting is done by the query's ORDER BY.
    """
    table_query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
          AND table_type = 'BASE TABLE'
        ORDER BY table_name
    """
    cur.execute(table_query, (schema,))
    fetched = cur.fetchall()
    return [entry[0] for entry in fetched]
|
|
|
|
|
|
def _plan_indexes(cur, schema: str, table: str) -> List[IndexPlan]:
    """Return the indexes that should exist on ``schema.table``.

    Column and primary-key metadata are read through *cur*. Each rule applies
    only when the columns it references exist in the table:

    * ``billiards_ods``: ``(fetched_at)`` and, when the primary key has at most
      three columns, ``(fetched_at, *pk)``.
    * ``billiards_dwd``: ``(*pk, scd2_is_current)`` when the table has a
      ``scd2_is_current`` column and a primary key of at most four columns.
    * Any schema: for each column of ``TIME_CANDIDATES`` present,
      ``(time_col)`` and, when the primary key has at most three columns,
      ``(time_col, *pk)``.

    Redundant plans are dropped, and the first one planned wins:

    * A column never appears twice within one index, e.g. a time column that is
      also part of the primary key.
    * Plans with an identical column list are emitted once. Previously the ODS
      ``fetched_at`` rules and the generic time rule (``fetched_at`` is also a
      time candidate) produced the same physical index twice under different
      names.
    * A plan whose columns equal the primary key exactly is skipped, because the
      primary-key index already provides it.
    * Index-name collisions keep the first plan.

    Returns:
        The plans, in rule order.
    """
    cols = _load_table_columns(cur, schema, table)
    pk_cols = tuple(_load_pk_columns(cur, schema, table))
    # Build PK composites only when every PK column is visible in ``cols``.
    pk_usable = bool(pk_cols) and all(c in cols for c in pk_cols)

    plans: List[IndexPlan] = []
    seen_columns: Set[Tuple[str, ...]] = set()
    seen_names: Set[str] = set()

    def add(tag: str, columns: Sequence[str]) -> None:
        # Record one plan unless it duplicates something already covered.
        key = tuple(dict.fromkeys(columns))  # drop repeated columns, keep order
        if key in seen_columns or key == pk_cols:
            return
        name = _short_index_name(table, tag, key)
        if name in seen_names:
            return
        seen_columns.add(key)
        seen_names.add(name)
        plans.append(IndexPlan(schema=schema, table=table, index_name=name, columns=key))

    if schema == "billiards_ods" and "fetched_at" in cols:
        add("fetched_at", ("fetched_at",))
        if pk_usable and len(pk_cols) <= 3:
            add("fetched_pk", ("fetched_at", *pk_cols))

    if schema == "billiards_dwd" and pk_usable and "scd2_is_current" in cols and len(pk_cols) <= 4:
        add("pk_current", (*pk_cols, "scd2_is_current"))

    for tcol in TIME_CANDIDATES:
        if tcol not in cols:
            continue
        add("time", (tcol,))
        if pk_usable and len(pk_cols) <= 3:
            add("time_pk", (tcol, *pk_cols))

    return plans
|
|
|
|
|
|
def _create_index(cur, plan: IndexPlan) -> None:
    """Execute ``CREATE INDEX IF NOT EXISTS`` for *plan* on *cur*.

    The index, schema, table and column names are each wrapped in
    ``psycopg2.sql.Identifier``. They are therefore quoted as identifiers, not
    spliced in as raw SQL. ``IF NOT EXISTS`` matches on the index name only:
    PostgreSQL does not check that an existing index has the same definition.

    NOTE(review): a plain ``CREATE INDEX`` (no ``CONCURRENTLY``) blocks writes
    to the table until the surrounding transaction ends. Confirm this is
    acceptable for the deployment window.
    """
    key_columns = sql.SQL(", ").join([sql.Identifier(col) for col in plan.columns])
    template = sql.SQL("CREATE INDEX IF NOT EXISTS {idx} ON {sch}.{tbl} ({cols})")
    statement = template.format(
        cols=key_columns,
        tbl=sql.Identifier(plan.table),
        sch=sql.Identifier(plan.schema),
        idx=sql.Identifier(plan.index_name),
    )
    cur.execute(statement)
|
|
|
|
|
|
def _analyze_table(cur, schema: str, table: str) -> None:
    """Run ``ANALYZE`` on ``schema.table`` through *cur*.

    Both names are quoted with ``psycopg2.sql.Identifier``.
    """
    names = {"sch": sql.Identifier(schema), "tbl": sql.Identifier(table)}
    cur.execute(sql.SQL("ANALYZE {sch}.{tbl}").format(**names))
|
|
|
|
|
|
def main(argv: Sequence[str] | None = None) -> int:
    """Plan and create integrity-verification indexes, then ANALYZE touched tables.

    Every base table in ``billiards_ods`` and ``billiards_dwd`` is inspected with
    ``_plan_indexes``. Unless ``--dry-run`` is given, each plan is executed with
    ``CREATE INDEX IF NOT EXISTS``. Unless ``--skip-analyze`` is given, each table
    that received a plan is then ANALYZEd. Everything runs in a single
    transaction: it is committed at the end, or rolled back for ``--dry-run``.

    NOTE(review): because it is one transaction, the write-blocking locks taken
    by plain ``CREATE INDEX`` are held on every touched table until the final
    commit.

    Args:
        argv: Arguments to parse. ``None`` (the default) parses ``sys.argv[1:]``,
            as before.

    Returns:
        ``0`` on success. Database errors propagate as exceptions; the
        transaction is rolled back and the connection is closed first.
    """
    ap = argparse.ArgumentParser(description="Tune indexes for integrity verification.")
    ap.add_argument("--dry-run", action="store_true", help="Print planned SQL only.")
    ap.add_argument(
        "--skip-analyze",
        action="store_true",
        help="Create indexes but skip ANALYZE.",
    )
    args = ap.parse_args(argv)

    cfg = AppConfig.load({})
    dsn = cfg.get("db.dsn")
    timeout_sec = int(cfg.get("db.connect_timeout_sec", 10) or 10)

    conn = psycopg2.connect(dsn, connect_timeout=timeout_sec)
    try:
        # In psycopg2, ``with conn`` only scopes the transaction: it commits on
        # normal exit and rolls back on an exception. It never closes the
        # connection, so ``finally`` below does that explicitly.
        with conn:
            conn.autocommit = False
            with conn.cursor() as cur:
                all_plans: List[IndexPlan] = []
                for schema in ("billiards_ods", "billiards_dwd"):
                    for table in _load_tables(cur, schema):
                        all_plans.extend(_plan_indexes(cur, schema, table))

                touched_tables: Set[Tuple[str, str]] = set()
                print(f"planned indexes: {len(all_plans)}")
                for plan in all_plans:
                    cols = ", ".join(plan.columns)
                    print(f"[INDEX] {plan.schema}.{plan.table} ({cols}) -> {plan.index_name}")
                    if not args.dry_run:
                        _create_index(cur, plan)
                        touched_tables.add((plan.schema, plan.table))

                if not args.skip_analyze:
                    if args.dry_run:
                        # Nothing was created; list the tables a real run would analyze.
                        for schema, table in sorted({(p.schema, p.table) for p in all_plans}):
                            print(f"[ANALYZE] {schema}.{table}")
                    else:
                        for schema, table in sorted(touched_tables):
                            _analyze_table(cur, schema, table)
                            print(f"[ANALYZE] {schema}.{table}")

                if args.dry_run:
                    conn.rollback()
                    print("dry-run complete; transaction rolled back")
                else:
                    conn.commit()
                    print("index tuning complete")
    finally:
        conn.close()

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)
|
|
|