Files
Neo-ZQYY/scripts/ops/backfill_missing_fields.py
Neo 779b2f6d52 chore: v1 整理 — 清理历史文件、DDL 合并、文档归档
- 清理 1155 个已删除的历史文件(废弃 prompt_logs、tmp、旧 ops 脚本)
- export/ 数据文件从 git 移除(已在 .gitignore)
- demo-miniprogram 从 tmp/ 移入 apps/,添加 CLAUDE.md 注解
- DDL 合并:完整 schema 定义填充到 db/*/schemas/(从 docs/database/ddl/ 复制)
- 39 个 v1 迁移脚本归档到 db/_archived/migrations_v1_merged/
- 4 个迁移变更类 BD_Manual 文档归档到 docs/database/_archived/
- .gitignore 补充 .vite/ 和 apps/*.zip
- settings.json 添加 effortLevel 默认配置
- scripts/ops/ 新增运维脚本入库

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:39:27 +08:00

384 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
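ETL missing-field supplement — phase 2: historical data backfill + DWS comparison check.

Target database: test_etl_feiqiu (test database).

Steps:
1. ODS layer backfill (extract the new field values from the payload JSON)
2. DWD layer backfill (sync the already-backfilled ODS fields into DWD)
3. Verify the backfill results
4. DWS cross-check (3 member_profiles fields)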
"""
import os
import sys
import time
from dotenv import load_dotenv
# Run dotenv's loader before the DSN is read from the environment.
load_dotenv()

# Prefer the dedicated test DSN; fall back to the generic PG_DSN.
PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
if not PG_DSN:
    print("ERROR: TEST_DB_DSN / PG_DSN 未配置", file=sys.stderr)
    sys.exit(1)

# Safety guard: refuse to run unless the DSN mentions the test database.
if "test_etl_feiqiu" not in PG_DSN:
    # Never echo credentials: show only what follows the last '@'
    # (host/port/db for URI-style DSNs). A DSN without '@' may carry the
    # password anywhere in the string, so it is hidden entirely.
    _, _at_sign, _dsn_tail = PG_DSN.rpartition("@")
    _shown = _dsn_tail if _at_sign else "(DSN 详情已隐藏)"
    print(f"ERROR: DSN 不指向测试库 test_etl_feiqiu: {_shown}", file=sys.stderr)
    sys.exit(1)
import psycopg2
def run_sql(conn, label: str, sql: str, *, fetch: bool = False) -> bool:
    """Execute one SQL statement on ``conn`` and print a short report.

    A banner containing ``label`` is printed first. With ``fetch=True`` the
    column names, every returned row and the row count are printed; otherwise
    the cursor's ``rowcount`` is printed. The statement is committed on
    success. Any ``Exception`` triggers a rollback and is printed instead of
    being raised, so the caller can carry on with its next statement. The
    elapsed time is printed in both cases.

    Args:
        conn: Connection object providing ``cursor()`` (usable as a context
            manager), ``commit()`` and ``rollback()``.
        label: Human-readable title printed in the banner.
        sql: The SQL text passed to ``cursor.execute``.
        fetch: When true, fetch and print the result set.

    Returns:
        ``True`` if the statement ran and was committed, ``False`` if it
        failed and was rolled back.
    """
    print(f"\n{'='*60}")
    print(f"[{label}]")
    print(f"{'='*60}")
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # wall-clock adjustments while the statement runs.
    t0 = time.perf_counter()
    succeeded = False
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            if fetch:
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                print(f" 列: {cols}")
                for r in rows:
                    print(f" {r}")
                print(f"{len(rows)}")
            else:
                print(f" 影响行数: {cur.rowcount}")
        conn.commit()
        succeeded = True
    except Exception as e:
        # Deliberate best-effort: undo this statement, report, and continue.
        conn.rollback()
        print(f" ERROR: {e}")
    elapsed = time.perf_counter() - t0
    print(f" 耗时: {elapsed:.2f}s")
    return succeeded
def _print_step_banner(title: str) -> None:
    """Print the '#'-framed banner that introduces one step of the script."""
    print("\n" + "#"*60)
    print(f"# {title}")
    print("#"*60)


def _backfill_ods(conn) -> None:
    """Step 1: fill the new ODS columns from each row's ``payload`` JSON."""
    _print_step_banner("第一步ODS 层 backfill")

    # 1.1 member_profiles — 4 fields
    run_sql(conn, "1.1 ODS member_profiles — 4 字段", """
        UPDATE ods.member_profiles SET
            other_pay_money_sum = (payload->>'otherPayMoneySum')::numeric(18,2),
            last_consume_time = (payload->>'lastConsumeTime')::timestamp,
            non_consume_day_num = (payload->>'nonConsumeDayNum')::integer,
            first_consumption = (payload->>'firstConsumption')::integer
        WHERE payload->>'otherPayMoneySum' IS NOT NULL
          AND other_pay_money_sum IS NULL
    """)

    # 1.2 assistant_service_records — deduct_leave_seconds
    run_sql(conn, "1.2a ODS assistant_service_records — deduct_leave_seconds", """
        UPDATE ods.assistant_service_records SET
            deduct_leave_seconds = COALESCE((payload->>'deductLeaveSeconds')::integer, 0)
        WHERE deduct_leave_seconds IS NULL OR deduct_leave_seconds = 0
    """)

    # 1.2 assistant_service_records — order_from
    run_sql(conn, "1.2b ODS assistant_service_records — order_from", """
        UPDATE ods.assistant_service_records SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.3 store_goods_sales_records — activity_amount + activity_id
    run_sql(conn, "1.3a ODS store_goods_sales_records — activity_amount + activity_id", """
        UPDATE ods.store_goods_sales_records SET
            activity_amount = COALESCE((payload->>'activityAmount')::numeric(18,2), 0),
            activity_id = COALESCE((payload->>'activityId')::bigint, 0)
        WHERE activity_amount IS NULL OR activity_amount = 0
    """)

    # 1.3 store_goods_sales_records — order_from
    run_sql(conn, "1.3b ODS store_goods_sales_records — order_from", """
        UPDATE ods.store_goods_sales_records SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.4 goods_stock_summary — createtime
    run_sql(conn, "1.4 ODS goods_stock_summary — createtime", """
        UPDATE ods.goods_stock_summary SET
            createtime = (payload->>'createTime')::timestamp
        WHERE payload->>'createTime' IS NOT NULL
          AND createtime IS NULL
    """)

    # 1.5 table_fee_transactions — order_from
    run_sql(conn, "1.5 ODS table_fee_transactions — order_from", """
        UPDATE ods.table_fee_transactions SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.6 settlement_records — inspect the payload structure first
    run_sql(conn, "1.6a ODS settlement_records — 检查 payload 结构", """
        SELECT payload->>'orderFrom' AS top_level,
               payload->'settleList'->0->>'orderFrom' AS nested
        FROM ods.settlement_records LIMIT 3
    """, fetch=True)

    # Try the top-level path first (the inspection above only prints; it does
    # not influence which path is used here).
    run_sql(conn, "1.6b ODS settlement_records — orderfrom (顶层)", """
        UPDATE ods.settlement_records SET
            orderfrom = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND orderfrom IS NULL
    """)


def _backfill_dwd(conn) -> None:
    """Step 2: copy values into DWD from the latest-fetched ODS row per key."""
    _print_step_banner("第二步DWD 层 backfill")

    # 2.1 dim_member_ex
    run_sql(conn, "2.1 DWD dim_member_ex — 4 字段", """
        UPDATE dwd.dim_member_ex me SET
            other_pay_money_sum = (mp.payload->>'otherPayMoneySum')::numeric(18,2),
            last_consume_time = (mp.payload->>'lastConsumeTime')::timestamptz,
            non_consume_day_num = (mp.payload->>'nonConsumeDayNum')::integer,
            first_consumption = (mp.payload->>'firstConsumption')::integer
        FROM (
            SELECT DISTINCT ON (id) id, payload
            FROM ods.member_profiles
            ORDER BY id, fetched_at DESC NULLS LAST
        ) mp
        WHERE me.member_id = mp.id
          AND me.scd2_is_current = 1
          AND me.other_pay_money_sum IS NULL
    """)

    # 2.2 dim_member_card_account_ex
    run_sql(conn, "2.2 DWD dim_member_card_account_ex — pdassisnatlevel + cxassisnatlevel", """
        UPDATE dwd.dim_member_card_account_ex ce SET
            pdassisnatlevel = mc.pdassisnatlevel,
            cxassisnatlevel = mc.cxassisnatlevel
        FROM (
            SELECT DISTINCT ON (id) id, pdassisnatlevel, cxassisnatlevel
            FROM ods.member_stored_value_cards
            ORDER BY id, fetched_at DESC NULLS LAST
        ) mc
        WHERE ce.member_card_id = mc.id
          AND ce.scd2_is_current = 1
          AND ce.pdassisnatlevel IS NULL
    """)

    # 2.3 dwd_assistant_service_log_ex
    run_sql(conn, "2.3 DWD dwd_assistant_service_log_ex — deduct_leave_seconds + order_from", """
        UPDATE dwd.dwd_assistant_service_log_ex de SET
            deduct_leave_seconds = COALESCE(ar.deduct_leave_seconds, 0),
            order_from = ar.order_from
        FROM (
            SELECT DISTINCT ON (id) id, deduct_leave_seconds, order_from
            FROM ods.assistant_service_records
            ORDER BY id, fetched_at DESC NULLS LAST
        ) ar
        WHERE de.assistant_service_id = ar.id
          AND de.deduct_leave_seconds IS NULL
    """)

    # 2.4 dwd_store_goods_sale_ex
    run_sql(conn, "2.4 DWD dwd_store_goods_sale_ex — activity_amount + activity_id + order_from", """
        UPDATE dwd.dwd_store_goods_sale_ex se SET
            activity_amount = COALESCE(sr.activity_amount, 0),
            activity_id = COALESCE(sr.activity_id, 0),
            order_from = sr.order_from
        FROM (
            SELECT DISTINCT ON (id) id, activity_amount, activity_id, order_from
            FROM ods.store_goods_sales_records
            ORDER BY id, fetched_at DESC NULLS LAST
        ) sr
        WHERE se.store_goods_sale_id = sr.id
          AND se.activity_amount IS NULL
    """)

    # 2.5 dwd_goods_stock_summary
    run_sql(conn, "2.5 DWD dwd_goods_stock_summary — create_time", """
        UPDATE dwd.dwd_goods_stock_summary ds SET
            create_time = gs.createtime
        FROM (
            SELECT DISTINCT ON (sitegoodsid) sitegoodsid, createtime
            FROM ods.goods_stock_summary
            WHERE createtime IS NOT NULL
            ORDER BY sitegoodsid, fetched_at DESC NULLS LAST
        ) gs
        WHERE ds.site_goods_id = gs.sitegoodsid
          AND ds.create_time IS NULL
    """)

    # 2.6 dwd_table_fee_log_ex
    run_sql(conn, "2.6 DWD dwd_table_fee_log_ex — order_from", """
        UPDATE dwd.dwd_table_fee_log_ex te SET
            order_from = tf.order_from
        FROM (
            SELECT DISTINCT ON (id) id, order_from
            FROM ods.table_fee_transactions
            WHERE order_from IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) tf
        WHERE te.table_fee_log_id = tf.id
          AND te.order_from IS NULL
    """)

    # 2.7 dwd_settlement_head_ex
    run_sql(conn, "2.7 DWD dwd_settlement_head_ex — order_from", """
        UPDATE dwd.dwd_settlement_head_ex she SET
            order_from = sr.orderfrom
        FROM (
            SELECT DISTINCT ON (id) id, orderfrom
            FROM ods.settlement_records
            WHERE orderfrom IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) sr
        WHERE she.order_settle_id = sr.id
          AND she.order_from IS NULL
    """)


def _verify_backfill(conn) -> None:
    """Step 3: print total vs. non-NULL counts for the backfilled columns."""
    _print_step_banner("第三步:验证 backfill 结果")

    run_sql(conn, "验证1: ODS member_profiles backfill", """
        SELECT
            COUNT(*) AS total,
            COUNT(other_pay_money_sum) AS has_other_pay,
            COUNT(last_consume_time) AS has_last_consume,
            COUNT(non_consume_day_num) AS has_non_consume_day,
            COUNT(first_consumption) AS has_first_consumption
        FROM ods.member_profiles
    """, fetch=True)

    run_sql(conn, "验证2: DWD dim_member_ex backfill", """
        SELECT
            COUNT(*) AS total,
            COUNT(other_pay_money_sum) AS has_other_pay,
            COUNT(last_consume_time) AS has_last_consume,
            COUNT(non_consume_day_num) AS has_non_consume_day,
            COUNT(first_consumption) AS has_first_consumption
        FROM dwd.dim_member_ex
        WHERE scd2_is_current = 1
    """, fetch=True)

    run_sql(conn, "验证3: order_from backfill各表", """
        SELECT 'table_fee_transactions' AS tbl, COUNT(*) AS total, COUNT(order_from) AS has_order_from FROM ods.table_fee_transactions
        UNION ALL
        SELECT 'assistant_service_records', COUNT(*), COUNT(order_from) FROM ods.assistant_service_records
        UNION ALL
        SELECT 'store_goods_sales_records', COUNT(*), COUNT(order_from) FROM ods.store_goods_sales_records
        UNION ALL
        SELECT 'settlement_records', COUNT(*), COUNT(orderfrom) FROM ods.settlement_records
    """, fetch=True)


def _cross_check_dws(conn) -> None:
    """Step 4: compare the API-provided last_consume_time with DWS data."""
    _print_step_banner("第四步DWS 交叉对比")

    # 4.1 List the DWS tables whose name contains 'member'.
    run_sql(conn, "4.1a 查找 DWS member consumption 相关表", """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'dws' AND table_name LIKE '%member%'
        ORDER BY table_name
    """, fetch=True)

    # 4.1b List DWS columns named like last_consume / first_consume etc.
    run_sql(conn, "4.1b 查找 DWS 中相关列", """
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = 'dws'
          AND column_name IN (
            'last_consume_date', 'last_consume_time',
            'first_consume_date', 'first_consumption',
            'days_since_last', 'non_consume_day_num',
            'last_order_date', 'first_order_date'
          )
        ORDER BY table_name, column_name
    """, fetch=True)

    # 4.2 Attempt the comparison against dws_member_consumption_monthly.
    # If the table does not exist the query errors; run_sql catches that and
    # the script continues.
    run_sql(conn, "4.2 last_consume_time 对比: ODS API vs DWS 计算值", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_last_consume_time
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id,
                   MAX(stat_date) AS dws_last_consume_date
            FROM dws.dws_member_consumption_monthly
            WHERE total_orders > 0
            GROUP BY member_id
        )
        SELECT
            a.member_id,
            a.api_last_consume_time,
            d.dws_last_consume_date,
            a.api_last_consume_time::date - d.dws_last_consume_date AS diff_days
        FROM api_vals a
        LEFT JOIN dws_vals d ON a.member_id = d.member_id
        WHERE d.dws_last_consume_date IS NOT NULL
        ORDER BY ABS(EXTRACT(EPOCH FROM a.api_last_consume_time - d.dws_last_consume_date::timestamptz)) DESC
        LIMIT 20
    """, fetch=True)

    # 4.3 Summary of the difference distribution.
    run_sql(conn, "4.3 last_consume_time 差异分布", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_val
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id,
                   MAX(stat_date) AS dws_val
            FROM dws.dws_member_consumption_monthly
            WHERE total_orders > 0
            GROUP BY member_id
        ),
        diffs AS (
            SELECT
                a.api_val::date - d.dws_val AS diff_days
            FROM api_vals a
            JOIN dws_vals d ON a.member_id = d.member_id
        )
        SELECT
            COUNT(*) AS total_compared,
            COUNT(*) FILTER (WHERE diff_days = 0) AS exact_match,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 1) AS within_1day,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 7) AS within_7days,
            COUNT(*) FILTER (WHERE ABS(diff_days) > 7) AS over_7days,
            MIN(diff_days) AS min_diff,
            MAX(diff_days) AS max_diff
        FROM diffs
    """, fetch=True)


def main():
    """Run the four steps against the database named by ``PG_DSN``.

    Order: ODS backfill, DWD backfill, backfill verification, DWS cross-check.
    Each statement goes through ``run_sql``, which reports and swallows
    per-statement errors, so one failing statement does not stop the rest.
    The connection is closed even if an uncaught exception (for example
    ``KeyboardInterrupt``) interrupts the run.
    """
    conn = psycopg2.connect(PG_DSN)
    # Show only what follows the last '@' (host/port/db for URI-style DSNs).
    # rpartition copes with DSNs that contain no '@' (the old split()[1]
    # raised IndexError) and with an '@' inside the password.
    _, at_sign, target = PG_DSN.rpartition("@")
    print(f"已连接: {target if at_sign else '(DSN 详情已隐藏)'}")
    try:
        _backfill_ods(conn)
        _backfill_dwd(conn)
        _verify_backfill(conn)
        _cross_check_dws(conn)
    finally:
        conn.close()
    print("\n" + "="*60)
    print("全部完成!")
    print("="*60)
# Run the backfill only when executed as a script, not when imported.
if __name__ == "__main__":
    main()