7.2 KiB
7.2 KiB
- [P20260218-110820] 2026-02-18 11:08:20 +0800
- summary: CONTEXT TRANSFER: We are continuing a conversation that had gotten too long. Here is a summary: --- ## TASK 1: ODS 层 Ear…
- prompt:
CONTEXT TRANSFER: We are continuing a conversation that had gotten too long. Here is a summary:
---
## TASK 1: ODS 层 Early-Cutoff 软删除保护机制
**STATUS**: done
**USER QUERIES**: 1 (问题1:软删除参数分析), 2 (确认实现方向), 3-5 (代码实现过程)
**DETAILS**:
- 用户希望在 `SnapshotMode.WINDOW` 模式下,当 API 返回数据被截断时(如请求 01-01~02-18 但只返回 02-01 起的数据),不要误删截断点之前的数据
- 新增配置参数 `run.snapshot_protect_early_cutoff`(默认 `True`)
- 新增方法 `_collect_earliest_time()` 从 API 返回记录中提取 `snapshot_time_column` 的最小值
- 在 `execute()` 中收集每个 segment 的实际最早时间戳,将 `_mark_missing_as_deleted` 的 `window_start` 收窄至 `max(seg_start, actual_earliest)`
- 单元测试通过(456 passed,唯一失败的 `test_gen_audit_dashboard.py` 是已有问题,与本次改动无关)
- 架构文档已更新,新增附录 A 描述软删除机制和 early-cutoff 保护
**FILEPATHS**:
- `apps/etl/pipelines/feiqiu/config/defaults.py`
- `apps/etl/pipelines/feiqiu/tasks/ods/ods_tasks.py`
- `docs/etl-feiqiu-architecture.md`
---
## TASK 2: 跨层强制全量更新参数 `run.force_full_update`
**STATUS**: in-progress
**USER QUERIES**: 1 (问题2初始讨论), 2 (确认需求), 6 (明确要求同时控制 ODS 和 DWD)
**DETAILS**:
- 用户要求一个统一参数 `run.force_full_update`(默认 `false`),同时控制 ODS 和 DWD 两层
- 当开启时:ODS 层跳过 content_hash 去重 + 去掉 WHERE IS DISTINCT FROM 条件;DWD 层跳过维度表逐列对比 + 去掉事实表 WHERE IS DISTINCT FROM 条件
- 用途:表结构变化后(新增字段),强制全量刷新所有数据
**已完成的修改**:
1. `config/defaults.py`: 已添加 `"force_full_update": False` 到 `run` 配置节
2. `tasks/ods/ods_tasks.py` - `_insert_records_schema_aware()`:
- 已添加 `force_full_update` 读取,`compare_latest` 条件中加入 `and not force_full_update`(跳过 hash 去重)
- 已修改 `ON CONFLICT ... WHERE IS DISTINCT FROM` 条件,`force_full_update` 时去掉 WHERE 子句
**未完成的修改**:
3. `tasks/dwd/dwd_load_task.py` - `_merge_dim_scd2()` (Line ~876):
- 需要在 `_is_row_changed` 判断处加入 `force_full_update` 逻辑
- 当前代码: `if current and not self._is_row_changed(current, incoming, dwd_cols): continue`
- 需改为: 当 `force_full_update=True` 时跳过此判断,视所有已存在的行为"已变更",强制关闭旧版+插入新版
4. `tasks/dwd/dwd_load_task.py` - `_merge_fact_increment()` (Line ~1200):
- 当前代码在 `snapshot_mode or fact_upsert` 时生成 `WHERE ... IS DISTINCT FROM` 条件
- 需改为: 当 `force_full_update=True` 时去掉 WHERE 子句,无条件 DO UPDATE SET
5. 文档更新: `docs/etl-feiqiu-architecture.md` 需要在附录中补充 `force_full_update` 参数说明
6. 语法检查 + 单元测试验证
**NEXT STEPS**:
- 修改 `tasks/dwd/dwd_load_task.py` 中 `_merge_dim_scd2` 方法(约 Line 876),在 `_is_row_changed` 判断前读取 `force_full_update` 配置,为 True 时跳过变更检测
- 修改 `tasks/dwd/dwd_load_task.py` 中 `_merge_fact_increment` 方法(约 Line 1200),在构建 ON CONFLICT 的 WHERE 子句时,`force_full_update` 为 True 则不加 WHERE IS DISTINCT FROM
- 更新 `docs/etl-feiqiu-architecture.md` 附录,补充 `run.force_full_update` 参数文档
- 运行语法检查和单元测试
**FILEPATHS**:
- `apps/etl/pipelines/feiqiu/config/defaults.py`
- `apps/etl/pipelines/feiqiu/tasks/ods/ods_tasks.py`
- `apps/etl/pipelines/feiqiu/tasks/dwd/dwd_load_task.py`
- `docs/etl-feiqiu-architecture.md`
---
## USER CORRECTIONS AND INSTRUCTIONS:
- 所有说明性文字使用简体中文
- 代码注释写"为什么/边界/假设",避免复述代码
- 改动命中高风险路径(`config/`、`tasks/`),完成后需提醒用户运行 `/audit`
- 脚本执行优先用 Python 脚本,避免 PowerShell 复杂逻辑
- 单元测试命令: `cd apps/etl/pipelines/feiqiu && pytest tests/unit`
- 语法检查命令: `python -c "import ast; ast.parse(open(r'C:\NeoZQYY\...', encoding='utf-8').read()); print('OK')"`
- `test_gen_audit_dashboard.py::TestRealAuditFiles::test_parses_all_real_files` 是已有失败,与本次改动无关
## Files to read
- `apps/etl/pipelines/feiqiu/tasks/dwd/dwd_load_task.py`
- `apps/etl/pipelines/feiqiu/tasks/ods/ods_tasks.py`
- `apps/etl/pipelines/feiqiu/config/defaults.py`
- `docs/etl-feiqiu-architecture.md`
USER QUERIES(most recent first):
1. 这个参数应该同时控制API - ODS - DWD。对hash校验没有差别时也更新数据。
2. Output:
Command executed successfully with no output.
Exit Code: 0
3. Output:
Command executed successfully with no output.
Exit Code: 0
4. 这个参数应该同时控制API - ODS - DWD。对hash校验没有差别时也更新数据。
5. Output:
Command executed successfully with no output.
Exit Code: 0
6. Output:
Command executed successfully with no output.
Exit Code: 0
7. Output:
Command executed successfully with no output.
Exit Code: 0
8. 问题1 :- 仅对 SnapshotMode.WINDOW 有意义,FULL_TABLE 模式不涉及时间窗口。对的,符合我的预期。- 需要考虑 snapshot_time_column 可能为 None 或记录中该字段缺失的情况这种情况现在已经处理了, 不会触发软删除,我说的保险机制是返回数据了,但被过期截断的状态。- 建议加一个配置开关(如 run.snapshot_protect_early_cutoff,默认 True),方便紧急关闭可以加入这个参数。修改吧,改完代码记得落到各种文档中。----------------------------------问题2:API→ODS 层:需要有个全量校验,当API和ODS的表结构发生变化,需要做新字段/全字段的全量的更新。ODS - DWD:情况相同,但我记得除了Hash校验外,还有一个校验,专门应对这个情况。
9. Output:
Command executed successfully with no output.
Exit Code: 0
10. Output:
Command executed successfully with no output.
Exit Code: 0
11. Output:
Command executed successfully with no output.
Exit Code: 0
12. 在 ETL feiqiu连接器下,几个修改意见,帮我分析,是否科学合理:1 软删除参数API - ODS - DWD 是否都适用?API - ODS层的软删除,我希望能加入一个机制,即API返回的json,时间窗口中,最早的数据以前的数据不会被软删除标记,举例:我申请2026-01-01 到 2026-02-18 的数据,返回内容中,最早的是2026-02-01的数据,则2026-02-01之前的数据不会标记软删除。防止API返回的过早数据被切断的问题。2 现在ODS - DWD的处理流程,有个参数是控制HASH校验或全量校验对吧。API - ODS 及 DWD - DWS/INDEX 也有这个参数么?要不要将此参数放在公共参数中?
---
METADATA:
The previous conversation had 6 messages.
INSTRUCTIONS:
Continue working until the user query has been fully addressed. Do not ask for clarification - proceed with the work based on the context provided.
IMPORTANT: you need to read from the files to Read section