Files
Neo-ZQYY/scripts/ops/_fix_ods_staff_info.py

83 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Inspect the meta.etl_task table and register the ODS_STAFF_INFO task.

Prints the table's column definitions and a couple of existing ODS rows,
then inserts an ODS_STAFF_INFO row (copied from an existing ODS row) if
none exists yet.
"""
import os
import sys
from pathlib import Path

from dotenv import load_dotenv

# The .env file sits two directories above this script's folder.
_ENV_FILE = Path(__file__).resolve().parents[2] / ".env"
load_dotenv(_ENV_FILE)

# Connection string for PostgreSQL; the script cannot run without it.
PG_DSN = os.getenv("PG_DSN")
if not PG_DSN:
    raise RuntimeError("PG_DSN 未设置")

# The database driver is imported only after the DSN check has passed.
import psycopg2
from psycopg2.extras import RealDictCursor
# Task being registered, the store whose existing ODS task serves as the
# template for the new row, and the LIKE pattern that selects ODS tasks.
TASK_CODE = "ODS_STAFF_INFO"
TEMPLATE_STORE_ID = 2790685415443269
ODS_TASK_PATTERN = "ODS_%"

conn = psycopg2.connect(PG_DSN)
try:
    # Each statement is committed on its own; no explicit transaction is used.
    conn.autocommit = True

    # Leaving the cursor's `with` block closes the cursor, and the `finally`
    # clause below closes the connection, even when a query raises.
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        # 1) Show the table structure.
        print("=== meta.etl_task 表结构 ===")
        cur.execute("""
            SELECT column_name, data_type, column_default, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'meta' AND table_name = 'etl_task'
            ORDER BY ordinal_position
        """)
        for row in cur.fetchall():
            print(f" {row['column_name']:30s} {row['data_type']:20s} default={row['column_default']} nullable={row['is_nullable']}")

        # 2) Show a couple of existing ODS rows for reference.
        print("\n=== 现有 ODS 行示例LIMIT 2===")
        cur.execute(
            """
            SELECT * FROM meta.etl_task
            WHERE task_code LIKE %(pattern)s
            ORDER BY task_code
            LIMIT 2
            """,
            {"pattern": ODS_TASK_PATTERN},
        )
        for row in cur.fetchall():
            for k, v in row.items():
                print(f" {k}: {v}")
            print(" ---")

        # 3) Check whether the task is already registered.
        cur.execute(
            "SELECT COUNT(*) AS cnt FROM meta.etl_task WHERE task_code = %(task_code)s",
            {"task_code": TASK_CODE},
        )
        cnt = cur.fetchone()['cnt']
        print(f"\n{TASK_CODE} 现有行数: {cnt}")

        if cnt == 0:
            # 4) INSERT, copying the settings of an existing enabled ODS task
            #    of the template store. ORDER BY makes the choice of template
            #    row reproducible when several rows match.
            print(f"\n正在 INSERT {TASK_CODE} ...")
            cur.execute(
                """
                INSERT INTO meta.etl_task (task_code, store_id, enabled, cursor_field,
                    window_minutes_default, overlap_seconds, page_size, retry_max, params)
                SELECT %(task_code)s, store_id, TRUE, cursor_field,
                    window_minutes_default, overlap_seconds, page_size, retry_max, params
                FROM meta.etl_task
                WHERE task_code LIKE %(pattern)s
                  AND store_id = %(store_id)s
                  AND enabled = TRUE
                ORDER BY task_code
                LIMIT 1
                RETURNING task_id, task_code, store_id, enabled
                """,
                {
                    "task_code": TASK_CODE,
                    "pattern": ODS_TASK_PATTERN,
                    "store_id": TEMPLATE_STORE_ID,
                },
            )
            # RETURNING yields no row when the SELECT matched no template row.
            inserted = cur.fetchone()
            if inserted:
                print(f" 已插入: task_id={inserted['task_id']}, task_code={inserted['task_code']}, "
                      f"store_id={inserted['store_id']}, enabled={inserted['enabled']}")
            else:
                print(" INSERT 未返回行 — 可能没有匹配的参考行,需手动处理")
        else:
            print(f"{TASK_CODE} 已存在,跳过 INSERT")

        # 5) Verify: print the current row for the task, if there is one.
        cur.execute(
            "SELECT * FROM meta.etl_task WHERE task_code = %(task_code)s",
            {"task_code": TASK_CODE},
        )
        row = cur.fetchone()
        if row:
            print(f"\n=== 验证{TASK_CODE} 当前记录 ===")
            for k, v in row.items():
                print(f" {k}: {v}")
finally:
    conn.close()

print("\n完成。")