Files
Neo-ZQYY/scripts/ops/field_disappearance_scan.py
2026-03-15 10:15:02 +08:00

190 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
字段消失扫描器:检测 DWD 表中字段值从某天起突然全部为空的异常
判定条件:连续 ≥3 天 且 连续空记录 ≥20 条
报告类型:
- ONGOING从某天起至今持续为空如 DQ-6 member_phone
- RECOVERED中途消失后又恢复
输出:终端 + CSV → export/SYSTEM/REPORTS/field_scan/
"""
import os
import csv
from datetime import date, timedelta
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
if not PG_DSN:
raise RuntimeError("TEST_DB_DSN / PG_DSN 未配置")
SYSTEM_ANALYZE_ROOT = os.environ.get("SYSTEM_ANALYZE_ROOT")
if not SYSTEM_ANALYZE_ROOT:
raise RuntimeError("SYSTEM_ANALYZE_ROOT 未配置")
import psycopg2
# ── 扫描配置 ──────────────────────────────────────────────
# (schema.table, time_column, field, filter_sql)
# filter_sql 用于限定有意义的行(如只看会员订单)
SCAN_TARGETS = [
("dwd.dwd_settlement_head", "pay_time", "member_phone",
"settle_type IN (1,3) AND member_id IS NOT NULL AND member_id != 0"),
("dwd.dwd_settlement_head", "pay_time", "member_name",
"settle_type IN (1,3) AND member_id IS NOT NULL AND member_id != 0"),
("dwd.dwd_settlement_head", "pay_time", "member_card_type_name",
"settle_type IN (1,3) AND member_id IS NOT NULL AND member_id != 0"),
("dwd.dwd_settlement_head", "pay_time", "is_bind_member",
"settle_type IN (1,3) AND member_id IS NOT NULL AND member_id != 0"),
]
# 阈值
MIN_CONSECUTIVE_DAYS = 3
MIN_CONSECUTIVE_ROWS = 20
@dataclass
class Gap:
    """One interval during which a field's values disappeared (all empty)."""
    table: str
    field: str
    start_date: date
    end_date: date    # last empty day of the interval
    days: int
    total_rows: int   # total rows within the interval
    null_rows: int    # empty rows within the interval
    recovered: bool   # whether values came back after the interval
def build_daily_sql(table: str, time_col: str, field: str, filter_sql: str) -> str:
    """Build the per-day non-empty-rate SQL for one scan target.

    Groups directly on the date (no generate_series), so days with zero
    matching rows simply do not appear in the result set.

    A value counts as "present" only when it is non-NULL and its text form
    is neither '' nor '0' (upstream sometimes writes 0 for "missing").

    FIX: dropped the redundant ``HAVING COUNT(*) > 0`` — GROUP BY never
    produces empty groups, so the clause could never filter anything.

    NOTE(review): table/column/filter fragments are interpolated directly
    into the SQL string; callers must only pass trusted, hard-coded
    SCAN_TARGETS entries, never user input.

    Returns the SQL text; rows come back as (day, total, non_null).
    """
    where = f"WHERE {filter_sql}" if filter_sql else ""
    return f"""
SELECT
    {time_col}::date AS day,
    COUNT(*) AS total,
    COUNT(CASE
            WHEN {field} IS NOT NULL
             AND {field}::text != ''
             AND {field}::text != '0'
            THEN 1
          END) AS non_null
FROM {table}
{where}
GROUP BY {time_col}::date
ORDER BY day
"""
def detect_gaps(daily_stats: list[tuple[date, int, int]],
                table: str, field: str) -> list[Gap]:
    """Detect runs of consecutive all-empty days in the per-day stats.

    daily_stats: (day, total_rows, non_null_rows) tuples in ascending day
    order. Days with zero matching rows are absent from the list (the SQL
    groups by existing rows), so "consecutive" means consecutive *observed*
    days, not calendar days.

    Returns one Gap per run that meets both thresholds
    (MIN_CONSECUTIVE_DAYS observed days, MIN_CONSECUTIVE_ROWS empty rows).
    A run that extends to the end of the data is marked recovered=False.

    BUGFIX: the gap's end_date is now the last *observed* empty day.
    Previously it was ``day - timedelta(days=1)`` relative to the first
    non-empty day, which over-reports the end when days are missing from
    daily_stats (e.g. empty Jan 1-3, data resumes Jan 10 → old code said
    the gap ended Jan 9).
    """
    gaps: list[Gap] = []
    in_gap = False
    gap_start = None
    last_empty_day = None   # most recent day seen inside the current gap
    gap_rows = 0
    gap_null = 0
    gap_days = 0

    def flush(recovered: bool) -> None:
        """Append the current gap to `gaps` if it clears both thresholds."""
        if gap_days >= MIN_CONSECUTIVE_DAYS and gap_null >= MIN_CONSECUTIVE_ROWS:
            gaps.append(Gap(
                table=table, field=field,
                start_date=gap_start,
                end_date=last_empty_day,
                days=gap_days, total_rows=gap_rows,
                null_rows=gap_null, recovered=recovered,
            ))

    for day, total, non_null in daily_stats:
        if non_null == 0:
            if not in_gap:
                in_gap = True
                gap_start = day
                gap_rows = gap_null = gap_days = 0
            gap_days += 1
            gap_rows += total
            gap_null += total   # non_null == 0, so every row is empty
            last_empty_day = day
        elif in_gap:
            # A day with data closes the run; record it as recovered.
            flush(recovered=True)
            in_gap = False
    # Still inside a run at the end of the data: field is gone "to this day".
    if in_gap:
        flush(recovered=False)
    return gaps
def run_scan():
    """Scan every SCAN_TARGETS entry and report field-disappearance gaps.

    Prints per-target progress; when any gap is found, also writes a CSV
    report and an aligned terminal summary. statement_timeout keeps one
    slow scan query from hanging the whole run.
    """
    all_gaps: list[Gap] = []
    with psycopg2.connect(PG_DSN, connect_timeout=15,
                          options="-c statement_timeout=120000") as conn:
        with conn.cursor() as cur:
            for table, time_col, field, filter_sql in SCAN_TARGETS:
                print(f"扫描 {table}.{field} ...")
                cur.execute(build_daily_sql(table, time_col, field, filter_sql))
                rows = cur.fetchall()
                if not rows:
                    print(" ⏭️ 无数据")
                    continue
                gaps = detect_gaps(rows, table, field)
                if not gaps:
                    print(" ✅ 无异常")
                    continue
                for g in gaps:
                    status = "🔴 ONGOING" if not g.recovered else "🟡 RECOVERED"
                    # FIX: restore the range separator lost between the dates.
                    print(f" {status} {g.field}: {g.start_date} → {g.end_date} "
                          f"({g.days}天, {g.null_rows}条全空)")
                all_gaps.extend(gaps)
    if not all_gaps:
        print("\n✅ 所有字段正常,未发现消失段")
        return
    _write_csv_report(all_gaps)
    _print_summary(all_gaps)


def _write_csv_report(all_gaps: list[Gap]) -> None:
    """Write all gaps to a UTF-8-BOM CSV (Excel-friendly) report file."""
    # NOTE(review): report lands in <parent of SYSTEM_ANALYZE_ROOT>/field_scan;
    # confirm this matches export/SYSTEM/REPORTS/field_scan/ from the docstring.
    report_dir = os.path.join(os.path.dirname(SYSTEM_ANALYZE_ROOT), "field_scan")
    os.makedirs(report_dir, exist_ok=True)
    csv_path = os.path.join(report_dir, "field_disappearance_report.csv")
    with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        # FIX: first header cell was empty; it labels the table column.
        writer.writerow(["表", "字段", "状态", "消失起始日", "消失结束日",
                         "持续天数", "区间总行数", "空行数"])
        for g in all_gaps:
            writer.writerow([
                g.table, g.field,
                "ONGOING" if not g.recovered else "RECOVERED",
                g.start_date, g.end_date,
                g.days, g.total_rows, g.null_rows,
            ])
    print(f"\n📊 发现 {len(all_gaps)} 个字段消失段")
    print(f" 报告已生成: {csv_path}")


def _print_summary(all_gaps: list[Gap]) -> None:
    """Print an aligned terminal summary table of all detected gaps."""
    print(f"\n{'='*90}")
    # FIX: label the table column (was padded with an empty string).
    print(f"{'表':<35} {'字段':<20} {'状态':<12} {'起始':<12} {'结束':<12} {'天数':>5} {'空行':>6}")
    print(f"{'='*90}")
    for g in all_gaps:
        status = "ONGOING" if not g.recovered else "RECOVERED"
        print(f"{g.table:<35} {g.field:<20} {status:<12} "
              f"{str(g.start_date):<12} {str(g.end_date):<12} {g.days:>5} {g.null_rows:>6}")


if __name__ == "__main__":
    run_scan()