346 lines
13 KiB
Python
346 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
字段级数据质量采样分析报告(v2 - 性能优化版)
|
||
|
||
策略:每张表只执行 1~2 条 SQL(而非逐字段查询),大幅减少网络往返。
|
||
- 用 information_schema 获取列元数据
|
||
- 用动态 SQL 一次性获取所有列的 NULL 计数
|
||
- 数值/日期/文本统计用单条聚合 SQL
|
||
"""
|
||
from __future__ import annotations

import os
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
|
||
|
||
# Load variables from the ".env" file found at parents[2] of this script's
# resolved path before any setting is read from the environment.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")

# Output directory for generated reports; mandatory.
ETL_REPORT_ROOT = os.getenv("ETL_REPORT_ROOT")
if ETL_REPORT_ROOT is None or ETL_REPORT_ROOT == "":
    raise RuntimeError("ETL_REPORT_ROOT 未在 .env 中定义")

# PostgreSQL connection string; mandatory.
PG_DSN = os.getenv("PG_DSN")
if PG_DSN is None or PG_DSN == "":
    raise RuntimeError("PG_DSN 未在 .env 中定义")

# Schemas to analyze, in report order.
TARGET_SCHEMAS = [
    "ods",
    "dwd",
    "dws",
]

# Columns that only get a NULL count, no detailed statistics
# (ETL metadata columns that do not affect business judgement).
SKIP_STATS_COLS = {
    "payload",
    "content_hash",
    "record_index",
    "source_file",
    "source_endpoint",
}
|
||
|
||
|
||
def get_conn():
    """Open a read-only PostgreSQL connection whose cursors return dict rows.

    Connects with the module-level ``PG_DSN`` and installs
    ``psycopg2.extras.RealDictCursor`` as the cursor factory, so fetched rows
    can be read by column name. The session is then switched to read-only.

    Returns:
        The open connection. The caller is responsible for closing it.

    Raises:
        Any exception raised by ``psycopg2.connect`` or ``set_session``. If
        ``set_session`` fails, the connection is closed before re-raising.
    """
    conn = psycopg2.connect(PG_DSN, cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        conn.set_session(readonly=True)
    except Exception:
        # Do not leak the freshly opened connection when session setup fails.
        conn.close()
        raise
    return conn
|
||
|
||
|
||
def list_tables(conn, schema: str) -> list[str]:
    """Return the names of all base tables in *schema*, ordered by name.

    Only rows with ``table_type = 'BASE TABLE'`` are selected. Rows are read
    by key (``row["table_name"]``), so the cursors of *conn* must yield
    mapping-style rows.
    """
    query = """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE'
        ORDER BY table_name
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (schema,))
        rows = cursor.fetchall()
    names = []
    for row in rows:
        names.append(row["table_name"])
    return names
|
||
|
||
|
||
def get_columns_meta(conn, schema: str, table: str) -> list[dict]:
    """Fetch column metadata for one table from ``information_schema.columns``.

    The query selects column_name, udt_name, is_nullable,
    character_maximum_length, numeric_precision and numeric_scale, ordered by
    ordinal position. Each fetched row is copied into a plain ``dict``.
    """
    query = """
        SELECT column_name, udt_name, is_nullable,
               character_maximum_length, numeric_precision, numeric_scale
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (schema, table))
        return list(map(dict, cursor.fetchall()))
|
||
|
||
|
||
# udt_name groups that decide which aggregate expressions a column gets.
_OPAQUE_UDTS = frozenset({"jsonb", "json", "bytea"})
_NUMERIC_UDTS = frozenset({"int2", "int4", "int8", "float4", "float8", "numeric"})
_TEMPORAL_UDTS = frozenset({"date", "timestamp", "timestamptz"})
_TEXT_UDTS = frozenset({"text", "varchar", "bpchar", "name"})
# Tables with more rows than this skip the per-column COUNT(DISTINCT) pass.
_DISTINCT_MAX_ROWS = 3000


def _quote_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def analyze_table_batch(conn, schema: str, table: str, columns: list[dict]) -> dict:
    """Profile every column of one table with as few SQL statements as possible.

    Runs one ``COUNT(*)`` for the row count, then a single aggregate SELECT
    carrying several expressions per column. For tables with at most 3000
    rows it also runs one ``COUNT(DISTINCT ...)`` per analyzed column.

    Schema, table and column names are emitted as quoted identifiers. Result
    aliases are positional (``c0_null``, ``c0_min``, ...) rather than derived
    from the column name, so mixed-case, very long or oddly named columns
    cannot be case-folded, truncated or collide when results are read back.

    Args:
        conn: Open connection whose cursors return mapping-style rows
            (``row["alias"]`` / ``row.get("alias")``) and which supports
            ``rollback()``.
        schema: Unquoted schema name.
        table: Unquoted table name.
        columns: Column metadata dicts providing at least ``column_name``
            and ``udt_name``.

    Returns:
        A dict with ``table`` (``"schema.table"``), ``total_rows``,
        ``column_count`` and ``columns`` (one stats dict per column).
        If the row count cannot be read, ``total_rows`` is -1, ``columns`` is
        empty, ``error`` is set and ``column_count`` is omitted. If the
        aggregate query fails, ``columns`` is empty and ``error`` holds the
        truncated message.
    """
    label = f"{schema}.{table}"
    relation = f"{_quote_ident(schema)}.{_quote_ident(table)}"

    with conn.cursor() as cur:
        # 1) Row count. Any failure marks the table as unreadable.
        try:
            cur.execute(f"SELECT COUNT(*) AS cnt FROM {relation}")
            total = cur.fetchone()["cnt"]
        except Exception:
            conn.rollback()
            return {"table": label, "total_rows": -1, "columns": [], "error": "无法读取"}

        if total == 0:
            # Empty table: nothing to aggregate, emit placeholder rows.
            return {
                "table": label,
                "total_rows": 0,
                "column_count": len(columns),
                "columns": [
                    {"column": c["column_name"], "type": c["udt_name"], "total": 0,
                     "null_count": 0, "null_pct": "0%", "distinct": 0, "notes": "空表"}
                    for c in columns
                ],
            }

        # 2) Build the batched aggregate SELECT.
        #    every column : NULL count
        #    numeric      : MIN / MAX / AVG
        #    date/time    : MIN / MAX (as text)
        #    text         : MIN / MAX of LENGTH
        #    bool         : TRUE / FALSE counts
        # The leading constant keeps the select list non-empty even when
        # *columns* is empty.
        select_parts = [f"{total} AS _total"]
        col_plan = []  # per-column record of which statistics were requested

        for idx, c in enumerate(columns):
            cname = c["column_name"]
            udt = c["udt_name"]
            safe = _quote_ident(cname)
            alias = f"c{idx}"
            # JSON/bytea and ETL metadata columns only get the NULL count.
            skipped = udt in _OPAQUE_UDTS or cname in SKIP_STATS_COLS
            plan = {"column": cname, "type": udt, "alias": alias,
                    "skipped": skipped, "stats": ["null"]}

            select_parts.append(f"COUNT(*) FILTER (WHERE {safe} IS NULL) AS {alias}_null")

            if skipped:
                pass
            elif udt in _NUMERIC_UDTS:
                select_parts.append(f"MIN({safe}) AS {alias}_min")
                select_parts.append(f"MAX({safe}) AS {alias}_max")
                select_parts.append(f"ROUND(AVG({safe})::numeric, 2) AS {alias}_avg")
                plan["stats"].extend(["min", "max", "avg"])
            elif udt in _TEMPORAL_UDTS:
                # Aggregate on the text form so psycopg2 never has to parse
                # abnormal dates (e.g. year < 1).
                select_parts.append(
                    f"MIN({safe}::text) FILTER (WHERE {safe}::text >= '0001') AS {alias}_min"
                )
                select_parts.append(
                    f"MAX({safe}::text) FILTER (WHERE {safe}::text <= '9999') AS {alias}_max"
                )
                plan["stats"].extend(["earliest", "latest"])
            elif udt in _TEXT_UDTS:
                select_parts.append(f"MIN(LENGTH({safe})) AS {alias}_minlen")
                select_parts.append(f"MAX(LENGTH({safe})) AS {alias}_maxlen")
                plan["stats"].extend(["min_len", "max_len"])
            elif udt == "bool":
                select_parts.append(f"COUNT(*) FILTER (WHERE {safe} = TRUE) AS {alias}_true")
                select_parts.append(f"COUNT(*) FILTER (WHERE {safe} = FALSE) AS {alias}_false")
                plan["stats"].extend(["true_count", "false_count"])

            col_plan.append(plan)

        sql = f"SELECT {', '.join(select_parts)} FROM {relation}"
        try:
            cur.execute(sql)
            agg = cur.fetchone()
        except Exception as e:
            conn.rollback()
            return {
                "table": label,
                "total_rows": total,
                "column_count": len(columns),
                "columns": [],
                "error": f"聚合查询失败: {str(e)[:120]}",
            }

        # 3) Turn the single aggregate row into one stats dict per column.
        results = []
        for plan in col_plan:
            alias = plan["alias"]
            udt = plan["type"]
            stats = plan["stats"]

            null_cnt = agg.get(f"{alias}_null", 0) or 0
            # total is non-zero here: the empty-table case returned above.
            null_pct = round(null_cnt / total * 100, 1)

            r = {
                "column": plan["column"],
                "type": udt,
                "total": total,
                "null_count": null_cnt,
                "null_pct": f"{null_pct}%",
            }

            if udt in _OPAQUE_UDTS:
                r["samples"] = [f"({udt.upper()})"]
            elif plan["skipped"]:
                r["samples"] = ["(ETL元数据)"]
            else:
                if "min" in stats:
                    r["min"] = agg.get(f"{alias}_min")
                    r["max"] = agg.get(f"{alias}_max")
                    r["avg"] = agg.get(f"{alias}_avg")
                if "earliest" in stats:
                    v = agg.get(f"{alias}_min")
                    r["earliest"] = str(v) if v else None
                    v = agg.get(f"{alias}_max")
                    r["latest"] = str(v) if v else None
                if "min_len" in stats:
                    r["min_len"] = agg.get(f"{alias}_minlen")
                    r["max_len"] = agg.get(f"{alias}_maxlen")
                if "true_count" in stats:
                    r["true_count"] = agg.get(f"{alias}_true")
                    r["false_count"] = agg.get(f"{alias}_false")

            results.append(r)

        # 4) Distinct counts: per column for small tables, skipped ("-") for
        #    large tables and for columns without detailed statistics.
        for r, plan in zip(results, col_plan):
            if total > _DISTINCT_MAX_ROWS or plan["skipped"]:
                r["distinct"] = "-"
                continue
            try:
                cur.execute(
                    f"SELECT COUNT(DISTINCT {_quote_ident(r['column'])}) AS d FROM {relation}"
                )
                r["distinct"] = cur.fetchone()["d"]
            except Exception:
                # Best effort: a column that cannot be counted is marked "?".
                conn.rollback()
                r["distinct"] = "?"

        return {
            "table": label,
            "total_rows": total,
            "column_count": len(columns),
            "columns": results,
        }
|
||
|
||
|
||
# ── 报告格式化 ────────────────────────────────────────────────
|
||
|
||
def fmt_col_row(c: dict) -> str:
    """Format one column-stats dict as a Markdown table row.

    The row has six cells: column name, type, NULL percentage, distinct
    count, a statistics summary and up to three samples (each cut to 40
    characters). Missing name/type/NULL percentage render as ``?``; a missing
    distinct count, empty statistics or no samples render as ``-``.

    A literal ``|`` inside any cell is escaped as ``\\|`` so a value such as
    an unusual column name cannot split the cell and corrupt the table.

    Args:
        c: Stats dict. Recognised keys: ``column``, ``type``, ``null_pct``,
            ``distinct``, ``min``/``max``/``avg``, ``earliest``/``latest``,
            ``min_len``/``max_len``, ``true_count``/``false_count``,
            ``samples``.

    Returns:
        The Markdown row, without a trailing newline.
    """
    def cell(value) -> str:
        # An unescaped pipe would terminate the Markdown table cell early.
        return str(value).replace("|", "\\|")

    stats_parts = []
    if c.get("min") is not None:
        stats_parts.append(f"min={c['min']}, max={c['max']}, avg={c['avg']}")
    if c.get("earliest") is not None:
        stats_parts.append(f"{c['earliest']} ~ {c['latest']}")
    if c.get("min_len") is not None:
        stats_parts.append(f"len={c['min_len']}~{c['max_len']}")
    if "true_count" in c:
        stats_parts.append(f"T={c['true_count']}, F={c['false_count']}")
    stats = "; ".join(stats_parts) if stats_parts else "-"

    samples = c.get("samples", [])
    sample_str = ", ".join(str(s)[:40] for s in samples[:3]) if samples else "-"

    cells = [
        c.get("column", "?"),
        c.get("type", "?"),
        c.get("null_pct", "?"),
        c.get("distinct", "-"),
        stats,
        sample_str,
    ]
    return "| " + " | ".join(cell(v) for v in cells) + " |"
|
||
|
||
|
||
def generate_report(all_results: dict[str, list[dict]]) -> str:
    """Render the per-schema analysis results as one Markdown document.

    Schemas are emitted in ``TARGET_SCHEMAS`` order; a schema with no results
    is skipped. Each table gets a heading followed by its error message, a
    "no column info" note, or a table of column rows built by
    ``fmt_col_row``. A summary of the table and column totals over everything
    in *all_results* closes the report.

    Args:
        all_results: Maps a schema name to the list of per-table result dicts.

    Returns:
        The report text, lines joined with ``"\\n"``.
    """
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    out = ["# 字段级数据质量采样报告", "", f"生成时间: {generated_at}", ""]

    for schema in TARGET_SCHEMAS:
        schema_tables = all_results.get(schema, [])
        if not schema_tables:
            continue

        # Only positive row counts are summed, so the -1 marker of an
        # unreadable table does not distort the total.
        row_sum = 0
        for entry in schema_tables:
            if entry["total_rows"] > 0:
                row_sum += entry["total_rows"]
        out += [f"## {schema.upper()} 层({len(schema_tables)} 张表,共 {row_sum:,} 行)", ""]

        for entry in schema_tables:
            table_name = entry["table"]
            row_count = entry["total_rows"]
            n_cols = entry.get("column_count", 0)
            out += [f"### {table_name}({row_count:,} 行,{n_cols} 列)", ""]

            if entry.get("error"):
                out += [f"> ❌ {entry['error']}", ""]
            elif not entry["columns"]:
                out += ["> 无列信息", ""]
            else:
                out.append("| 字段 | 类型 | NULL率 | 唯一值 | 统计 | 样本 |")
                out.append("|------|------|--------|--------|------|------|")
                out.extend(fmt_col_row(col) for col in entry["columns"])
                out.append("")

    # Totals cover every schema present in all_results.
    table_total = 0
    column_total = 0
    for schema_tables in all_results.values():
        table_total += len(schema_tables)
        for entry in schema_tables:
            column_total += entry.get("column_count", 0)

    out += [
        "## 汇总",
        "",
        f"- 分析表数: {table_total}",
        f"- 分析字段数: {column_total}",
        "",
    ]
    return "\n".join(out)
|
||
|
||
|
||
def main():
    """Analyze every table of the target schemas and write the Markdown report.

    For each schema in ``TARGET_SCHEMAS`` the tables are listed, their column
    metadata fetched and each table profiled, with progress printed to
    stdout. The report is then written to
    ``ETL_REPORT_ROOT/field_level_report_<timestamp>.md`` (the directory is
    created if needed).

    The connection is wrapped in ``contextlib.closing`` so it is released even
    when listing or analyzing a table raises.
    """
    print("=== 字段级数据质量采样分析 (v2) ===")

    all_results: dict[str, list[dict]] = {}

    with closing(get_conn()) as conn:
        for schema in TARGET_SCHEMAS:
            print(f"\n分析 {schema} 层...")
            tables = list_tables(conn, schema)
            print(f" {len(tables)} 张表")
            schema_results = []
            for i, t in enumerate(tables, 1):
                cols = get_columns_meta(conn, schema, t)
                print(f" [{i}/{len(tables)}] {schema}.{t} ({len(cols)} 列)...", end="", flush=True)
                result = analyze_table_batch(conn, schema, t, cols)
                schema_results.append(result)
                print(f" {result['total_rows']:,} 行", end="")
                if result.get("error"):
                    print(f" ❌ {result['error'][:60]}")
                else:
                    print(" ✓")
            all_results[schema] = schema_results

    print("\n生成报告...")
    report = generate_report(all_results)

    out_dir = Path(ETL_REPORT_ROOT)
    out_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_path = out_dir / f"field_level_report_{ts}.md"
    out_path.write_text(report, encoding="utf-8")
    print(f"报告已生成: {out_path}")
|
||
|
||
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|