# etl_feiqiu Schema 迁移文档 --- ## 迁移 11:修复 DWD 层 BC 哨兵日期 + ETL 代码层哨兵值过滤(2026-02-22) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-22__fix_bc_sentinel_dates_to_null.sql` ### 变更说明 修复上游飞球 API 返回的哨兵日期 `0001-01-01T00:00:00`(表示"未设置")在 ODS→DWD 转换过程中产生的 BC 日期问题。 **根因**:ODS 表中 `timestamp without time zone` 类型的 `0001-01-01 00:00:00` 在转入 DWD 表的 `timestamp with time zone` 列时,PostgreSQL 在 `Asia/Shanghai` 时区下将其隐式转换为 `0001-12-31 23:59:43 BC`(公元前)。psycopg2 的 `fetchall()` 无法将 BC 日期转为 Python `datetime`,抛出 `ValueError: year -1 is out of range`。 **修复内容**: | 类型 | 说明 | |------|------| | 存量数据修复 | 6 个 DWD 表/列中 BC 日期 → NULL(约 69,668 行) | | 代码防御 | `_cast_expr` / `_merge_dim_scd2` / `_build_fact_select_exprs` 三处加哨兵值过滤 | **受影响表/列**: | 表 | 列 | 受影响行数 | |---|---|---| | `dwd.dim_assistant_ex` | `birth_date` | ~1,107 | | `dwd.dim_member_card_account_ex` | `disable_start_time` | ~18,172 | | `dwd.dim_member_card_account_ex` | `disable_end_time` | ~18,172 | | `dwd.dwd_assistant_service_log_ex` | `composite_grade_time` | ~5,297 | | `dwd.dwd_recharge_order_ex` | `revoke_time` | ~485 | | `dwd.dwd_settlement_head_ex` | `revoke_time` | ~26,435 | **DDL 结构变更**:无(不涉及表/列/索引/约束的新增、删除或修改) **数据变更**:`UPDATE ... SET col = NULL WHERE EXTRACT(year FROM col) < 1`,将 BC 日期置为 NULL **代码变更**(`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py`): 1. `_cast_expr`:对 `timestamptz` 类型转换加 `CASE WHEN (base)::timestamp >= '0002-01-01'::timestamp THEN ... ELSE NULL END` 2. `_build_fact_select_exprs`:对 `timestamp → timestamptz` 同类型列加哨兵过滤 3. `_merge_dim_scd2`:ODS→DWD SELECT 和 DWD 现有数据读取均加哨兵过滤 4. 新增类属性 `_SENTINEL_DATE_THRESHOLD = "0002-01-01"` ### 关联需求 前后端联调 BUG 12(admin-web 任务配置提交执行验证) ### 兼容性 - **ETL Connector**:代码层面已防御,后续 DWD 装载不再产生 BC 日期;v10 验证 19/19 任务全部成功 - **后端 API / 小程序**:无影响(不直接查询受影响列) - **DWS 层**:无影响(DWS 任务不引用这些时间列) - **幂等性**:迁移脚本可重复执行,WHERE 条件仅匹配 `EXTRACT(year) < 1` 的行 ### 回滚策略 存量数据回滚不可行(BC 日期本身是错误数据,无业务价值)。代码回滚: 1. 还原 `dwd_load_task.py` 中 `_cast_expr`、`_build_fact_select_exprs`、`_merge_dim_scd2` 的哨兵过滤逻辑 2. 移除 `_SENTINEL_DATE_THRESHOLD` 类属性 注意:回滚代码后重新执行 DWD 装载会重新产生 BC 日期,导致 psycopg2 报错。 ### 验证 SQL ```sql -- 1. 确认所有受影响列中不再存在 BC 日期 SELECT 'dim_assistant_ex.birth_date' AS target, COUNT(*) AS bc_count FROM dwd.dim_assistant_ex WHERE birth_date::text LIKE '%BC%' UNION ALL SELECT 'dim_member_card_account_ex.disable_start_time', COUNT(*) FROM dwd.dim_member_card_account_ex WHERE disable_start_time::text LIKE '%BC%' UNION ALL SELECT 'dim_member_card_account_ex.disable_end_time', COUNT(*) FROM dwd.dim_member_card_account_ex WHERE disable_end_time::text LIKE '%BC%' UNION ALL SELECT 'dwd_assistant_service_log_ex.composite_grade_time', COUNT(*) FROM dwd.dwd_assistant_service_log_ex WHERE composite_grade_time::text LIKE '%BC%' UNION ALL SELECT 'dwd_recharge_order_ex.revoke_time', COUNT(*) FROM dwd.dwd_recharge_order_ex WHERE revoke_time::text LIKE '%BC%' UNION ALL SELECT 'dwd_settlement_head_ex.revoke_time', COUNT(*) FROM dwd.dwd_settlement_head_ex WHERE revoke_time::text LIKE '%BC%'; -- 预期:所有 bc_count = 0 -- 2. 确认受影响列中 NULL 值数量合理(哨兵值已转为 NULL) SELECT 'dim_assistant_ex.birth_date' AS target, COUNT(*) FILTER (WHERE birth_date IS NULL) AS null_count, COUNT(*) AS total FROM dwd.dim_assistant_ex WHERE scd2_is_current = 1 UNION ALL SELECT 'dwd_settlement_head_ex.revoke_time', COUNT(*) FILTER (WHERE revoke_time IS NULL), COUNT(*) FROM dwd.dwd_settlement_head_ex; -- 预期:null_count > 0(哨兵值行已转为 NULL) -- 3. 确认 DWD 层无 EXTRACT(year) < 1 的异常日期 SELECT 'dim_assistant_ex' AS tbl, COUNT(*) AS bad FROM dwd.dim_assistant_ex WHERE EXTRACT(year FROM birth_date) < 1 UNION ALL SELECT 'dim_member_card_account_ex(start)', COUNT(*) FROM dwd.dim_member_card_account_ex WHERE EXTRACT(year FROM disable_start_time) < 1 UNION ALL SELECT 'dim_member_card_account_ex(end)', COUNT(*) FROM dwd.dim_member_card_account_ex WHERE EXTRACT(year FROM disable_end_time) < 1 UNION ALL SELECT 'dwd_assistant_service_log_ex', COUNT(*) FROM dwd.dwd_assistant_service_log_ex WHERE EXTRACT(year FROM composite_grade_time) < 1 UNION ALL SELECT 'dwd_recharge_order_ex', COUNT(*) FROM dwd.dwd_recharge_order_ex WHERE EXTRACT(year FROM revoke_time) < 1 UNION ALL SELECT 'dwd_settlement_head_ex', COUNT(*) FROM dwd.dwd_settlement_head_ex WHERE EXTRACT(year FROM revoke_time) < 1; -- 预期:所有 bad = 0 -- 4. 确认 ETL v10 执行后无新增 BC 日期(重新装载后仍为 NULL) SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name IN ('dim_assistant_ex', 'dim_member_card_account_ex', 'dwd_assistant_service_log_ex', 'dwd_recharge_order_ex', 'dwd_settlement_head_ex') AND data_type = 'timestamp with time zone' ORDER BY table_name, column_name; -- 预期:列出所有 timestamptz 列,确认类型未变 ``` --- ## 迁移 10:移除 ods.settlement_ticket_details 表(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__remove_settlement_ticket_details.sql` ### 变更说明 彻底移除 `ods.settlement_ticket_details` 表及其关联资源。该表对应的上游 API 数据已由 `ods.settlement_records` 完全覆盖,不再需要独立的结算小票明细表。 | 移除项 | 说明 | |--------|------| | `ods.settlement_ticket_details` 表 | ODS 结算小票明细表(含 CREATE TABLE + 全部 COMMENT) | | `ods.idx_ods_settlement_ticket_details_latest` 索引 | 取最新版本复合索引 | | `meta.ods_task_registry` 中 `ODS_SETTLEMENT_TICKET` 条目 | 任务注册记录 | | `db/etl_feiqiu/seeds/seed_ods_tasks.sql` 中 `ODS_SETTLEMENT_TICKET` | 种子数据条目 | | `db/etl_feiqiu/schemas/ods.sql` 中建表语句和注释 | DDL 源文件 | | `db/etl_feiqiu/schemas/schema_ODS_doc.sql` 中建表语句和注释 | 旧版 DDL 文档文件 | ### 关联需求 `dataflow-field-completion` Requirements 11.1–11.8 ### 兼容性 - **ETL Connector**:`ODS_SETTLEMENT_TICKET` 任务已从任务注册和种子数据中移除,不再调度 - **后端 API / 小程序**:无下游依赖此表,无影响 - **DWD / DWS 层**:无 DWD/DWS 表映射自此 ODS 表,无影响 ### 回滚策略 1. 从 git 历史恢复 `ods.sql` 和 `schema_ODS_doc.sql` 中的建表语句 2. 重新创建表和索引: ```sql -- 从 git 历史中提取原始 CREATE TABLE 语句执行 -- 重建索引 CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ods_settlement_ticket_details_latest ON ods.settlement_ticket_details (orderSettleId, fetched_at DESC); -- 重新注册任务 INSERT INTO meta.ods_task_registry (task_code, store_id, enabled) VALUES ('ODS_SETTLEMENT_TICKET', 2790685415443269, TRUE) ON CONFLICT DO NOTHING; ``` ### 验证 SQL ```sql -- 1. 确认表已不存在 SELECT table_name FROM information_schema.tables WHERE table_schema = 'ods' AND table_name = 'settlement_ticket_details'; -- 预期:0 行 -- 2. 确认索引已不存在 SELECT indexname FROM pg_indexes WHERE schemaname = 'ods' AND indexname = 'idx_ods_settlement_ticket_details_latest'; -- 预期:0 行 -- 3. 确认任务注册已移除 SELECT task_code FROM meta.ods_task_registry WHERE task_code = 'ODS_SETTLEMENT_TICKET'; -- 预期:0 行 ``` --- ## 迁移 9:为 dim_table_ex 新增 14 个字段(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__add_dim_table_ex_fields.sql` ### 变更说明 为 `dwd.dim_table_ex` 新增 14 个字段,映射自 ODS `site_tables_master` 中已有的同名列(部分驼峰式在 PostgreSQL 中已小写化)。 | 新增列 | 类型 | 业务含义 | |--------|------|---------| | `create_time` | TIMESTAMPTZ | 台桌配置的创建时间或最近一次创建/复制时间 | | `light_status` | INTEGER | 台灯状态枚举(如 2=已开灯) | | `tablestatusname` | TEXT | 台桌状态中文名称(如"空闲中""使用中"),仅展示用途 | | `sitename` | TEXT | 门店名称快照,冗余字段,配合 site_id 使用 | | `applet_qr_code_url` | TEXT | 小程序二维码 URL,用于扫码开台等场景 | | `audit_status` | INTEGER | 审核状态枚举(当前全部为 2) | | `charge_free` | INTEGER | 是否免费台(0=收费,1=免费) | | `delay_lights_time` | INTEGER | 台灯熄灭延迟时间,结账后延时关灯 | | `is_rest_area` | INTEGER | 是否休息区台桌(0=否,1=是) | | `only_allow_groupon` | INTEGER | 是否仅允许团购开台(0/1/2 枚举) | | `order_delay_time` | INTEGER | 订单自动延时时长 | | `self_table` | INTEGER | 是否自有台桌(1=自有) | | `temporary_light_second` | INTEGER | 临时开灯秒数 | | `virtual_table` | INTEGER | 是否虚拟台桌(0=实体台,1=虚拟台) | **DDL 结构变更**:`ALTER TABLE ... ADD COLUMN` × 14 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dim_table_ex"]` 新增 14 个映射条目。 **ODS→DWD 列名映射说明**: - 大部分字段为同名直通映射(ODS 列名在 PG 中已小写化) - `applet_qr_code_url` ← ODS `"appletQrCodeUrl"`(ODS DDL 中用双引号保留驼峰大小写,FACT_MAPPINGS 中需带引号引用) - `tablestatusname` ← ODS `tablestatusname`(DDL 中写 `tableStatusName`,PG 自动小写化) - `sitename` ← ODS `sitename`(DDL 中写 `siteName`,PG 自动小写化) ### 关联需求 `dataflow-field-completion` Requirements 9.1, 9.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新列数据;ODS 源列已存在 - **后端 API / 小程序**:新增列不影响现有查询;如需使用新字段需在下游查询中显式引用 - **DWS 层**:当前无 DWS 任务引用这 14 列,无影响 ### 回滚策略 ```sql BEGIN; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS create_time; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS light_status; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS tablestatusname; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS sitename; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS applet_qr_code_url; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS audit_status; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS charge_free; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS delay_lights_time; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS is_rest_area; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS only_allow_groupon; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS order_delay_time; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS self_table; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS temporary_light_second; ALTER TABLE dwd.dim_table_ex DROP COLUMN IF EXISTS virtual_table; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认 14 个新列均已创建且类型正确 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dim_table_ex' AND column_name IN ( 'create_time', 'light_status', 'tablestatusname', 'sitename', 'applet_qr_code_url', 'audit_status', 'charge_free', 'delay_lights_time', 'is_rest_area', 'only_allow_groupon', 'order_delay_time', 'self_table', 'temporary_light_second', 'virtual_table' ) ORDER BY column_name; -- 2. 确认列注释已添加(应返回 14 行) SELECT column_name, col_description( (SELECT oid FROM pg_class WHERE relname = 'dim_table_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), attnum ) AS comment FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dim_table_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname IN ( 'create_time', 'light_status', 'tablestatusname', 'sitename', 'applet_qr_code_url', 'audit_status', 'charge_free', 'delay_lights_time', 'is_rest_area', 'only_allow_groupon', 'order_delay_time', 'self_table', 'temporary_light_second', 'virtual_table' ); -- 3. 确认 ODS 源表中对应列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = 'site_tables_master' AND column_name IN ( 'create_time', 'light_status', 'tablestatusname', 'sitename', 'appletQrCodeUrl', 'audit_status', 'charge_free', 'delay_lights_time', 'is_rest_area', 'only_allow_groupon', 'order_delay_time', 'self_table', 'temporary_light_second', 'virtual_table' ) ORDER BY column_name; -- 4. 重新加载后抽样验证新列有数据 SELECT create_time, light_status, tablestatusname, sitename, applet_qr_code_url, audit_status, charge_free FROM dwd.dim_table_ex WHERE scd2_is_current = 1 LIMIT 10; ``` --- ## 迁移 8:为 dwd_member_balance_change_ex 新增 relate_id 字段(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__add_member_balance_change_ex_relate_id.sql` ### 变更说明 为 `dwd.dwd_member_balance_change_ex` 新增 1 个字段,映射自 ODS `member_balance_changes` 中已有的同名列。 | 新增列 | 类型 | 业务含义 | |--------|------|---------| | `relate_id` | BIGINT | 关联业务单据 ID,指向触发本次余额变动的业务记录(充值单、订单、结算单等),按 `from_type` 不同指向不同表 | **DDL 结构变更**:`ALTER TABLE ... ADD COLUMN` × 1 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dwd_member_balance_change_ex"]` 新增 1 个映射条目: - `("relate_id", "relate_id", None)` ### 关联需求 `dataflow-field-completion` Requirements 5.1, 5.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新列数据;ODS 源列已存在,映射为同名直通 - **后端 API / 小程序**:新增列不影响现有查询;如需使用新字段需在下游查询中显式引用 - **DWS 层**:当前无 DWS 任务引用此列,无影响 ### 回滚策略 ```sql BEGIN; ALTER TABLE dwd.dwd_member_balance_change_ex DROP COLUMN IF EXISTS relate_id; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认新列已创建且类型正确 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_member_balance_change_ex' AND column_name = 'relate_id'; -- 2. 确认列注释已添加 SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dwd_member_balance_change_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dwd_member_balance_change_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'relate_id') ) AS relate_id_comment; -- 3. 确认 ODS 源表中同名列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = 'member_balance_changes' AND column_name = 'relate_id'; -- 4. 重新加载后抽样验证新列有数据 SELECT relate_id, balance_change_id FROM dwd.dwd_member_balance_change_ex WHERE relate_id IS NOT NULL LIMIT 10; ``` --- ## 迁移 7:为 dwd_assistant_service_log_ex 新增 2 个字段(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__add_assistant_service_log_ex_fields.sql` ### 变更说明 为 `dwd.dwd_assistant_service_log_ex` 新增 2 个字段,映射自 ODS `assistant_service_records` 中已有的同名列。 | 新增列 | 类型 | 业务含义 | |--------|------|---------| | `operator_id` | BIGINT | 操作员 ID,录入/结算这条助教服务的员工 | | `operator_name` | TEXT | 操作员姓名(带职位前缀),便于直接阅读 | **DDL 结构变更**:`ALTER TABLE ... ADD COLUMN` × 2 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dwd_assistant_service_log_ex"]` 新增 2 个映射条目: - `("operator_id", "operator_id", None)` - `("operator_name", "operator_name", None)` ### 关联需求 `dataflow-field-completion` Requirements 2.1, 2.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新列数据;ODS 源列已存在,映射为同名直通 - **后端 API / 小程序**:新增列不影响现有查询;如需使用新字段需在下游查询中显式引用 - **DWS 层**:当前无 DWS 任务引用这 2 列,无影响 ### 回滚策略 ```sql BEGIN; ALTER TABLE dwd.dwd_assistant_service_log_ex DROP COLUMN IF EXISTS operator_id; ALTER TABLE dwd.dwd_assistant_service_log_ex DROP COLUMN IF EXISTS operator_name; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认 2 个新列均已创建且类型正确 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_assistant_service_log_ex' AND column_name IN ('operator_id', 'operator_name') ORDER BY column_name; -- 2. 确认列注释已添加 SELECT column_name, col_description( (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), attnum ) AS comment FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname IN ('operator_id', 'operator_name'); -- 3. 确认 ODS 源表中同名列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = 'assistant_service_records' AND column_name IN ('operator_id', 'operator_name') ORDER BY column_name; -- 4. 重新加载后抽样验证新列有数据 SELECT operator_id, operator_name FROM dwd.dwd_assistant_service_log_ex WHERE operator_id IS NOT NULL LIMIT 10; ``` --- ## 迁移 6:为 dim_assistant_ex 新增 4 个字段(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__add_dim_assistant_ex_fields.sql` ### 变更说明 为 `dwd.dim_assistant_ex` 新增 4 个字段,映射自 ODS `assistant_accounts_master` 中已有的同名列。 | 新增列 | 类型 | 业务含义 | |--------|------|---------| | `system_role_id` | BIGINT | 系统角色 ID,标识助教在系统中的角色类型 | | `job_num` | TEXT | 工号,助教的内部编号标识 | | `cx_unit_price` | NUMERIC(18,2) | 促销单价(元),助教提供促销服务时的计费单价 | | `pd_unit_price` | NUMERIC(18,2) | 陪打单价(元),助教提供陪打服务时的计费单价 | **DDL 结构变更**:`ALTER TABLE ... ADD COLUMN` × 4 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dim_assistant_ex"]` 新增 4 个映射条目: - `("system_role_id", "system_role_id", None)` - `("job_num", "job_num", None)` - `("cx_unit_price", "cx_unit_price", None)` - `("pd_unit_price", "pd_unit_price", None)` ### 关联需求 `dataflow-field-completion` Requirements 1.1, 1.2, 1.3 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新列数据;ODS 源列已存在,映射为同名直通 - **后端 API / 小程序**:新增列不影响现有查询;如需使用新字段需在下游查询中显式引用 - **DWS 层**:当前无 DWS 任务引用这 4 列,无影响 ### 回滚策略 ```sql BEGIN; ALTER TABLE dwd.dim_assistant_ex DROP COLUMN IF EXISTS system_role_id; ALTER TABLE dwd.dim_assistant_ex DROP COLUMN IF EXISTS job_num; ALTER TABLE dwd.dim_assistant_ex DROP COLUMN IF EXISTS cx_unit_price; ALTER TABLE dwd.dim_assistant_ex DROP COLUMN IF EXISTS pd_unit_price; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认 4 个新列均已创建且类型正确 SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dim_assistant_ex' AND column_name IN ('system_role_id', 'job_num', 'cx_unit_price', 'pd_unit_price') ORDER BY column_name; -- 2. 确认列注释已添加 SELECT column_name, col_description( (SELECT oid FROM pg_class WHERE relname = 'dim_assistant_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), attnum ) AS comment FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dim_assistant_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname IN ('system_role_id', 'job_num', 'cx_unit_price', 'pd_unit_price'); -- 3. 确认 ODS 源表中同名列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = 'assistant_accounts_master' AND column_name IN ('system_role_id', 'job_num', 'cx_unit_price', 'pd_unit_price') ORDER BY column_name; -- 4. 重新加载后抽样验证新列有数据 SELECT system_role_id, job_num, cx_unit_price, pd_unit_price FROM dwd.dim_assistant_ex WHERE scd2_is_current = 1 LIMIT 10; ``` --- ## 迁移 5:修正 dim_store_goods.batch_stock_qty 和 dim_store_goods_ex.provisional_total_cost 映射源(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__fix_store_goods_master_mapping.sql` ### 变更说明 修正 `dwd.dim_store_goods.batch_stock_qty` 和 `dwd.dim_store_goods_ex.provisional_total_cost` 的 ODS 映射源错误。 | 项目 | 修正前 | 修正后 | |------|--------|--------| | `batch_stock_qty` 的 ODS 源 | `stock`(当前库存) | `batch_stock_quantity`(批次库存) | | `provisional_total_cost` 的 ODS 源 | `total_purchase_cost`(实际采购成本) | `provisional_total_cost`(暂估成本) | **DDL 层面无结构变更**(两列均已存在),仅更新列注释以反映正确的 ODS 来源。 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中: - `FACT_MAPPINGS["dwd.dim_store_goods"]` 的 `batch_stock_qty` 条目从 `("batch_stock_qty", "stock", None)` 改为 `("batch_stock_qty", "batch_stock_quantity", None)` - `FACT_MAPPINGS["dwd.dim_store_goods_ex"]` 的 `provisional_total_cost` 条目从 `("provisional_total_cost", "total_purchase_cost", None)` 改为 `("provisional_total_cost", "provisional_total_cost", None)` ### 关联需求 `dataflow-field-completion` Requirements 10.3 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以修正历史数据中两列的值 - **后端 API / 小程序**:如有下游查询依赖 `batch_stock_qty` 或 `provisional_total_cost`,修正后数据将更准确 - **DWS 层**:当前无 DWS 任务引用这两列,无影响 ### 回滚策略 1. 将 `FACT_MAPPINGS` 中 `batch_stock_qty` 的 ODS 源改回 `stock` 2. 将 `FACT_MAPPINGS` 中 `provisional_total_cost` 的 ODS 源改回 `total_purchase_cost` 3. 恢复 `dwd.sql` 中的原始 COMMENT 4. 重新执行 DWD 加载任务 ```sql BEGIN; COMMENT ON COLUMN dwd.dim_store_goods.batch_stock_qty IS '【说明】数量/时长字段,用于统计与计量。 【示例】18(数量/时长字段,用于统计与计量)。 【ODS来源】store_goods_master - stock。 【JSON字段】store_goods_master.json - data.orderGoodsList - stock。'; COMMENT ON COLUMN dwd.dim_store_goods_ex.provisional_total_cost IS '【说明】金额字段,用于计费/结算/核算等金额计算。 【示例】0.0(金额字段,用于计费/结算/核算等金额计算)。 【ODS来源】store_goods_master - total_purchase_cost。 【JSON字段】store_goods_master.json - data.orderGoodsList - total_purchase_cost。'; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认 batch_stock_qty 注释已更新为 batch_stock_quantity SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'batch_stock_qty') ) AS batch_stock_qty_comment; -- 2. 确认 provisional_total_cost 注释已更新 SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'provisional_total_cost') ) AS provisional_total_cost_comment; -- 3. 确认两列均存在且类型未变 SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = 'dwd' AND ( (table_name = 'dim_store_goods' AND column_name = 'batch_stock_qty') OR (table_name = 'dim_store_goods_ex' AND column_name = 'provisional_total_cost') ) ORDER BY table_name, column_name; -- 4. 重新加载后抽样验证 batch_stock_qty 来自 batch_stock_quantity SELECT g.batch_stock_qty AS dwd_val, o.batch_stock_quantity AS ods_batch, o.stock AS ods_stock FROM dwd.dim_store_goods g JOIN (SELECT DISTINCT ON (id) * FROM ods.store_goods_master ORDER BY id, fetched_at DESC) o ON g.site_goods_id = o.id WHERE o.batch_stock_quantity IS DISTINCT FROM o.stock LIMIT 20; ``` --- ## 迁移 4:修正 dwd_store_goods_sale.discount_price 列名误导(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__fix_store_goods_sale_discount_price.sql` ### 变更说明 修正 `dwd.dwd_store_goods_sale.discount_price` 列名语义误导问题。 | 项目 | 修正前 | 修正后 | |------|--------|--------| | DWD `discount_price` 列 | 映射自 ODS `discount_money`(折扣金额),列名与实际语义不符 | 重命名为 `discount_money`,反映真实语义 | | DWD `discount_price` 列(新增) | 不存在 | 新增,映射自 ODS `discount_price`(折后单价) | **DDL 结构变更**: 1. `RENAME COLUMN discount_price TO discount_money` 2. `ADD COLUMN discount_price NUMERIC(18,2)` **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dwd_store_goods_sale"]`: - 原 `("discount_price", "discount_money", None)` → `("discount_money", "discount_money", None)` - 新增 `("discount_price", "discount_price", None)` ### 关联需求 `dataflow-field-completion` Requirements 4.1, 4.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新 `discount_price` 列数据;`discount_money` 列数据在 RENAME 后自动保留 - **后端 API / 小程序**:如有下游查询引用 `discount_price` 列,需注意该列语义已变更(原为折扣金额,现为折后单价);原折扣金额数据现在位于 `discount_money` 列 - **DWS 层**:当前无 DWS 任务引用此列,无影响 ### 回滚策略 1. 删除新增的 `discount_price` 列 2. 将 `discount_money` 重命名回 `discount_price` 3. 恢复 `FACT_MAPPINGS` 和 `dwd.sql` 中的原始定义 ```sql BEGIN; ALTER TABLE dwd.dwd_store_goods_sale DROP COLUMN IF EXISTS discount_price; ALTER TABLE dwd.dwd_store_goods_sale RENAME COLUMN discount_money TO discount_price; COMMENT ON COLUMN dwd.dwd_store_goods_sale.discount_price IS '【说明】金额字段,用于计费/结算/核算等金额计算。 【示例】0.0(金额字段,用于计费/结算/核算等金额计算)。 【ODS来源】store_goods_sales_records - discount_money。 【JSON字段】store_goods_sales_records.json - data.orderGoodsLedgers - discount_money。'; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认 discount_money 列存在且类型正确 SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_store_goods_sale' AND column_name = 'discount_money'; -- 2. 确认 discount_price 列已新增且类型正确 SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_store_goods_sale' AND column_name = 'discount_price'; -- 3. 确认两列均存在(应返回 2 行) SELECT column_name FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_store_goods_sale' AND column_name IN ('discount_money', 'discount_price') ORDER BY column_name; -- 4. 重新加载后抽样验证两列值不同 SELECT discount_money, discount_price, (discount_money = discount_price) AS same FROM dwd.dwd_store_goods_sale WHERE discount_price IS NOT NULL LIMIT 20; ``` --- ## 迁移 3:修正 dwd_assistant_service_log.site_assistant_id 映射源(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__fix_assistant_service_site_assistant_id.sql` ### 变更说明 修正 `dwd.dwd_assistant_service_log.site_assistant_id` 的映射源错误。 | 项目 | 修正前 | 修正后 | |------|--------|--------| | `site_assistant_id` 的 ODS 源 | `order_assistant_id`(订单级助教明细 ID) | `site_assistant_id`(门店维度助教档案 ID) | | `order_assistant_id` 列 | 已存在于 DDL,但被 `site_assistant_id` 的映射占用 | 恢复为同名自动映射 | **DDL 层面无结构变更**(两列均已存在),仅更新列注释以反映正确的 ODS 来源。 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dwd_assistant_service_log"]` 的 `site_assistant_id` 条目从 `("site_assistant_id", "order_assistant_id", None)` 改为 `("site_assistant_id", "site_assistant_id", None)`。 ### 关联需求 `dataflow-field-completion` Requirements 2.1, 2.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以修正历史数据中 `site_assistant_id` 的值 - **后端 API / 小程序**:如有下游查询依赖 `site_assistant_id` 关联助教档案,修正后数据将更准确 - **DWS 层**:依赖 `site_assistant_id` 的汇总表(如助教业绩)需在 DWD 重新加载后重新计算 ### 回滚策略 1. 将 `FACT_MAPPINGS` 中 `site_assistant_id` 的 ODS 源改回 `order_assistant_id` 2. 恢复 `dwd.sql` 中的原始 COMMENT 3. 重新执行 DWD 加载任务 ```sql -- 恢复原始 COMMENT COMMENT ON COLUMN dwd.dwd_assistant_service_log.site_assistant_id IS '【说明】标识类 ID 字段,用于关联/定位相关实体。 【示例】2957788717240005。 【ODS来源】assistant_service_records - order_assistant_id。'; COMMENT ON COLUMN dwd.dwd_assistant_service_log.order_assistant_id IS '【说明】标识类 ID 字段,用于关联/定位相关实体。 【示例】2957788717240005。 【ODS来源】assistant_service_records - order_assistant_id。'; ``` ### 验证 SQL ```sql -- 1. 确认 site_assistant_id 注释已更新为正确的 ODS 来源 SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'site_assistant_id') ); -- 2. 确认 order_assistant_id 注释已更新 SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_service_log' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'order_assistant_id') ); -- 3. 重新加载后抽样验证两列值不同(site_assistant_id 应为档案级 ID) SELECT site_assistant_id, order_assistant_id, (site_assistant_id = order_assistant_id) AS same FROM dwd.dwd_assistant_service_log LIMIT 20; ``` --- ## 迁移 2:ODS "取最新版本"复合索引(2026-02-17) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-17__add_ods_latest_version_indexes.sql` ### 变更说明 为全部 23 张 ODS 表添加 `(业务主键, fetched_at DESC)` 复合索引,支持 `DISTINCT ON (pk) ORDER BY pk, fetched_at DESC` 查询模式高效取每条业务记录的最新版本。 索引命名规范:`idx_ods_{table_name}_latest` | # | 表名 | 业务主键列 | 索引名 | |---|------|-----------|--------| | 1 | assistant_accounts_master | id | idx_ods_assistant_accounts_master_latest | | 2 | settlement_records | id | idx_ods_settlement_records_latest | | 3 | table_fee_transactions | id | idx_ods_table_fee_transactions_latest | | 4 | assistant_service_records | id | idx_ods_assistant_service_records_latest | | 5 | assistant_cancellation_records | id | idx_ods_assistant_cancellation_records_latest | | 6 | store_goods_sales_records | id | idx_ods_store_goods_sales_records_latest | | 7 | payment_transactions | id | idx_ods_payment_transactions_latest | | 8 | refund_transactions | id | idx_ods_refund_transactions_latest | | 9 | platform_coupon_redemption_records | id | idx_ods_platform_coupon_redemption_records_latest | | 10 | member_profiles | id | idx_ods_member_profiles_latest | | 11 | member_stored_value_cards | id | idx_ods_member_stored_value_cards_latest | | 12 | member_balance_changes | id | idx_ods_member_balance_changes_latest | | 13 | recharge_settlements | id | idx_ods_recharge_settlements_latest | | 14 | group_buy_packages | id | idx_ods_group_buy_packages_latest | | 15 | group_buy_redemption_records | id | idx_ods_group_buy_redemption_records_latest | | 16 | goods_stock_summary | siteGoodsId | idx_ods_goods_stock_summary_latest | | 17 | goods_stock_movements | siteGoodsStockId | idx_ods_goods_stock_movements_latest | | 18 | site_tables_master | id | idx_ods_site_tables_master_latest | | 19 | stock_goods_category_tree | id | idx_ods_stock_goods_category_tree_latest | | 20 | store_goods_master | id | idx_ods_store_goods_master_latest | | 21 | table_fee_discount_records | id | idx_ods_table_fee_discount_records_latest | | 22 | tenant_goods_master | id | idx_ods_tenant_goods_master_latest | | ~~23~~ | ~~settlement_ticket_details~~ | ~~orderSettleId~~ | ~~idx_ods_settlement_ticket_details_latest~~(已由迁移 10 移除) | ### 关联需求 `ods-dedup-standardize` Requirements 6.1, 6.2, 6.3 ### 兼容性 - **非破坏性变更**:仅新增索引,不修改表结构、不影响现有数据和写入逻辑 - **ETL Connector**:无需改动;索引自动加速 `skip_unchanged` 去重查询和 `_mark_missing_as_deleted` 快照对比查询 - **后端 API / 小程序**:不受影响(不直接查询 ODS 层) - **`CREATE INDEX CONCURRENTLY`**:在线创建,不阻塞表的读写操作;但不能在事务块内执行,需逐条运行或使用支持单语句模式的工具 - **DDL 源文件**:索引定义已同步写入 `db/etl_feiqiu/schemas/ods.sql`,新环境初始化时自动创建 ### 回滚策略 逐条删除索引即可,不影响数据: ```sql DROP INDEX IF EXISTS ods.idx_ods_assistant_accounts_master_latest; DROP INDEX IF EXISTS ods.idx_ods_settlement_records_latest; DROP INDEX IF EXISTS ods.idx_ods_table_fee_transactions_latest; DROP INDEX IF EXISTS ods.idx_ods_assistant_service_records_latest; DROP INDEX IF EXISTS ods.idx_ods_assistant_cancellation_records_latest; DROP INDEX IF EXISTS ods.idx_ods_store_goods_sales_records_latest; DROP INDEX IF EXISTS ods.idx_ods_payment_transactions_latest; DROP INDEX IF EXISTS ods.idx_ods_refund_transactions_latest; DROP INDEX IF EXISTS ods.idx_ods_platform_coupon_redemption_records_latest; DROP INDEX IF EXISTS ods.idx_ods_member_profiles_latest; DROP INDEX IF EXISTS ods.idx_ods_member_stored_value_cards_latest; DROP INDEX IF EXISTS ods.idx_ods_member_balance_changes_latest; DROP INDEX IF EXISTS ods.idx_ods_recharge_settlements_latest; DROP INDEX IF EXISTS ods.idx_ods_group_buy_packages_latest; DROP INDEX IF EXISTS ods.idx_ods_group_buy_redemption_records_latest; DROP INDEX IF EXISTS ods.idx_ods_goods_stock_summary_latest; DROP INDEX IF EXISTS ods.idx_ods_goods_stock_movements_latest; DROP INDEX IF EXISTS ods.idx_ods_site_tables_master_latest; DROP INDEX IF EXISTS ods.idx_ods_stock_goods_category_tree_latest; DROP INDEX IF EXISTS ods.idx_ods_store_goods_master_latest; DROP INDEX IF EXISTS ods.idx_ods_table_fee_discount_records_latest; DROP INDEX IF EXISTS ods.idx_ods_tenant_goods_master_latest; -- idx_ods_settlement_ticket_details_latest 已由迁移 10 随表一起移除 ``` ### 验证 SQL ```sql -- 1. 验证 23 个索引均已创建 SELECT indexname, tablename FROM pg_indexes WHERE schemaname = 'ods' AND indexname LIKE 'idx_ods_%_latest' ORDER BY indexname; -- 2. 验证索引数量为 23 SELECT COUNT(*) AS index_count FROM pg_indexes WHERE schemaname = 'ods' AND indexname LIKE 'idx_ods_%_latest'; -- 3. 验证索引列定义正确(以 member_profiles 为例) SELECT indexdef FROM pg_indexes WHERE schemaname = 'ods' AND indexname = 'idx_ods_member_profiles_latest'; -- 4. 验证索引可用(非 INVALID 状态) SELECT c.relname AS index_name, i.indisvalid FROM pg_index i JOIN pg_class c ON c.oid = i.indexrelid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = 'ods' AND c.relname LIKE 'idx_ods_%_latest' AND i.indisvalid = false; -- 预期结果:0 行(无无效索引) ``` --- ## 迁移 1:六层 Schema 架构重组 ### 变更说明 将现有 4 个 schema 重组为 6 层 schema 架构: | 原 Schema | 新 Schema | 文件 | 说明 | |-----------|-----------|------|------| | etl_admin | meta | meta.sql | 调度、游标、运行记录(3 表) | | billiards_ods | ods | ods.sql | ODS 原始数据(23 表) | | billiards_dwd | dwd | dwd.sql | DWD 明细,保留 main+EX 拆分(40 表) | | (新增) | core | core.sql | 统一维度/事实最小字段集(7 表) | | billiards_dws | dws | dws.sql | DWS 汇总(29 表) | | (新增) | app | app.sql | 视图+RLS(7 视图,6 策略) | ### 新增表(core schema) - core.dim_site — 门店维度核心字段 - core.dim_member — 会员维度核心字段 - core.dim_assistant — 助教维度核心字段 - core.dim_table — 台桌维度核心字段 - core.dim_goods_category — 商品分类维度核心字段 - core.fact_settlement — 结算事实核心字段 - core.fact_payment — 支付事实核心字段 ### 新增视图(app schema) - pp.v_site — 门店视图 - pp.v_member — 会员视图 - pp.v_assistant — 助教视图 - pp.v_assistant_daily — 助教日明细视图 - pp.v_finance_daily — 财务日报视图 - pp.v_member_consumption — 会员消费汇总视图 - pp.v_order_summary — 订单汇总视图 ### RLS 策略 - 所有 core 表启用 ROW LEVEL SECURITY - 策略基于 current_setting('app.current_site_id')::bigint 过滤 - 角色 pp_reader 仅有 SELECT 权限 ## 兼容性 - **ETL Connector**:所有代码中的 schema 引用已更新完成(etl_admin → meta, billiards_ods → ods, billiards_dwd → dwd, billiards_dws → dws) - **后端 API**:etl_status 路由已更新为 meta.etl_cursor;通过 app schema 视图访问,无需直接引用底层表 - **管理后台**:已由 `apps/admin-web/` Web 管理后台完全替代原 PySide6 桌面 GUI,通过后端 API 访问数据 - **小程序**:通过 FDW 映射 app schema,不受影响 ## 回滚策略 1. 删除新 schema:DROP SCHEMA IF EXISTS meta, ods, dwd, core, dws, app CASCADE; 2. 重建原 schema:执行原始 schema_etl_admin.sql、schema_ODS_doc.sql、schema_dwd_doc.sql、schema_dws.sql 3. 原始 DDL 文件保留在 db/etl_feiqiu/schemas/schema_*.sql 作为参考 ## 验证 SQL `sql -- 1. 验证六个 schema 均已创建 SELECT schema_name FROM information_schema.schemata WHERE schema_name IN ('meta', 'ods', 'dwd', 'core', 'dws', 'app') ORDER BY schema_name; -- 2. 验证各 schema 表数量 SELECT table_schema, COUNT(*) AS table_count FROM information_schema.tables WHERE table_schema IN ('meta', 'ods', 'dwd', 'core', 'dws', 'app') GROUP BY table_schema ORDER BY table_schema; -- 3. 验证 RLS 策略已启用 SELECT schemaname, tablename, rowsecurity FROM pg_tables WHERE schemaname = 'core' AND rowsecurity = true; -- 4. 验证 app_reader 角色存在 SELECT rolname FROM pg_roles WHERE rolname = 'app_reader'; ` ## 迁移 11:新建 dwd.dwd_goods_stock_summary 表(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__create_dwd_goods_stock_summary.sql` ### 变更说明 新建 `dwd.dwd_goods_stock_summary` 表,用于存储库存汇总快照数据。 | 项目 | 说明 | |------|------| | 新建表 | `dwd.dwd_goods_stock_summary`(14 个业务字段 + site_id/tenant_id/fetched_at) | | 主键 | `(site_goods_id, fetched_at)`,支持同一商品多时间窗口快照 | | ODS 配置变更 | `ODS_INVENTORY_STOCK`:`requires_window=True` + `time_fields=("startTime", "endTime")` | | 代码变更 | `TABLE_MAP` 和 `FACT_MAPPINGS` 中注册新表,14 个 ODS 驼峰→DWD 蛇形映射 | **新增字段**: | DWD 列 | 类型 | ODS 源列 | 业务含义 | |--------|------|---------|---------| | `site_goods_id` | BIGINT (PK) | `"siteGoodsId"` | 门店商品 ID | | `goods_name` | TEXT | `"goodsName"` | 商品名称 | | `goods_unit` | TEXT | `"goodsUnit"` | 计量单位 | | `goods_category_id` | BIGINT | `"goodsCategoryId"` | 一级分类 ID | | `goods_category_second_id` | BIGINT | `"goodsCategorySecondId"` | 二级分类 ID | | `category_name` | TEXT | `"categoryName"` | 分类名称 | | `range_start_stock` | NUMERIC | `"rangeStartStock"` | 期初库存 | | `range_end_stock` | NUMERIC | `"rangeEndStock"` | 期末库存 | | `range_in` | NUMERIC | `"rangeIn"` | 入库数量 | | `range_out` | NUMERIC | `"rangeOut"` | 出库数量 | | `range_sale` | NUMERIC | `"rangeSale"` | 销售数量 | | `range_sale_money` | NUMERIC(12,2) | `"rangeSaleMoney"` | 销售金额 | | `range_inventory` | NUMERIC | `"rangeInventory"` | 盘点调整量 | | `current_stock` | NUMERIC | `"currentStock"` | 当前库存 | ### 关联需求 `dataflow-field-completion` Requirements 7.1, 7.2, 7.3, 7.4 ### 兼容性 - **ETL Connector**:新增表,不影响现有流程;ODS 配置变更后需重新采集历史数据(按时间窗口分批) - **后端 API / 小程序**:无影响,新表需下游显式引用 - **DWS 层**:此表为 DWS 库存汇总(迁移 13)的数据源 ### 回滚策略 ```sql DROP TABLE IF EXISTS dwd.dwd_goods_stock_summary; -- 恢复 ODS 配置:requires_window=False,移除 time_fields -- 恢复 TABLE_MAP 和 FACT_MAPPINGS:移除 dwd.dwd_goods_stock_summary 条目 ``` ### 验证 SQL ```sql -- 1. 确认表已创建 SELECT table_name FROM information_schema.tables WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_summary'; -- 2. 确认列数量正确(17 列:14 业务 + site_id + tenant_id + fetched_at) SELECT count(*) FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_summary'; -- 3. 确认主键约束 SELECT constraint_name, column_name FROM information_schema.key_column_usage WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_summary' ORDER BY ordinal_position; -- 4. 确认 ODS 源表存在 SELECT table_name FROM information_schema.tables WHERE table_schema = 'ods' AND table_name = 'goods_stock_summary'; ``` --- ## 迁移 12:新建 dwd.dwd_goods_stock_movement 表(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__create_dwd_goods_stock_movement.sql` ### 变更说明 新建 `dwd.dwd_goods_stock_movement` 表,用于存储库存变动流水明细。 | 项目 | 说明 | |------|------| | 新建表 | `dwd.dwd_goods_stock_movement`(19 个业务字段 + fetched_at) | | 主键 | `site_goods_stock_id`(库存变动记录 ID) | | 加载模式 | 事实表,按 `create_time` 增量加载 | | 代码变更 | `TABLE_MAP` 和 `FACT_MAPPINGS` 中注册新表,19 个 ODS 驼峰→DWD 蛇形映射 | **新增字段**: | DWD 列 | 类型 | ODS 源列 | 业务含义 | |--------|------|---------|---------| | `site_goods_stock_id` | BIGINT (PK) | `"siteGoodsStockId"` | 库存变动记录 ID | | `tenant_id` | BIGINT | `"tenantId"` | 租户 ID | | `site_id` | BIGINT | `"siteId"` | 门店 ID | | `site_goods_id` | BIGINT | `"siteGoodsId"` | 门店商品 ID | | `goods_name` | TEXT | `"goodsName"` | 商品名称 | | `goods_category_id` | BIGINT | `"goodsCategoryId"` | 一级分类 ID | | `goods_second_category_id` | BIGINT | `"goodsSecondCategoryId"` | 二级分类 ID | | `unit` | TEXT | `unit` | 计量单位 | | `price` | NUMERIC(12,2) | `price` | 商品单价 | | `stock_type` | INTEGER | `"stockType"` | 库存变动类型枚举 | | `change_num` | NUMERIC | `"changeNum"` | 变动数量 | | `start_num` | NUMERIC | `"startNum"` | 变动前库存 | | `end_num` | NUMERIC | `"endNum"` | 变动后库存 | | `change_num_a` | NUMERIC | `"changeNumA"` | 辅助单位变动量 | | `start_num_a` | NUMERIC | `"startNumA"` | 辅助单位变动前库存 | | `end_num_a` | NUMERIC | `"endNumA"` | 辅助单位变动后库存 | | `remark` | TEXT | `remark` | 备注 | | `operator_name` | TEXT | `"operatorName"` | 操作人 | | `create_time` | TIMESTAMPTZ | `"createTime"` | 变动时间 | ### 关联需求 `dataflow-field-completion` Requirements 8.1, 8.2, 8.3, 8.4 ### 兼容性 - **ETL Connector**:新增表,不影响现有流程;ODS 源表 `ods.goods_stock_movements` 已存在且有数据 - **后端 API / 小程序**:无影响,新表需下游显式引用 - **DWS 层**:当前无 DWS 任务引用此表 ### 回滚策略 ```sql DROP TABLE IF EXISTS dwd.dwd_goods_stock_movement; -- 恢复 TABLE_MAP 和 FACT_MAPPINGS:移除 dwd.dwd_goods_stock_movement 条目 ``` ### 验证 SQL ```sql -- 1. 确认表已创建 SELECT table_name FROM information_schema.tables WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_movement'; -- 2. 确认列数量正确(20 列:19 业务 + fetched_at) SELECT count(*) FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_movement'; -- 3. 确认主键约束 SELECT constraint_name, column_name FROM information_schema.key_column_usage WHERE table_schema = 'dwd' AND table_name = 'dwd_goods_stock_movement'; -- 4. 确认 ODS 源表存在且有数据 SELECT count(*) FROM ods.goods_stock_movements; ``` --- ## 迁移 13:新建 DWS 库存汇总表——日/周/月三粒度(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__create_dws_goods_stock_summary.sql` ### 变更说明 在 DWS 层新建三张库存汇总表,基于 `dwd.dwd_goods_stock_summary`(迁移 11)数据,按日/周/月三个粒度聚合库存变动指标。 | 新建表 | 粒度 | stat_period 默认值 | 任务代码 | |--------|------|-------------------|---------| | `dws.dws_goods_stock_daily_summary` | 日 | `'daily'` | `DWS_GOODS_STOCK_DAILY` | | `dws.dws_goods_stock_weekly_summary` | 周(ISO 周,stat_date = 周一) | `'weekly'` | `DWS_GOODS_STOCK_WEEKLY` | | `dws.dws_goods_stock_monthly_summary` | 月(自然月,stat_date = 月首) | `'monthly'` | `DWS_GOODS_STOCK_MONTHLY` | 三张表结构完全相同,主键均为 `(site_id, stat_date, site_goods_id)`。 **字段定义**(三张表共用): | 列名 | 类型 | 说明 | |------|------|------| | `site_id` | BIGINT NOT NULL | 门店 ID(PK 组成) | | `tenant_id` | BIGINT | 租户 ID | | `stat_date` | DATE NOT NULL | 统计日期(PK 组成):日=当天、周=周一、月=月首 | | `site_goods_id` | BIGINT NOT NULL | 门店商品 ID(PK 组成) | | `goods_name` | TEXT | 商品名称 | | `goods_unit` | TEXT | 计量单位 | | `goods_category_id` | BIGINT | 一级分类 ID | | `goods_category_second_id` | BIGINT | 二级分类 ID | | `category_name` | TEXT | 分类名称 | | `range_start_stock` | NUMERIC | 期初库存 | | `range_end_stock` | NUMERIC | 期末库存 | | `range_in` | NUMERIC | 入库数量 | | `range_out` | NUMERIC | 出库数量 | | `range_sale` | NUMERIC | 销售数量 | | `range_sale_money` | NUMERIC(12,2) | 销售金额 | | `range_inventory` | NUMERIC | 盘点调整量 | | `current_stock` | NUMERIC | 当前库存(取期末值) | | `stat_period` | TEXT NOT NULL | 汇总粒度标识 | | `created_at` | TIMESTAMPTZ NOT NULL | 记录创建时间 | | `updated_at` | TIMESTAMPTZ NOT NULL | 记录更新时间 | **索引**(每张表 3 个辅助索引): | 索引名模式 | 列 | 用途 | |-----------|------|------| | `idx_dws_goods_stock_{period}_date` | `stat_date` | 按日期范围查询 | | `idx_dws_goods_stock_{period}_goods` | `(site_goods_id, stat_date)` | 按商品查询趋势 | | `idx_dws_goods_stock_{period}_site` | `(site_id, stat_date)` | 按门店查询汇总 | **任务代码**: - `apps/etl/connectors/feiqiu/tasks/dws/goods_stock_daily_task.py` - `apps/etl/connectors/feiqiu/tasks/dws/goods_stock_weekly_task.py` - `apps/etl/connectors/feiqiu/tasks/dws/goods_stock_monthly_task.py` 任务继承 `BaseDwsTask`,实现 extract → transform → load 三阶段,load 使用 upsert(ON CONFLICT DO UPDATE)。 ### 关联需求 `dataflow-field-completion` Requirements 12.2, 12.3, 12.4, 12.5, 12.6, 12.7, 12.8, 12.9 ### 兼容性 - **ETL Connector**:新增 3 个 DWS 任务,依赖 `dwd.dwd_goods_stock_summary` 已有数据;不影响现有 DWS 任务 - **后端 API / 小程序**:无影响,新表需下游显式引用 - **DWD 层**:只读引用 `dwd.dwd_goods_stock_summary`,不修改 DWD 数据 ### 回滚策略 ```sql DROP TABLE IF EXISTS dws.dws_goods_stock_daily_summary CASCADE; DROP TABLE IF EXISTS dws.dws_goods_stock_weekly_summary CASCADE; DROP TABLE IF EXISTS dws.dws_goods_stock_monthly_summary CASCADE; -- 从任务注册中移除 DWS_GOODS_STOCK_DAILY / DWS_GOODS_STOCK_WEEKLY / DWS_GOODS_STOCK_MONTHLY ``` ### 验证 SQL ```sql -- 1. 确认三张表已创建 SELECT table_name FROM information_schema.tables WHERE table_schema = 'dws' AND table_name LIKE 'dws_goods_stock_%_summary' ORDER BY table_name; -- 预期:3 行(daily, monthly, weekly) -- 2. 确认每张表列数量正确(20 列) SELECT table_name, count(*) AS col_count FROM information_schema.columns WHERE table_schema = 'dws' AND table_name LIKE 'dws_goods_stock_%_summary' GROUP BY table_name ORDER BY table_name; -- 3. 确认主键约束(每张表 3 列主键) SELECT conrelid::regclass AS table_name, array_agg(a.attname ORDER BY k.n) AS pk_columns FROM pg_constraint c CROSS JOIN LATERAL unnest(c.conkey) WITH ORDINALITY AS k(col, n) JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = k.col WHERE c.contype = 'p' AND c.conrelid::regclass::text LIKE 'dws.dws_goods_stock_%_summary' GROUP BY c.conrelid; -- 4. 确认辅助索引已创建(每张表 3 个,共 9 个) SELECT indexname, tablename FROM pg_indexes WHERE schemaname = 'dws' AND indexname LIKE 'idx_dws_goods_stock_%' ORDER BY tablename, indexname; ``` --- ## 代码变更记录 1:recharge_settlements FACT_MAPPINGS 补充(2026-02-20) ### 变更说明 为 `dwd.dwd_recharge_order` 补充 5 个 FACT_MAPPINGS 条目。这 5 个字段的 DWD 列已存在,ODS 源列也已存在,但此前缺少显式映射配置,导致数据未能从 ODS 流转到 DWD。 **注意**:此变更无 DDL 结构变更,无迁移脚本,仅修改 ETL 代码中的 `FACT_MAPPINGS` 配置。 | DWD 列 | ODS 源列 | 类型 | 业务含义 | 数据现状 | |--------|---------|------|---------|---------| | `pl_coupon_sale_amount` | `plcouponsaleamount` | NUMERIC(18,2) | 平台券销售金额 | ODS/DWD 两侧数据全为 0(业务未启用) | | `mervou_sales_amount` | `mervousalesamount` | NUMERIC(18,2) | 储值券销售金额 | 同上 | | `electricity_money` | `electricitymoney` | NUMERIC(18,2) | 电费金额 | 同上 | | `real_electricity_money` | `realelectricitymoney` | NUMERIC(18,2) | 实际电费金额 | 同上 | | `electricity_adjust_money` | `electricityadjustmoney` | NUMERIC(18,2) | 电费调整金额 | 同上 | **代码变更位置**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` → `FACT_MAPPINGS["dwd.dwd_recharge_order"]` ### 关联需求 `dataflow-field-completion` Requirements 6.1, 6.2, 6.3, 6.4 ### 兼容性 - **ETL Connector**:重新执行 `DWD_LOAD_FROM_ODS` 后,这 5 列将从 ODS 正确映射到 DWD(当前数据全为 0,无实际影响) - **后端 API / 小程序**:无影响 - **DWS 层**:当前无 DWS 任务引用这 5 列 --- ## 代码变更记录 2:assistant_cancellation_records assistanton 映射确认(2026-02-20) ### 变更说明 确认 `dwd.dwd_assistant_trash_event` 中 `assistant_no` 字段的 FACT_MAPPINGS 映射 `("assistant_no", "assistanton", None)` 已正确配置。ODS 列 `assistanton`(助教编号/工号)映射到 DWD 列 `assistant_no`。 **注意**:此条目在代码中已存在,本次仅为排查确认,无代码变更,无 DDL 变更。 **代码位置**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` → `FACT_MAPPINGS["dwd.dwd_assistant_trash_event"]` ### 关联需求 `dataflow-field-completion` Requirements 3.1, 3.3 --- ## 迁移 14:为 dim_store_goods_ex 新增 batch_stock_quantity 字段(2026-02-20) ### 迁移文件 `db/etl_feiqiu/migrations/2026-02-20__add_dim_store_goods_ex_batch_stock_quantity.sql` ### 变更说明 为 `dwd.dim_store_goods_ex` 新增 1 个字段,映射自 ODS `store_goods_master` 中的 `batch_stock` 列。 | 新增列 | 类型 | 业务含义 | |--------|------|---------| | `batch_stock_quantity` | INTEGER | 批次库存数量,记录商品按批次管理的库存量 | **DDL 结构变更**:`ALTER TABLE ... ADD COLUMN` × 1 **代码变更**:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 中 `FACT_MAPPINGS["dwd.dim_store_goods_ex"]` 新增 1 个映射条目: - `("batch_stock_quantity", "batch_stock", None)`(ODS 列名 `batch_stock`,DWD 列名 `batch_stock_quantity`) ### 关联需求 `dataflow-field-completion` Requirements 10.1, 10.2 ### 兼容性 - **ETL Connector**:需重新执行 `DWD_LOAD_FROM_ODS` 任务以填充新列数据;ODS 源列已存在 - **后端 API / 小程序**:新增列不影响现有查询 - **DWS 层**:当前无 DWS 任务引用此列,无影响 ### 回滚策略 ```sql BEGIN; ALTER TABLE dwd.dim_store_goods_ex DROP COLUMN IF EXISTS batch_stock_quantity; COMMIT; ``` ### 验证 SQL ```sql -- 1. 确认新列已创建且类型正确 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dim_store_goods_ex' AND column_name = 'batch_stock_quantity'; -- 2. 确认列注释已添加 SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')), (SELECT attnum FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'dim_store_goods_ex' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'dwd')) AND attname = 'batch_stock_quantity') ) AS batch_stock_quantity_comment; -- 3. 确认 ODS 源表中 batch_stock 列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'ods' AND table_name = 'store_goods_master' AND column_name = 'batch_stock'; ``` --- ## DDL 文件同步记录(2026-02-21) ### 变更说明 将迁移 6–14 的所有结构变更同步到 DDL 源文件,确保 `dwd.sql` / `schema_dwd_doc.sql` / `dws.sql` / `schema_dws.sql` 与数据库实际结构一致。 ### 同步清单 | DDL 文件 | 表 | 变更内容 | 状态 | |----------|------|---------|------| | `dwd.sql` | `dim_assistant_ex` | +4 列(system_role_id, job_num, cx_unit_price, pd_unit_price) | 已有(之前已更新) | | `dwd.sql` | `dwd_assistant_service_log_ex` | +2 列(operator_id, operator_name) | 已有(之前已更新) | | `dwd.sql` | `dwd_member_balance_change_ex` | +1 列(relate_id) | 已有(之前已更新) | | `dwd.sql` | `dwd_store_goods_sale` | discount_price→discount_money 重命名 + 新增 discount_price | 已有(之前已更新) | | `dwd.sql` | `dim_store_goods_ex` | +1 列(batch_stock_quantity) | 本次补充 | | `dwd.sql` | `dwd_goods_stock_summary` | 新建表(14 列) | 已有(之前已更新) | | `dwd.sql` | `dwd_goods_stock_movement` | 新建表(19 列) | 已有(之前已更新) | | `schema_dwd_doc.sql` | `dim_assistant_ex` | +4 列 | 已有(sync 脚本已处理) | | `schema_dwd_doc.sql` | `dim_table_ex` | +14 列 | 已有(sync 脚本已处理) | | `schema_dwd_doc.sql` | `dwd_assistant_service_log_ex` | +2 列(operator_id, operator_name) | 本次补充 | | `schema_dwd_doc.sql` | `dwd_member_balance_change_ex` | +1 列(relate_id) | 本次补充 | | `schema_dwd_doc.sql` | `dwd_store_goods_sale` | +1 列(discount_money),修正 COMMENT | 本次补充 | | `schema_dwd_doc.sql` | `dim_store_goods_ex` | +1 列(batch_stock_quantity) | 本次补充 | | `schema_dwd_doc.sql` | `dwd_goods_stock_summary` | 新建表 | 已有(sync 脚本已处理) | | `schema_dwd_doc.sql` | `dwd_goods_stock_movement` | 新建表 | 已有(sync 脚本已处理) | | `dws.sql` | 3 张库存汇总表 | 新建表 | 已有(之前已更新) | | `schema_dws.sql` | 3 张库存汇总表 | 新建表 | 已有(sync 脚本已处理) | ### 未同步项(待确认) | 表 | 字段 | 说明 | |------|------|------| | _(无)_ | — | 所有已知字段变更均已同步 | --- ## 迁移 15:dwd_assistant_trash_event_ex 新增 assistant_no_int 列 - 日期:2026-02-20 - 脚本:`db/etl_feiqiu/migrations/2026-02-20__add_assistant_trash_event_ex_assistant_no_int.sql` - 执行目标:测试库 `test_etl_feiqiu`(2026-02-21 已执行) ### 变更说明 | 表 | 变更类型 | 字段 | 类型 | 说明 | |------|---------|------|------|------| | `dwd.dwd_assistant_trash_event_ex` | ADD COLUMN | `assistant_no_int` | `INTEGER` | 助教编号整数形式,与主表 `assistant_no`(VARCHAR)同源自 ODS `assistanton`,便于数值比较和关联 | ### 兼容性 - ETL:`dwd_load_task.py` FACT_MAPPINGS 已同步添加 `("assistant_no_int", "assistanton", None)` 映射 - 后端 API:无直接影响(当前未查询 `_ex` 表此字段) - 小程序:无影响 - 新增列允许 NULL,不影响现有数据写入 ### 回滚策略 ```sql ALTER TABLE dwd.dwd_assistant_trash_event_ex DROP COLUMN IF EXISTS assistant_no_int; ``` ### DDL 同步 | DDL 文件 | 变更 | 状态 | |----------|------|------| | `dwd.sql` | `dwd_assistant_trash_event_ex` +1 列(assistant_no_int)+ COMMENT | ✅ 已同步 | | `schema_dwd_doc.sql` | `dwd_assistant_trash_event_ex` +1 列(assistant_no_int)+ COMMENT | ✅ 已同步 | ### 验证 SQL ```sql -- 1. 确认列存在 SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_assistant_trash_event_ex' AND column_name = 'assistant_no_int'; -- 2. 确认 COMMENT SELECT col_description( (SELECT oid FROM pg_class WHERE relname = 'dwd_assistant_trash_event_ex'), (SELECT ordinal_position FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_assistant_trash_event_ex' AND column_name = 'assistant_no_int') ); -- 3. 确认表完整列清单 SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = 'dwd' AND table_name = 'dwd_assistant_trash_event_ex' ORDER BY ordinal_position; ```