""" 导出 DWD 表字段清单(现有 + 拟新增),供人工审查。 输出:$FIELD_AUDIT_ROOT/dwd_field_review.md(由 .env 配置) 对每张涉及的 DWD 表(main + ex),列出: - 现有字段:字段名、数据类型、说明(从 pg_catalog.col_description 获取) - 拟新增字段:字段名、建议类型、来源 ODS 列、说明 """ import os import sys import psycopg2 from dotenv import load_dotenv from pathlib import Path load_dotenv(Path(__file__).resolve().parents[2] / ".env") # 构建 DSN:使用 .env 中的 PG_DSN(指向 test_etl_feiqiu,schema 为 ods/dwd/dws/meta) DSN = os.getenv("PG_DSN") if not DSN: print("ERROR: PG_DSN 未配置"); sys.exit(1) # ── 涉及的 DWD 表(schema.table) ────────────────────────────────── TABLES = [ # A 类 ("dwd", "dim_assistant"), ("dwd", "dim_assistant_ex"), ("dwd", "dwd_assistant_service_log"), ("dwd", "dwd_assistant_service_log_ex"), ("dwd", "dwd_store_goods_sale"), ("dwd", "dwd_store_goods_sale_ex"), ("dwd", "dwd_member_balance_change"), ("dwd", "dwd_member_balance_change_ex"), ("dwd", "dim_tenant_goods"), ("dwd", "dim_tenant_goods_ex"), ("dwd", "dim_table"), ("dwd", "dim_table_ex"), # B 类 ("dwd", "dwd_recharge_order"), ("dwd", "dim_store_goods"), ("dwd", "dim_store_goods_ex"), ] # ── 拟新增字段(按 DWD 表分组) ──────────────────────────────────── NEW_FIELDS = { "dim_assistant_ex": [ ("system_role_id", "bigint", "assistant_accounts_master.system_role_id", "系统角色 ID,关联角色权限"), ("job_num", "text", "assistant_accounts_master.job_num", "备用工号(当前门店未启用,全 NULL)"), ("cx_unit_price", "numeric(18,2)","assistant_accounts_master.cx_unit_price", "促销时段单价(当前值 0.00)"), ("pd_unit_price", "numeric(18,2)","assistant_accounts_master.pd_unit_price", "普通时段单价(当前值 0.00)"), ], "dwd_assistant_service_log_ex": [ ("operator_id", "bigint", "assistant_service_records.operator_id", "操作员 ID(如收银员)"), ("operator_name", "text", "assistant_service_records.operator_name", "操作员名称快照"), ], "dwd_member_balance_change_ex": [ ("relate_id", "bigint", "member_balance_changes.relate_id", "关联充值/订单 ID(0=无关联)"), ], "dim_table_ex": [ ("sitename", "text", "site_tables_master.sitename", "门店名称快照"), ("applet_qr_code_url", "text", "site_tables_master.appletqrcodeurl", "小程序二维码 URL(当前全 NULL)"), ("audit_status", "integer", "site_tables_master.audit_status", "审核状态枚举"), ("charge_free", "integer", "site_tables_master.charge_free", "是否免费(0=否)"), ("create_time", "timestamptz", "site_tables_master.create_time", "台桌创建时间"), ("delay_lights_time", "integer", "site_tables_master.delay_lights_time", "延迟关灯时间(秒)"), ("is_rest_area", "integer", "site_tables_master.is_rest_area", "是否休息区(0=否)"), ("light_status", "integer", "site_tables_master.light_status", "灯控状态枚举"), ("only_allow_groupon", "integer", "site_tables_master.only_allow_groupon", "是否仅允许团购"), ("order_delay_time", "integer", "site_tables_master.order_delay_time", "订单延迟时间(秒)"), ("self_table", "integer", "site_tables_master.self_table", "是否自助台桌"), ("table_status_name", "text", "site_tables_master.tablestatusname", "台桌状态名称(空闲中/使用中/暂停中)"), ("temporary_light_second","integer", "site_tables_master.temporary_light_second","临时灯光秒数"), ("virtual_table", "integer", "site_tables_master.virtual_table", "是否虚拟台桌(0=否)"), ], "dim_store_goods_ex": [ ("batch_stock_quantity", "numeric", "store_goods_master.batch_stock_quantity", "批次库存数量"), ], } # recharge_settlements 仅补映射,不新增列 MAPPING_ONLY = { "dwd_recharge_order": [ ("pl_coupon_sale_amount", "plcouponsaleamount", "平台券销售额"), ("mervou_sales_amount", "mervousalesamount", "美团券销售额"), ("electricity_money", "electricitymoney", "电费金额"), ("real_electricity_money", "realelectricitymoney", "实际电费金额"), ("electricity_adjust_money","electricityadjustmoney","电费调整金额"), ], } # 跳过的字段 SKIPPED = [ ("store_goods_sales_records", "discount_price", "DWD 列名已被 discount_money 占用"), ("tenant_goods_master", "commoditycode", "冗余字段,DWD 已有 commodity_code + commodity_code_list"), ("store_goods_master", "provisional_total_cost","DWD 列名已被 total_purchase_cost 占用"), ("store_goods_master", "time_slot_sale", "ODS 列不存在,需确认 API"), ] # C 类新建表(仅列出 ODS 列名,后续设计时确定 DWD 列名) C_CLASS_TABLES = { "goods_stock_summary (→ 新建 dwd_goods_stock_summary)": [ "sitegoodsid", "goodsname", "goodsunit", "goodscategoryid", "goodscategorysecondid", "categoryname", "rangestartstock", "rangeendstock", "rangein", "rangeout", "rangesale", "rangesalemoney", "rangeinventory", "currentstock", ], "goods_stock_movements (→ 新建 dwd_goods_stock_movement)": [ "sitegoodsstockid", "tenantid", "siteid", "sitegoodsid", "goodsname", "goodscategoryid", "goodssecondcategoryid", "unit", "price", "stocktype", "changenum", "startnum", "endnum", "changenuma", "startnuma", "endnuma", "remark", "operatorname", "createtime", ], } def get_table_columns(cur, schema, table): """获取表的列信息:列名、类型、注释(使用 pg_catalog 避免 search_path 问题)""" cur.execute(""" SELECT a.attname AS column_name, pg_catalog.format_type(a.atttypid, a.atttypmod) AS col_type, COALESCE(d.description, '') AS col_comment FROM pg_catalog.pg_attribute a JOIN pg_catalog.pg_class c ON c.oid = a.attrelid JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_catalog.pg_description d ON d.objoid = a.attrelid AND d.objsubid = a.attnum WHERE n.nspname = %s AND c.relname = %s AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum """, (schema, table)) return cur.fetchall() def main(): conn = psycopg2.connect(DSN) cur = conn.cursor() lines = [] lines.append("# DWD 表字段清单(现有 + 拟新增)\n") lines.append("> 导出时间:2026-02-19") lines.append("> 用途:供人工审查排查结论,确认新增字段是否合理\n") lines.append("---\n") # ── 第一部分:各 DWD 表现有字段 + 拟新增字段 ── lines.append("## 第一部分:A/B 类表(已有 DWD 表)\n") for schema, table in TABLES: cols = get_table_columns(cur, schema, table) if not cols: lines.append(f"### {schema}.{table}\n") lines.append("⚠️ 表不存在或无列\n") continue lines.append(f"### {schema}.{table}\n") # 现有字段 lines.append("#### 现有字段\n") lines.append("| # | 字段名 | 数据类型 | 说明 |") lines.append("|---|--------|---------|------|") for i, (col_name, col_type, col_comment) in enumerate(cols, 1): lines.append(f"| {i} | `{col_name}` | {col_type} | {col_comment} |") lines.append("") # 拟新增字段 new = NEW_FIELDS.get(table, []) if new: lines.append("#### 🆕 拟新增字段\n") lines.append("| # | 字段名 | 建议类型 | 来源 ODS 列 | 说明 |") lines.append("|---|--------|---------|------------|------|") for i, (fname, ftype, fsrc, fdesc) in enumerate(new, 1): lines.append(f"| {i} | `{fname}` | {ftype} | {fsrc} | {fdesc} |") lines.append("") # 仅补映射 mo = MAPPING_ONLY.get(table, []) if mo: lines.append("#### 🔗 仅补 FACT_MAPPINGS(DWD 列已存在)\n") lines.append("| # | DWD 列 | ODS 列 | 说明 |") lines.append("|---|--------|--------|------|") for i, (dwd_col, ods_col, desc) in enumerate(mo, 1): lines.append(f"| {i} | `{dwd_col}` | `{ods_col}` | {desc} |") lines.append("") lines.append("---\n") # ── 第二部分:跳过的字段 ── lines.append("## 第二部分:跳过的字段\n") lines.append("| # | ODS 表 | ODS 列 | 跳过原因 |") lines.append("|---|--------|--------|---------|") for i, (tbl, col, reason) in enumerate(SKIPPED, 1): lines.append(f"| {i} | {tbl} | `{col}` | {reason} |") lines.append("\n---\n") # ── 第三部分:C 类新建表 ── lines.append("## 第三部分:C 类表(需新建 DWD 表)\n") for title, ods_cols in C_CLASS_TABLES.items(): lines.append(f"### {title}\n") lines.append("| # | ODS 列名 |") lines.append("|---|---------|") for i, col in enumerate(ods_cols, 1): lines.append(f"| {i} | `{col}` |") lines.append("") lines.append("---\n") cur.close() conn.close() # 写入文件(从 .env 读取 FIELD_AUDIT_ROOT) from _env_paths import get_output_path out_dir = get_output_path("FIELD_AUDIT_ROOT") out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / "dwd_field_review.md" out_path.write_text("\n".join(lines), encoding="utf-8") print(f"✅ 已导出到 {out_path}") if __name__ == "__main__": main()