- 清理 1155 个已删除的历史文件(废弃 prompt_logs、tmp、旧 ops 脚本)
- export/ 数据文件从 git 移除(已在 .gitignore)
- demo-miniprogram 从 tmp/ 移入 apps/,添加 CLAUDE.md 注解
- DDL 合并:完整 schema 定义填充到 db/*/schemas/(从 docs/database/ddl/ 复制)
- 39 个 v1 迁移脚本归档到 db/_archived/migrations_v1_merged/
- 4 个迁移变更类 BD_Manual 文档归档到 docs/database/_archived/
- .gitignore 补充 .vite/ 和 apps/*.zip
- settings.json 添加 effortLevel 默认配置
- scripts/ops/ 新增运维脚本入库

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
384 lines
14 KiB
Python
384 lines
14 KiB
Python
"""
ETL missing-field supplement — phase 2: historical data backfill + DWS comparison check

Target database: test_etl_feiqiu (test database)

Steps:
1. ODS-layer backfill (extract the new field values from the payload JSON)
2. DWD-layer backfill (sync the fields already backfilled in ODS into DWD)
3. Verify the backfill results
4. DWS cross-comparison (3 member_profiles fields)
"""
import os
import re
import sys
import time

from dotenv import load_dotenv

# Load variables from a .env file (if any) before reading the DSN.
load_dotenv()

# TEST_DB_DSN takes precedence; PG_DSN is the fallback.
PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
if not PG_DSN:
    print("ERROR: TEST_DB_DSN / PG_DSN 未配置", file=sys.stderr)
    sys.exit(1)

# Safety guard: refuse to run unless the DSN mentions the test database name.
if "test_etl_feiqiu" not in PG_DSN:
    # Mask the password before echoing the DSN so credentials never reach the
    # terminal or captured logs. Handles both the URI form (user:pass@host)
    # and the keyword/value form (password=...).
    _masked_dsn = re.sub(r"(://[^:/@\s]*:)\S*(?=@)", r"\1***", PG_DSN)
    _masked_dsn = re.sub(r"(password\s*=\s*)(?:'[^']*'|\S+)", r"\1***", _masked_dsn)
    print(f"ERROR: DSN 不指向测试库 test_etl_feiqiu: {_masked_dsn}", file=sys.stderr)
    sys.exit(1)

# Imported only after the DSN checks pass, as in the original layout.
import psycopg2
def run_sql(conn, label: str, sql: str, *, fetch: bool = False):
    """Execute a single SQL statement, print a report, and commit.

    Failures are deliberately non-fatal: the transaction is rolled back, the
    error is printed, and control returns to the caller so later statements
    can still run.

    Args:
        conn: DB-API connection; must provide ``cursor()`` (usable as a
            context manager), ``commit()`` and ``rollback()``.
        label: Heading printed above the statement's output.
        sql: The SQL text to execute (no parameters are bound).
        fetch: When true, print the column names and every returned row;
            otherwise print the affected row count.

    Returns:
        The list of fetched rows when ``fetch`` is true, the cursor's
        ``rowcount`` otherwise, or ``None`` if the statement failed.
    """
    print(f"\n{'='*60}")
    print(f"[{label}]")
    print(f"{'='*60}")
    # perf_counter is monotonic, so the reported duration cannot be skewed by
    # system clock adjustments.
    started = time.perf_counter()
    result = None
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            if fetch:
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                print(f" 列: {cols}")
                for r in rows:
                    print(f" {r}")
                print(f" 共 {len(rows)} 行")
                result = rows
            else:
                print(f" 影响行数: {cur.rowcount}")
                result = cur.rowcount
        conn.commit()
    except Exception as e:
        # Best-effort by design: undo the failed statement and keep going.
        conn.rollback()
        print(f" ERROR: {e}")
        result = None
    elapsed = time.perf_counter() - started
    print(f" 耗时: {elapsed:.2f}s")
    return result
def _banner(title: str) -> None:
    """Print a phase banner framed by two rules of 60 '#' characters."""
    print("\n" + "#"*60)
    print(title)
    print("#"*60)


def _backfill_ods(conn) -> None:
    """Step 1: fill the new ODS columns from each table's payload JSON."""
    # 1.1 member_profiles — 4 fields
    run_sql(conn, "1.1 ODS member_profiles — 4 字段", """
        UPDATE ods.member_profiles SET
            other_pay_money_sum = (payload->>'otherPayMoneySum')::numeric(18,2),
            last_consume_time = (payload->>'lastConsumeTime')::timestamp,
            non_consume_day_num = (payload->>'nonConsumeDayNum')::integer,
            first_consumption = (payload->>'firstConsumption')::integer
        WHERE payload->>'otherPayMoneySum' IS NOT NULL
          AND other_pay_money_sum IS NULL
    """)

    # 1.2 assistant_service_records — deduct_leave_seconds
    run_sql(conn, "1.2a ODS assistant_service_records — deduct_leave_seconds", """
        UPDATE ods.assistant_service_records SET
            deduct_leave_seconds = COALESCE((payload->>'deductLeaveSeconds')::integer, 0)
        WHERE deduct_leave_seconds IS NULL OR deduct_leave_seconds = 0
    """)

    # 1.2 assistant_service_records — order_from
    run_sql(conn, "1.2b ODS assistant_service_records — order_from", """
        UPDATE ods.assistant_service_records SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.3 store_goods_sales_records — activity_amount + activity_id
    run_sql(conn, "1.3a ODS store_goods_sales_records — activity_amount + activity_id", """
        UPDATE ods.store_goods_sales_records SET
            activity_amount = COALESCE((payload->>'activityAmount')::numeric(18,2), 0),
            activity_id = COALESCE((payload->>'activityId')::bigint, 0)
        WHERE activity_amount IS NULL OR activity_amount = 0
    """)

    # 1.3 store_goods_sales_records — order_from
    run_sql(conn, "1.3b ODS store_goods_sales_records — order_from", """
        UPDATE ods.store_goods_sales_records SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.4 goods_stock_summary — createtime
    run_sql(conn, "1.4 ODS goods_stock_summary — createtime", """
        UPDATE ods.goods_stock_summary SET
            createtime = (payload->>'createTime')::timestamp
        WHERE payload->>'createTime' IS NOT NULL
          AND createtime IS NULL
    """)

    # 1.5 table_fee_transactions — order_from
    run_sql(conn, "1.5 ODS table_fee_transactions — order_from", """
        UPDATE ods.table_fee_transactions SET
            order_from = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND order_from IS NULL
    """)

    # 1.6 settlement_records — inspect the payload structure first
    run_sql(conn, "1.6a ODS settlement_records — 检查 payload 结构", """
        SELECT payload->>'orderFrom' AS top_level,
               payload->'settleList'->0->>'orderFrom' AS nested
        FROM ods.settlement_records LIMIT 3
    """, fetch=True)

    # Based on the inspection above, try the top-level path first.
    run_sql(conn, "1.6b ODS settlement_records — orderfrom (顶层)", """
        UPDATE ods.settlement_records SET
            orderfrom = (payload->>'orderFrom')::integer
        WHERE payload->>'orderFrom' IS NOT NULL
          AND orderfrom IS NULL
    """)


def _backfill_dwd(conn) -> None:
    """Step 2: copy values from the latest-fetched ODS row per key into DWD."""
    # 2.1 dim_member_ex
    run_sql(conn, "2.1 DWD dim_member_ex — 4 字段", """
        UPDATE dwd.dim_member_ex me SET
            other_pay_money_sum = (mp.payload->>'otherPayMoneySum')::numeric(18,2),
            last_consume_time = (mp.payload->>'lastConsumeTime')::timestamptz,
            non_consume_day_num = (mp.payload->>'nonConsumeDayNum')::integer,
            first_consumption = (mp.payload->>'firstConsumption')::integer
        FROM (
            SELECT DISTINCT ON (id) id, payload
            FROM ods.member_profiles
            ORDER BY id, fetched_at DESC NULLS LAST
        ) mp
        WHERE me.member_id = mp.id
          AND me.scd2_is_current = 1
          AND me.other_pay_money_sum IS NULL
    """)

    # 2.2 dim_member_card_account_ex
    run_sql(conn, "2.2 DWD dim_member_card_account_ex — pdassisnatlevel + cxassisnatlevel", """
        UPDATE dwd.dim_member_card_account_ex ce SET
            pdassisnatlevel = mc.pdassisnatlevel,
            cxassisnatlevel = mc.cxassisnatlevel
        FROM (
            SELECT DISTINCT ON (id) id, pdassisnatlevel, cxassisnatlevel
            FROM ods.member_stored_value_cards
            ORDER BY id, fetched_at DESC NULLS LAST
        ) mc
        WHERE ce.member_card_id = mc.id
          AND ce.scd2_is_current = 1
          AND ce.pdassisnatlevel IS NULL
    """)

    # 2.3 dwd_assistant_service_log_ex
    run_sql(conn, "2.3 DWD dwd_assistant_service_log_ex — deduct_leave_seconds + order_from", """
        UPDATE dwd.dwd_assistant_service_log_ex de SET
            deduct_leave_seconds = COALESCE(ar.deduct_leave_seconds, 0),
            order_from = ar.order_from
        FROM (
            SELECT DISTINCT ON (id) id, deduct_leave_seconds, order_from
            FROM ods.assistant_service_records
            ORDER BY id, fetched_at DESC NULLS LAST
        ) ar
        WHERE de.assistant_service_id = ar.id
          AND de.deduct_leave_seconds IS NULL
    """)

    # 2.4 dwd_store_goods_sale_ex
    run_sql(conn, "2.4 DWD dwd_store_goods_sale_ex — activity_amount + activity_id + order_from", """
        UPDATE dwd.dwd_store_goods_sale_ex se SET
            activity_amount = COALESCE(sr.activity_amount, 0),
            activity_id = COALESCE(sr.activity_id, 0),
            order_from = sr.order_from
        FROM (
            SELECT DISTINCT ON (id) id, activity_amount, activity_id, order_from
            FROM ods.store_goods_sales_records
            ORDER BY id, fetched_at DESC NULLS LAST
        ) sr
        WHERE se.store_goods_sale_id = sr.id
          AND se.activity_amount IS NULL
    """)

    # 2.5 dwd_goods_stock_summary
    run_sql(conn, "2.5 DWD dwd_goods_stock_summary — create_time", """
        UPDATE dwd.dwd_goods_stock_summary ds SET
            create_time = gs.createtime
        FROM (
            SELECT DISTINCT ON (sitegoodsid) sitegoodsid, createtime
            FROM ods.goods_stock_summary
            WHERE createtime IS NOT NULL
            ORDER BY sitegoodsid, fetched_at DESC NULLS LAST
        ) gs
        WHERE ds.site_goods_id = gs.sitegoodsid
          AND ds.create_time IS NULL
    """)

    # 2.6 dwd_table_fee_log_ex
    run_sql(conn, "2.6 DWD dwd_table_fee_log_ex — order_from", """
        UPDATE dwd.dwd_table_fee_log_ex te SET
            order_from = tf.order_from
        FROM (
            SELECT DISTINCT ON (id) id, order_from
            FROM ods.table_fee_transactions
            WHERE order_from IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) tf
        WHERE te.table_fee_log_id = tf.id
          AND te.order_from IS NULL
    """)

    # 2.7 dwd_settlement_head_ex
    run_sql(conn, "2.7 DWD dwd_settlement_head_ex — order_from", """
        UPDATE dwd.dwd_settlement_head_ex she SET
            order_from = sr.orderfrom
        FROM (
            SELECT DISTINCT ON (id) id, orderfrom
            FROM ods.settlement_records
            WHERE orderfrom IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ) sr
        WHERE she.order_settle_id = sr.id
          AND she.order_from IS NULL
    """)


def _verify_backfill(conn) -> None:
    """Step 3: print total vs. non-NULL counts for the backfilled columns."""
    run_sql(conn, "验证1: ODS member_profiles backfill", """
        SELECT
            COUNT(*) AS total,
            COUNT(other_pay_money_sum) AS has_other_pay,
            COUNT(last_consume_time) AS has_last_consume,
            COUNT(non_consume_day_num) AS has_non_consume_day,
            COUNT(first_consumption) AS has_first_consumption
        FROM ods.member_profiles
    """, fetch=True)

    run_sql(conn, "验证2: DWD dim_member_ex backfill", """
        SELECT
            COUNT(*) AS total,
            COUNT(other_pay_money_sum) AS has_other_pay,
            COUNT(last_consume_time) AS has_last_consume,
            COUNT(non_consume_day_num) AS has_non_consume_day,
            COUNT(first_consumption) AS has_first_consumption
        FROM dwd.dim_member_ex
        WHERE scd2_is_current = 1
    """, fetch=True)

    run_sql(conn, "验证3: order_from backfill(各表)", """
        SELECT 'table_fee_transactions' AS tbl, COUNT(*) AS total, COUNT(order_from) AS has_order_from FROM ods.table_fee_transactions
        UNION ALL
        SELECT 'assistant_service_records', COUNT(*), COUNT(order_from) FROM ods.assistant_service_records
        UNION ALL
        SELECT 'store_goods_sales_records', COUNT(*), COUNT(order_from) FROM ods.store_goods_sales_records
        UNION ALL
        SELECT 'settlement_records', COUNT(*), COUNT(orderfrom) FROM ods.settlement_records
    """, fetch=True)


def _compare_dws(conn) -> None:
    """Step 4: compare ODS last_consume_time with the DWS monthly table."""
    # 4.1 List the DWS tables whose name contains "member".
    run_sql(conn, "4.1a 查找 DWS member consumption 相关表", """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'dws' AND table_name LIKE '%member%'
        ORDER BY table_name
    """, fetch=True)

    # 4.1b List the DWS tables that have last_consume / first_consume style columns.
    run_sql(conn, "4.1b 查找 DWS 中相关列", """
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = 'dws'
          AND column_name IN (
              'last_consume_date', 'last_consume_time',
              'first_consume_date', 'first_consumption',
              'days_since_last', 'non_consume_day_num',
              'last_order_date', 'first_order_date'
          )
        ORDER BY table_name, column_name
    """, fetch=True)

    # 4.2 Attempt the comparison against dws_member_consumption_monthly.
    # If the table does not exist the statement errors; run_sql catches the
    # error and the script continues.
    run_sql(conn, "4.2 last_consume_time 对比: ODS API vs DWS 计算值", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_last_consume_time
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id,
                   MAX(stat_date) AS dws_last_consume_date
            FROM dws.dws_member_consumption_monthly
            WHERE total_orders > 0
            GROUP BY member_id
        )
        SELECT
            a.member_id,
            a.api_last_consume_time,
            d.dws_last_consume_date,
            a.api_last_consume_time::date - d.dws_last_consume_date AS diff_days
        FROM api_vals a
        LEFT JOIN dws_vals d ON a.member_id = d.member_id
        WHERE d.dws_last_consume_date IS NOT NULL
        ORDER BY ABS(EXTRACT(EPOCH FROM a.api_last_consume_time - d.dws_last_consume_date::timestamptz)) DESC
        LIMIT 20
    """, fetch=True)

    # 4.3 Summary of the difference distribution.
    run_sql(conn, "4.3 last_consume_time 差异分布", """
        WITH api_vals AS (
            SELECT DISTINCT ON (id) id AS member_id,
                   last_consume_time AS api_val
            FROM ods.member_profiles
            WHERE last_consume_time IS NOT NULL
            ORDER BY id, fetched_at DESC NULLS LAST
        ),
        dws_vals AS (
            SELECT member_id,
                   MAX(stat_date) AS dws_val
            FROM dws.dws_member_consumption_monthly
            WHERE total_orders > 0
            GROUP BY member_id
        ),
        diffs AS (
            SELECT
                a.api_val::date - d.dws_val AS diff_days
            FROM api_vals a
            JOIN dws_vals d ON a.member_id = d.member_id
        )
        SELECT
            COUNT(*) AS total_compared,
            COUNT(*) FILTER (WHERE diff_days = 0) AS exact_match,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 1) AS within_1day,
            COUNT(*) FILTER (WHERE ABS(diff_days) <= 7) AS within_7days,
            COUNT(*) FILTER (WHERE ABS(diff_days) > 7) AS over_7days,
            MIN(diff_days) AS min_diff,
            MAX(diff_days) AS max_diff
        FROM diffs
    """, fetch=True)


def main():
    """Connect using PG_DSN and run the four phases in order.

    Every statement goes through run_sql, so a failing statement is reported
    and skipped rather than aborting the run. The connection is closed even
    if an unexpected exception escapes.
    """
    conn = psycopg2.connect(PG_DSN)
    try:
        if "@" in PG_DSN:
            # URI-style DSN: everything after the last '@' is host/port/db,
            # which keeps the credentials out of the log line.
            target = PG_DSN.rpartition("@")[2]
        else:
            # Keyword/value DSN has no '@' to split on; ask the driver for
            # the effective parameters (the password is not included).
            params = conn.get_dsn_parameters()
            target = (
                f"{params.get('host', '')}:{params.get('port', '')}"
                f"/{params.get('dbname', '')}"
            )
        print(f"已连接: {target}")

        # =====================================================================
        # Step 1: ODS-layer backfill
        # =====================================================================
        _banner("# 第一步:ODS 层 backfill")
        _backfill_ods(conn)

        # =====================================================================
        # Step 2: DWD-layer backfill
        # =====================================================================
        _banner("# 第二步:DWD 层 backfill")
        _backfill_dwd(conn)

        # =====================================================================
        # Step 3: verify the backfill results
        # =====================================================================
        _banner("# 第三步:验证 backfill 结果")
        _verify_backfill(conn)

        # =====================================================================
        # Step 4: DWS cross-comparison
        # =====================================================================
        _banner("# 第四步:DWS 交叉对比")
        _compare_dws(conn)
    finally:
        conn.close()

    print("\n" + "="*60)
    print("全部完成!")
    print("="*60)
# Run the backfill only when executed as a script, not when imported.
if __name__ == "__main__":
    main()