Files
Neo-ZQYY/scripts/ops/backfill_phase2_final.py
Neo 779b2f6d52 chore: v1 整理 — 清理历史文件、DDL 合并、文档归档
- 清理 1155 个已删除的历史文件(废弃 prompt_logs、tmp、旧 ops 脚本)
- export/ 数据文件从 git 移除(已在 .gitignore)
- demo-miniprogram 从 tmp/ 移入 apps/,添加 CLAUDE.md 注解
- DDL 合并:完整 schema 定义填充到 db/*/schemas/(从 docs/database/ddl/ 复制)
- 39 个 v1 迁移脚本归档到 db/_archived/migrations_v1_merged/
- 4 个迁移变更类 BD_Manual 文档归档到 docs/database/_archived/
- .gitignore 补充 .vite/ 和 apps/*.zip
- settings.json 添加 effortLevel 默认配置
- scripts/ops/ 新增运维脚本入库

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:39:27 +08:00

338 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ETL 缺失字段补充 — 第二阶段最终修正版
payload key 已确认:
- member_profiles: snake_case (other_pay_money_sum, last_consume_time, ...)
- assistant_service_records: snake_case (order_from, deduct_leave_seconds)
- store_goods_sales_records: snake_case (order_from, activity_amount, activity_id)
- table_fee_transactions: snake_case (order_from)
- settlement_records: camelCase (orderFrom 不存在于 payload, 跳过)
- goods_stock_summary: snake_case (createtime → 需确认 key)
"""
import os
import sys
import time

from dotenv import load_dotenv

# Load variables from a .env file (python-dotenv) into os.environ before reading them.
load_dotenv()

# The dedicated test DSN takes precedence over the generic PG_DSN.
PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")

# Safety guard: this script issues UPDATEs against ODS/DWD tables, so refuse
# to start unless a DSN is configured and it names the test database.
if not (PG_DSN and "test_etl_feiqiu" in PG_DSN):
    print("ERROR: DSN 未配置或不指向测试库", file=sys.stderr)
    sys.exit(1)

# Deferred until after the guard, so a misconfigured environment exits before
# the database driver is imported.
import psycopg2  # noqa: E402
def run(conn, label, sql, *, fetch=False):
    """Execute one SQL step in its own transaction and print the outcome.

    Prints a banner containing *label*, executes *sql* on a fresh cursor of
    *conn* and commits. With ``fetch=True`` the result column names, every
    row and the row count are printed; otherwise the cursor's affected-row
    count is printed. The elapsed time is printed in every case.

    Errors are deliberately non-fatal so that later steps still run: on any
    exception the transaction is rolled back and the error is printed.

    Args:
        conn: An open DB-API connection (psycopg2 in this script).
        label: Human-readable step name shown in the banner.
        sql: The SQL statement to execute (no bind parameters).
        fetch: If True, treat the statement as a query and print its rows.

    Returns:
        True if the statement executed and was committed, False if it was
        rolled back because of an error.
    """
    print(f"\n{'=' * 60}\n[{label}]\n{'=' * 60}")
    # perf_counter is monotonic, so the duration is not skewed by clock changes.
    started = time.perf_counter()
    ok = False
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            if fetch:
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                print(f" 列: {cols}")
                for row in rows:
                    print(f" {row}")
                print(f" 行数: {len(rows)}")
            else:
                print(f" 影响行数: {cur.rowcount}")
        conn.commit()
        ok = True
    except Exception as e:
        # Best-effort by design: undo this step only and let the caller continue.
        conn.rollback()
        print(f" ERROR: {e}")
    print(f" 耗时: {time.perf_counter() - started:.2f}s")
    return ok
def _section(title):
    """Print a '#'-framed banner announcing a group of steps."""
    print("\n" + "#" * 60)
    print(f"# {title}")
    print("#" * 60)


def _backfill_ods(conn):
    """Fill NULL ODS columns from each row's JSON ``payload`` and probe payload keys.

    Each UPDATE only touches rows whose payload carries the key and whose
    target column is still NULL, so rows filled earlier are left alone. The
    ``fetch=True`` steps are read-only inspections of payload key names/values.
    """
    # ODS layer backfill, using the confirmed snake_case payload keys.
    _section("ODS 层 backfillsnake_case key 修正版)")

    # 1.1 member_profiles — 4 fields. Rows are selected via other_pay_money_sum
    # only; the other three columns are written in the same UPDATE.
    # NOTE(review): last_consume_time is cast to timestamp here but to
    # timestamptz in the DWD step — confirm against the column types.
    run(conn, "1.1 ODS member_profiles — 4 字段", """
        UPDATE ods.member_profiles SET
            other_pay_money_sum = (payload->>'other_pay_money_sum')::numeric(18,2),
            last_consume_time   = (payload->>'last_consume_time')::timestamp,
            non_consume_day_num = (payload->>'non_consume_day_num')::integer,
            first_consumption   = (payload->>'first_consumption')::integer
        WHERE payload->>'other_pay_money_sum' IS NOT NULL
          AND other_pay_money_sum IS NULL
    """)

    # 1.2 assistant_service_records — order_from (snake_case key).
    run(conn, "1.2 ODS assistant_service_records — order_from", """
        UPDATE ods.assistant_service_records SET
            order_from = (payload->>'order_from')::integer
        WHERE payload->>'order_from' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.3 store_goods_sales_records — order_from (snake_case key).
    run(conn, "1.3 ODS store_goods_sales_records — order_from", """
        UPDATE ods.store_goods_sales_records SET
            order_from = (payload->>'order_from')::integer
        WHERE payload->>'order_from' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.4 table_fee_transactions — order_from. List the order/from-like
    # payload keys first, then backfill from the snake_case key.
    run(conn, "1.4a table_fee_transactions payload order 相关 key", """
        SELECT DISTINCT k
        FROM ods.table_fee_transactions,
             LATERAL jsonb_object_keys(payload) AS k
        WHERE k ILIKE '%order%' OR k ILIKE '%from%'
        ORDER BY k
    """, fetch=True)
    run(conn, "1.4b ODS table_fee_transactions — order_from (snake)", """
        UPDATE ods.table_fee_transactions SET
            order_from = (payload->>'order_from')::integer
        WHERE payload->>'order_from' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.5 goods_stock_summary — inspect create-time-like payload keys only
    # (no backfill yet; the key name is still unconfirmed).
    run(conn, "1.5a goods_stock_summary payload create 相关 key", """
        SELECT DISTINCT k
        FROM ods.goods_stock_summary,
             LATERAL jsonb_object_keys(payload) AS k
        WHERE k ILIKE '%create%'
        ORDER BY k
    """, fetch=True)

    # 1.6 settlement_records — its payload uses camelCase keys; list the
    # order/from-like keys, then count how many rows have orderFrom at all.
    run(conn, "1.6a settlement_records payload order 相关 key", """
        SELECT DISTINCT k
        FROM ods.settlement_records,
             LATERAL jsonb_object_keys(payload) AS k
        WHERE k ILIKE '%order%' OR k ILIKE '%from%'
        ORDER BY k
    """, fetch=True)
    run(conn, "1.6b settlement_records orderFrom 值统计", """
        SELECT COUNT(*) AS total,
               COUNT(payload->>'orderFrom') AS has_orderFrom
        FROM ods.settlement_records
    """, fetch=True)


def _backfill_dwd(conn):
    """Copy the backfilled values from ODS into NULL DWD columns.

    Each step joins on the latest ODS snapshot per id (``DISTINCT ON (id)``
    ordered by ``fetched_at DESC NULLS LAST``) that has the value present.
    """
    _section("DWD 层 backfill")

    # 2.1 dim_member_ex — read the snake_case payload keys directly; only the
    # current SCD2 version of each member is updated.
    run(conn, "2.1 DWD dim_member_ex — 4 字段", """
        UPDATE dwd.dim_member_ex me SET
            other_pay_money_sum = (mp.payload->>'other_pay_money_sum')::numeric(18,2),
            last_consume_time   = (mp.payload->>'last_consume_time')::timestamptz,
            non_consume_day_num = (mp.payload->>'non_consume_day_num')::integer,
            first_consumption   = (mp.payload->>'first_consumption')::integer
        FROM (
            SELECT DISTINCT ON (id) id, payload
            FROM ods.member_profiles
            WHERE payload->>'other_pay_money_sum' IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) mp
        WHERE me.member_id = mp.id
          AND me.scd2_is_current = 1
          AND me.other_pay_money_sum IS NULL
    """)

    # 2.3 dwd_assistant_service_log_ex — order_from from the ODS column.
    run(conn, "2.3 DWD dwd_assistant_service_log_ex — order_from", """
        UPDATE dwd.dwd_assistant_service_log_ex de SET
            order_from = ar.order_from
        FROM (
            SELECT DISTINCT ON (id) id, order_from
            FROM ods.assistant_service_records
            WHERE order_from IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) ar
        WHERE de.assistant_service_id = ar.id
          AND de.order_from IS NULL
    """)

    # 2.4 dwd_store_goods_sale_ex — order_from from the ODS column.
    run(conn, "2.4 DWD dwd_store_goods_sale_ex — order_from", """
        UPDATE dwd.dwd_store_goods_sale_ex se SET
            order_from = sr.order_from
        FROM (
            SELECT DISTINCT ON (id) id, order_from
            FROM ods.store_goods_sales_records
            WHERE order_from IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) sr
        WHERE se.store_goods_sale_id = sr.id
          AND se.order_from IS NULL
    """)

    # 2.6 dwd_table_fee_log_ex — order_from from the ODS column.
    run(conn, "2.6 DWD dwd_table_fee_log_ex — order_from", """
        UPDATE dwd.dwd_table_fee_log_ex te SET
            order_from = tf.order_from
        FROM (
            SELECT DISTINCT ON (id) id, order_from
            FROM ods.table_fee_transactions
            WHERE order_from IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) tf
        WHERE te.table_fee_log_id = tf.id
          AND te.order_from IS NULL
    """)


def _verify(conn):
    """Print total vs. non-NULL counts for every backfilled column."""
    _section("验证 backfill 结果")

    run(conn, "验证1: ODS member_profiles", """
        SELECT COUNT(*) AS total,
               COUNT(other_pay_money_sum) AS has_other_pay,
               COUNT(last_consume_time) AS has_last_consume,
               COUNT(non_consume_day_num) AS has_non_consume_day,
               COUNT(first_consumption) AS has_first_consumption
        FROM ods.member_profiles
    """, fetch=True)

    run(conn, "验证2: DWD dim_member_ex (current)", """
        SELECT COUNT(*) AS total,
               COUNT(other_pay_money_sum) AS has_other_pay,
               COUNT(last_consume_time) AS has_last_consume,
               COUNT(non_consume_day_num) AS has_non_consume_day,
               COUNT(first_consumption) AS has_first_consumption
        FROM dwd.dim_member_ex WHERE scd2_is_current = 1
    """, fetch=True)

    run(conn, "验证3: order_from各表", """
        SELECT 'ods.assistant_service_records' AS tbl, COUNT(*) AS total, COUNT(order_from) AS has_of FROM ods.assistant_service_records
        UNION ALL SELECT 'ods.store_goods_sales_records', COUNT(*), COUNT(order_from) FROM ods.store_goods_sales_records
        UNION ALL SELECT 'ods.table_fee_transactions', COUNT(*), COUNT(order_from) FROM ods.table_fee_transactions
        UNION ALL SELECT 'dwd.dwd_assistant_service_log_ex', COUNT(*), COUNT(order_from) FROM dwd.dwd_assistant_service_log_ex
        UNION ALL SELECT 'dwd.dwd_store_goods_sale_ex', COUNT(*), COUNT(order_from) FROM dwd.dwd_store_goods_sale_ex
        UNION ALL SELECT 'dwd.dwd_table_fee_log_ex', COUNT(*), COUNT(order_from) FROM dwd.dwd_table_fee_log_ex
    """, fetch=True)


def _compare_dws(conn):
    """Cross-check the API-sourced ODS values against dws_member_consumption_summary.

    Every query takes the latest ODS snapshot per member that has the value
    and joins it to the DWS summary on member_id.
    """
    _section("DWS 交叉对比dws_member_consumption_summary")

    # Comparison 1: last_consume_time — the 20 largest day differences.
    run(conn, "对比1: last_consume_time — ODS API vs DWS", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_val
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id, last_consume_date AS dws_val
            FROM dws.dws_member_consumption_summary
            WHERE last_consume_date IS NOT NULL
        )
        SELECT
            a.member_id,
            a.api_val AS api_last_consume_time,
            d.dws_val AS dws_last_consume_date,
            a.api_val::date - d.dws_val AS diff_days
        FROM api_vals a
        JOIN dws_vals d ON a.member_id = d.member_id
        ORDER BY ABS(a.api_val::date - d.dws_val) DESC
        LIMIT 20
    """, fetch=True)

    # Comparison 1 (stats): distribution of the last_consume_time day difference.
    run(conn, "对比1 统计: last_consume_time 差异分布", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_val
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id, last_consume_date AS dws_val
            FROM dws.dws_member_consumption_summary
            WHERE last_consume_date IS NOT NULL
        ),
        diffs AS (
            SELECT a.api_val::date - d.dws_val AS diff_days
            FROM api_vals a JOIN dws_vals d ON a.member_id = d.member_id
        )
        SELECT
            COUNT(*) AS total_compared,
            COUNT(*) FILTER (WHERE diff_days = 0) AS exact_match,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 1) AS within_1day,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 7) AS within_7days,
            COUNT(*) FILTER (WHERE ABS(diff_days) > 7) AS over_7days,
            MIN(diff_days) AS min_diff, MAX(diff_days) AS max_diff
        FROM diffs
    """, fetch=True)

    # Comparison 2: API non_consume_day_num vs DWS days_since_last.
    run(conn, "对比2: non_consume_day_num — ODS API vs DWS days_since_last", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   non_consume_day_num AS api_days
            FROM ods.member_profiles
            WHERE non_consume_day_num IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id, days_since_last AS dws_days
            FROM dws.dws_member_consumption_summary
            WHERE days_since_last IS NOT NULL
        )
        SELECT
            COUNT(*) AS total_compared,
            COUNT(*) FILTER (WHERE a.api_days = d.dws_days) AS exact_match,
            COUNT(*) FILTER (WHERE ABS(a.api_days - d.dws_days) <= 1) AS within_1,
            COUNT(*) FILTER (WHERE ABS(a.api_days - d.dws_days) <= 7) AS within_7,
            COUNT(*) FILTER (WHERE ABS(a.api_days - d.dws_days) > 7) AS over_7,
            MIN(a.api_days - d.dws_days) AS min_diff,
            MAX(a.api_days - d.dws_days) AS max_diff,
            ROUND(AVG(a.api_days - d.dws_days), 1) AS avg_diff
        FROM api_vals a JOIN dws_vals d ON a.member_id = d.member_id
    """, fetch=True)

    # Comparison 3: API first_consumption vs presence of DWS first_consume_date.
    # The buckets below treat first_consumption = 2 as "has consumed".
    # NOTE(review): confirm that meaning of code 2 against the API docs.
    run(conn, "对比3: first_consumption — ODS API vs DWS first_consume_date", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id, first_consumption
            FROM ods.member_profiles
            WHERE first_consumption IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id, first_consume_date
            FROM dws.dws_member_consumption_summary
            WHERE first_consume_date IS NOT NULL
        )
        SELECT
            COUNT(*) AS api_total,
            COUNT(d.member_id) AS matched_in_dws,
            COUNT(*) FILTER (WHERE a.first_consumption = 2 AND d.first_consume_date IS NOT NULL) AS api_yes_dws_yes,
            COUNT(*) FILTER (WHERE a.first_consumption = 2 AND d.first_consume_date IS NULL) AS api_yes_dws_null,
            COUNT(*) FILTER (WHERE a.first_consumption != 2 AND d.first_consume_date IS NOT NULL) AS api_no_dws_yes,
            COUNT(*) FILTER (WHERE a.first_consumption != 2 AND d.first_consume_date IS NULL) AS api_no_dws_null
        FROM api_vals a
        LEFT JOIN dws_vals d ON a.member_id = d.member_id
    """, fetch=True)


def main():
    """Run the phase-2 backfill against the test database and report results.

    Runs, in order: the ODS backfill, the DWD backfill, column-coverage
    verification, and a cross-check of ODS API values against
    ``dws.dws_member_consumption_summary``. Every SQL step goes through
    ``run``, which commits or rolls back each step on its own. The
    connection is closed even if an exception escapes a step.
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        # Describe the target from the live connection instead of slicing the
        # raw DSN string: key=value DSNs contain no '@' (which made the old
        # split('@')[1] raise IndexError), and get_dsn_parameters() omits the
        # password.
        params = conn.get_dsn_parameters()
        host = params.get("host", "?")
        port = params.get("port", "?")
        dbname = params.get("dbname", "?")
        print(f"已连接: {host}:{port}/{dbname}")

        _backfill_ods(conn)
        _backfill_dwd(conn)
        _verify(conn)
        _compare_dws(conn)
    finally:
        conn.close()

    print("\n" + "=" * 60)
    print("第二阶段全部完成!")
    print("=" * 60)
# Script entry point: call main() only when executed directly, not when imported.
if __name__ == "__main__":
    main()