# -*- coding: utf-8 -*-
"""Backfill ``user_id`` in ``billiards_dwd.dim_assistant`` from the ODS layer.

Rows of the DWD dimension are updated from
``billiards_ods.assistant_accounts_master`` by joining ODS ``id`` to DWD
``assistant_id``. Only DWD rows whose ``user_id`` is NULL or 0 are touched,
and for each ODS ``id`` the most recently fetched row with a positive
``user_id`` is used as the source.

The script prints:
  1. the number of current rows / rows with a positive user_id before the fix,
  2. how many rows the UPDATE changed,
  3. the same counts after the fix,
  4. a sample of current dimension rows,
  5. how many distinct ``user_id`` values in ``dwd_assistant_service_log``
     can be matched to a current ``dim_assistant`` row.
"""
import sys

# Make the project packages (config, database) importable; this resolves
# relative to the current working directory, so run from the project root.
sys.path.insert(0, '.')

from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations

# Current SCD2 rows, and how many of them carry a positive user_id.
STATUS_SQL = """
    SELECT
        COUNT(*) as total,
        COUNT(CASE WHEN user_id > 0 THEN 1 END) as has_user_id
    FROM billiards_dwd.dim_assistant
    WHERE scd2_is_current = 1
"""

# Backfill user_id from the latest ODS snapshot per assistant id.
# NOTE: there is no scd2_is_current filter here, so historical SCD2 versions
# of an assistant are backfilled as well, not only the current row.
# NULLS LAST: PostgreSQL sorts NULLs first under DESC by default, which would
# let a row without fetched_at win over the most recently fetched one.
UPDATE_SQL = """
    UPDATE billiards_dwd.dim_assistant d
    SET user_id = o.user_id
    FROM (
        SELECT DISTINCT ON (id) id, user_id
        FROM billiards_ods.assistant_accounts_master
        WHERE user_id > 0
        ORDER BY id, fetched_at DESC NULLS LAST
    ) o
    WHERE d.assistant_id = o.id
      AND (d.user_id IS NULL OR d.user_id = 0)
"""

# First 10 current rows ordered numerically by assistant_no.
# NOTE(review): the ::int cast assumes assistant_no is always numeric text;
# the query fails on any non-numeric value — confirm against the data.
SAMPLE_SQL = """
    SELECT assistant_id, user_id, assistant_no, nickname
    FROM billiards_dwd.dim_assistant
    WHERE scd2_is_current = 1
    ORDER BY assistant_no::int
    LIMIT 10
"""

# Distinct user_ids in non-deleted service-log rows, and how many of them
# join to a current dim_assistant row on user_id.
VERIFY_SQL = """
    SELECT
        COUNT(DISTINCT s.user_id) as service_unique_users,
        COUNT(DISTINCT CASE WHEN d.assistant_id IS NOT NULL THEN s.user_id END) as matched_users
    FROM billiards_dwd.dwd_assistant_service_log s
    LEFT JOIN billiards_dwd.dim_assistant d
        ON s.user_id = d.user_id AND d.scd2_is_current = 1
    WHERE s.is_delete = 0 AND s.user_id > 0
"""


def _print_status(db, title):
    """Print *title* followed by the current-row / has-user_id counts."""
    print(title)
    row = dict(db.query(STATUS_SQL)[0])
    print(f" 总记录: {row['total']}, 有user_id: {row['has_user_id']}")


def _backfill_user_id(db_conn):
    """Run the backfill UPDATE, print the affected row count, and commit.

    On any failure the transaction is rolled back before the exception is
    re-raised, so no half-open transaction is left on the connection.

    Returns:
        The cursor's ``rowcount`` for the UPDATE.
    """
    conn = db_conn.conn
    try:
        with conn.cursor() as cur:
            cur.execute(UPDATE_SQL)
            updated = cur.rowcount
        print(f" 更新了 {updated} 条记录")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return updated


def _print_samples(db):
    """Print up to 10 current dim_assistant rows."""
    print("\n样本数据:")
    for row in db.query(SAMPLE_SQL):
        r = dict(row)
        print(f" assistant_id={r['assistant_id']}, user_id={r['user_id']}, no={r['assistant_no']}, nickname={r['nickname']}")


def _print_service_log_match(db):
    """Print how many service-log user_ids now match a current dim row."""
    print("\n验证与服务日志的关联:")
    stats = dict(db.query(VERIFY_SQL)[0])
    total = stats['service_unique_users']
    matched = stats['matched_users']
    print(f" 服务日志唯一user_id: {total}")
    print(f" 能匹配到dim_assistant: {matched}")
    # Guard against division by zero when the service log has no user_ids.
    match_rate = matched / total * 100 if total > 0 else 0
    print(f" 匹配率: {match_rate:.1f}%")


def main():
    """Run the backfill and its before/after reports; always close the DB."""
    config = AppConfig.load()
    db_conn = DatabaseConnection(config.config['db']['dsn'])
    try:
        db = DatabaseOperations(db_conn)

        print("=== 修复 dim_assistant.user_id ===")

        # 1. State before the fix.
        _print_status(db, "\n修复前:")

        # 2. Apply the update.
        print("\n执行更新...")
        _backfill_user_id(db_conn)

        # 3. State after the fix.
        _print_status(db, "\n修复后:")

        # 4. Sample rows.
        _print_samples(db)

        # 5. Cross-check against the service log.
        _print_service_log_match(db)
    finally:
        db_conn.close()

    print("\n完成!")


if __name__ == "__main__":
    main()