Files
Neo-ZQYY/scripts/ops/fix_bc_dates.py

87 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""BUG 12 backfill: scan the timestamptz columns of every DWD table and reset
BC dates (< 0002-01-01) to NULL.

Root cause: the upstream API uses 0001-01-01T00:00:00 to mean "not set", and ODS
stores that value as timestamp. When DWD implicitly converts it to timestamptz,
it becomes a BC date under the Asia/Shanghai time zone; psycopg2 cannot parse
such a value, so fetchall() crashes.
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
# Load the .env file that sits two directories above this script's own
# directory, so PG_DSN can be supplied from that file.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
PG_DSN = os.environ.get("PG_DSN")
# Abort with exit status 1 before touching the database driver when no
# connection string is configured.
if not PG_DSN:
    print("ERROR: PG_DSN 未配置", file=sys.stderr)
    sys.exit(1)
import psycopg2
from psycopg2.extras import RealDictCursor
# Cutoff date quoted in the module docstring ("< 0002-01-01").
SENTINEL = "0002-01-01"
# Name of the PostgreSQL schema whose tables are scanned.
SCHEMA = "dwd"
def main():
    """Set every ``timestamptz`` value earlier than SENTINEL to NULL in SCHEMA.

    The function connects with PG_DSN, lists all ``timestamp with time zone``
    columns of the base tables in SCHEMA, and runs one UPDATE per column.
    Everything happens in a single transaction that is committed only after
    all columns were processed.

    Design notes:

    * Rows are selected with ``column < SENTINEL::timestamptz`` rather than
      ``EXTRACT(year FROM column) < 1``. EXTRACT on ``timestamptz`` is
      evaluated in the session time zone, so the same stored instant can be
      reported as year 1 in one session and as a BC year in another; a plain
      comparison against the cutoff does not have that dependency.
    * Each column is processed inside a SAVEPOINT. If the primary UPDATE
      fails, only that column's work is undone before the text-based
      fallback is tried; fixes already applied to earlier columns stay in
      the transaction.
    * Identifiers are composed with ``psycopg2.sql.Identifier`` so table or
      column names containing quotes or other special characters are quoted
      correctly.

    Returns:
        None. Progress and the total number of rows changed are printed.

    Raises:
        Exception: any error other than a failed primary UPDATE (including a
            failure of the fallback UPDATE) rolls back the whole transaction
            and is re-raised.
    """
    # Local import: ``sql`` is part of psycopg2, which this file already uses.
    from psycopg2 import sql

    savepoint = "fix_bc_dates_col"

    conn = psycopg2.connect(PG_DSN)
    conn.autocommit = False
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Find every timestamptz column of the schema's base tables.
            cur.execute("""
                SELECT t.table_name, c.column_name
                FROM information_schema.tables t
                JOIN information_schema.columns c
                  ON t.table_schema = c.table_schema AND t.table_name = c.table_name
                WHERE t.table_schema = %s
                  AND t.table_type = 'BASE TABLE'
                  AND c.data_type = 'timestamp with time zone'
                ORDER BY t.table_name, c.ordinal_position
            """, (SCHEMA,))
            cols = cur.fetchall()

            total_fixed = 0
            for row in cols:
                tbl = row["table_name"]
                col = row["column_name"]
                names = {
                    "table": sql.SQL("{}.{}").format(
                        sql.Identifier(SCHEMA), sql.Identifier(tbl)
                    ),
                    "column": sql.Identifier(col),
                }
                # An UPDATE without RETURNING sends no row data back, so the
                # client never has to parse the offending timestamps.
                primary = sql.SQL(
                    "UPDATE {table} SET {column} = NULL "
                    "WHERE {column} < %s::timestamptz"
                ).format(**names)
                fallback = sql.SQL(
                    "UPDATE {table} SET {column} = NULL "
                    "WHERE {column}::text LIKE %s"
                ).format(**names)

                cur.execute(f"SAVEPOINT {savepoint}")
                try:
                    cur.execute(primary, (SENTINEL,))
                    cnt = cur.rowcount
                    if cnt > 0:
                        print(f" FIXED: {SCHEMA}.{tbl}.{col}: {cnt} 行")
                        total_fixed += cnt
                except psycopg2.Error as e:
                    # Undo only this column's failed statement; earlier
                    # columns' updates remain part of the transaction.
                    cur.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
                    print(f" WARN: {SCHEMA}.{tbl}.{col} — 范围比较失败({e}),用 text 方式重试")
                    cur.execute(fallback, ("%BC%",))
                    cnt = cur.rowcount
                    if cnt > 0:
                        print(f" FIXED: {SCHEMA}.{tbl}.{col}: {cnt} 行 (text 方式)")
                        total_fixed += cnt
                cur.execute(f"RELEASE SAVEPOINT {savepoint}")

        conn.commit()
        print(f"\n完成:共修复 {total_fixed}")
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
# Run the backfill only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()