Files
Neo-ZQYY/docs/etl-feiqiu-architecture.md

744 lines
32 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 飞球 ETL 连接器架构文档
> 生成日期2026-02-18 | 基于代码实际阅读
---
## 目录
1. [DWD → DWS/INDEX 数据流](#1-dwd--dwsindex-数据流)
2. [DWS 层任务详解](#2-dws-层任务详解)
3. [INDEX 层指数算法任务详解](#3-index-层指数算法任务详解)
4. [任务调度与编排系统](#4-任务调度与编排系统)
5. [CLI 入口与参数](#5-cli-入口与参数)
---
## 1. DWD → DWS/INDEX 数据流
### 1.1 整体分层
```
上游 SaaS API
┌─────────┐
│ ODS │ 原始数据落地(保留源 payload
└────┬────┘
│ DWD_LOAD_FROM_ODS
┌─────────┐
│ DWD │ 明细层(维度 SCD2事实按时间增量
└────┬────┘
├──────────────────────────┐
▼ ▼
┌─────────┐ ┌───────────┐
│ DWS │ 汇总层 │ INDEX │ 指数算法层
└─────────┘ └───────────┘
```
### 1.2 类继承体系
```
BaseTask # 提供 E/T/L 模板方法 + 窗口分段
├── ODS 任务(略)
├── DwdLoadTask # DWD 装载
└── BaseDwsTask # DWS 层基类(提供默认 extract/load 模板方法)
│ # 子类声明 DATE_COL + 实现 _do_extract()/transform()
├── AssistantDailyTask # 助教日度
├── AssistantMonthlyTask # 助教月度
├── ...(其他 DWS 任务)
├── FinanceBaseTask # 财务任务共享基类(共享提取方法)
│ ├── FinanceDailyTask
│ ├── FinanceRechargeTask
│ ├── FinanceIncomeStructureTask
│ └── FinanceDiscountDetailTask
├── DwsMaintenanceTask # 统一维护MV 刷新 + 数据清理)
└── BaseIndexTask # 指数算法基类
├── MemberIndexBaseTask # 会员指数共享逻辑(模板 execute
│ ├── WinbackIndexTask # WBI实现 _calculate_scores/_save_results
│ └── NewconvIndexTask # NCI
├── RelationIndexTask # RS/OS/MS/ML
└── MlManualImportTask # ML 人工台账导入
```
### 1.3 BaseTask 核心机制
`BaseTask.execute()` 是所有任务的统一入口,流程:
1. `_build_context(cursor_data)` → 构建 `TaskContext`store_id, window_start/end, cursor
2. `build_window_segments()` → 根据配置将大窗口拆分为多段day/week/month
3. 对每个分段循环执行:`extract() → transform() → load() → commit()`
4. 累加各段统计,返回汇总结果
时间窗口确定优先级:
- `run.window_override.start/end`CLI `--window-start/--window-end`> 游标 `cursor.last_end` > 默认回溯
闲时/忙时自适应窗口:
- 闲时04:0016:00默认 180 分钟
- 忙时:默认 30 分钟
- 游标存在时:从 `cursor.last_end - overlap_seconds` 开始
### 1.4 BaseDwsTask 通用能力
`BaseDwsTask` 继承 `BaseTask`,为 DWS 层提供:
| 能力 | 说明 |
|------|------|
| `DATE_COL` | 类属性,子类声明日期列名(用于默认 extract/load |
| `_do_extract(context)` | 抽象方法,子类实现数据提取逻辑 |
| `extract(context)` | 默认实现:调用 `_do_extract()` 并包装为标准字典 |
| `load(transformed, context)` | 默认实现:`delete_existing_data(date_col=DATE_COL)` + `bulk_insert()`,返回标准统计字典 |
| `get_target_table()` | 抽象方法,返回目标表名(不含 schema |
| `get_primary_keys()` | 抽象方法,返回主键列表(用于幂等更新) |
| `iter_dwd_rows()` / `query_dwd()` | 从 DWD 层读取数据 |
| `load_config_cache()` | 缓存配置表绩效档位、等级定价、奖金规则、区域分类、技能类型TTL 300 秒 |
| `get_assistant_level_asof()` | SCD2 维度 as-of 取值(获取指定日期生效的助教等级) |
| `get_member_card_balance_asof()` | 获取会员储值卡余额 |
| `delete_existing_data()` | 按主键删除已有数据delete-before-insert 幂等) |
| `bulk_insert()` / `upsert()` | 批量写入 / UPSERT |
| `calculate_rolling_stats()` | 滚动窗口统计7/10/15/30/60/90 天) |
| `calculate_rank_with_ties()` | 带并列的排名计算 |
| 时间分层枚举 | `TimeLayer`近2天/1月/3月/6月/全量)、`TimeWindow`(本周/上周/本月/上月/季度等) |
**DWS 子类开发模式**:大多数 DWS 子类只需声明 `DATE_COL` 并实现 `_do_extract()``transform()`,即可复用基类的默认 `extract()``load()` 实现。有自定义逻辑的子类(如 `AssistantSalaryTask` 的月度删除)可覆盖默认实现。
**公共辅助模块**`tasks/dws/dws_helpers.py` 提供 `mask_mobile()``calc_days_since()``parse_id_list()``safe_division()` 等纯函数,消除子类间的重复代码。
---
## 2. DWS 层任务详解
### 2.1 已注册的 DWS 任务
| 任务代码 | 类名 | 目标表 | 主键 | 说明 |
|----------|------|--------|------|------|
| `DWS_BUILD_ORDER_SUMMARY` | DwsBuildOrderSummaryTask | — | — | 订单汇总(工具类) |
| `DWS_ASSISTANT_DAILY` | AssistantDailyTask | `dws_assistant_daily_detail` | site_id, assistant_id, stat_date | 助教日度业绩明细 |
| `DWS_ASSISTANT_MONTHLY` | AssistantMonthlyTask | — | — | 助教月度汇总 |
| `DWS_ASSISTANT_CUSTOMER` | AssistantCustomerTask | — | — | 助教-客户关系 |
| `DWS_ASSISTANT_SALARY` | AssistantSalaryTask | — | — | 助教工资计算 |
| `DWS_ASSISTANT_FINANCE` | AssistantFinanceTask | — | — | 助教财务汇总 |
| `DWS_MEMBER_CONSUMPTION` | MemberConsumptionTask | — | — | 会员消费分析 |
| `DWS_MEMBER_VISIT` | MemberVisitTask | — | — | 会员到访分析 |
| `DWS_FINANCE_DAILY` | FinanceDailyTask | — | — | 财务日报 |
| `DWS_FINANCE_RECHARGE` | FinanceRechargeTask | — | — | 充值分析 |
| `DWS_FINANCE_INCOME_STRUCTURE` | FinanceIncomeStructureTask | — | — | 收入结构分析 |
| `DWS_FINANCE_DISCOUNT_DETAIL` | FinanceDiscountDetailTask | — | — | 优惠明细 |
| `DWS_RETENTION_CLEANUP` | ~~DwsRetentionCleanupTask~~ | — | — | [已合并] 见 DWS_MAINTENANCE |
| `DWS_MV_REFRESH_FINANCE_DAILY` | ~~DwsMvRefreshFinanceDailyTask~~ | — | — | [已合并] 见 DWS_MAINTENANCE |
| `DWS_MV_REFRESH_ASSISTANT_DAILY` | ~~DwsMvRefreshAssistantDailyTask~~ | — | — | [已合并] 见 DWS_MAINTENANCE |
| `DWS_MAINTENANCE` | DwsMaintenanceTask | — | — | 统一维护MV 刷新 + 数据清理) |
### 2.2 DWS 任务典型流程(以 AssistantDailyTask 为例)
```
extract(context):
├── 从 dwd.dwd_assistant_service_log 提取助教服务记录
├── 从 dwd.dwd_assistant_trash_event 提取废除记录
└── 加载配置缓存(技能类型映射等)
transform(extracted, context):
├── 按 (assistant_id, stat_date) 聚合
├── 排除废除记录
├── 通过 SCD2 as-of 获取统计日当日助教等级
├── 通过 skill_id 映射课程类型(基础课/附加课)
└── 汇总:服务次数、计费时长、计费金额、客户数、台桌数
load(transformed, context):
├── delete_existing_data() # 按日期窗口删除
└── bulk_insert() # 批量写入 dws.dws_assistant_daily_detail
```
更新策略:每小时增量更新,幂等方式为 delete-before-insert按日期窗口
---
## 3. INDEX 层指数算法任务详解
### 3.1 已注册的 INDEX 任务
| 任务代码 | 类名 | 指数类型 | 目标表 | 主键 |
|----------|------|----------|--------|------|
| `DWS_WINBACK_INDEX` | WinbackIndexTask | WBI | `dws_member_winback_index` | site_id, member_id |
| `DWS_NEWCONV_INDEX` | NewconvIndexTask | NCI | `dws_member_newconv_index` | site_id, member_id |
| `DWS_RELATION_INDEX` | RelationIndexTask | RS/OS/MS/ML | `dws_member_assistant_relation_index` | site_id, member_id, assistant_id |
| `DWS_ML_MANUAL_IMPORT` | MlManualImportTask | ML | `dws_ml_manual_source` + `dws_member_assistant_relation_index` | — |
### 3.2 BaseIndexTask 核心能力
`BaseIndexTask` 继承 `BaseDwsTask`,提供指数计算通用功能:
| 能力 | 方法 | 说明 |
|------|------|------|
| 半衰期衰减 | `decay(days, halflife)` | `exp(-ln(2) * d / h)`d=h 时权重=0.5 |
| 分位数计算 | `calculate_percentiles(scores, lower, upper)` | 默认 P5/P95 |
| Winsorize 截断 | `winsorize(value, lower, upper)` | 将值截断到分位点范围 |
| 归一化映射 | `normalize_to_display(value, min, max, compression)` | 映射到 010 分 |
| 批量归一化 | `batch_normalize_to_display(raw_scores, ...)` | 计算分位点 → Winsorize → 映射 |
| 参数加载 | `load_index_parameters(index_type)` | 从 `dws.cfg_index_parameters` 读取TTL 300 秒缓存 |
| 分位点历史 | `save_percentile_history()` / `get_last_percentile_history()` | 写入/读取 `dws.dws_index_percentile_history` |
| EWMA 平滑 | `_apply_ewma_smoothing()` | 对分位点做指数加权移动平均,避免跳变 |
| 压缩函数 | `_resolve_compression()` | 支持 none / log1p / asinh 三种压缩模式 |
归一化流程:
```
Raw Score → [可选压缩: ln(1+x) 或 asinh(x)]
→ Winsorize(P5, P95)
→ MinMax 映射: score = 10 * (x - P5) / (P95 - P5)
→ clip(0, 10)
```
### 3.3 参数加载机制
所有指数参数存储在 `dws.cfg_index_parameters` 表中:
```sql
SELECT param_name, param_value
FROM dws.cfg_index_parameters
WHERE index_type = %s
AND effective_from <= CURRENT_DATE
AND (effective_to IS NULL OR effective_to >= CURRENT_DATE)
ORDER BY effective_from DESC
```
加载优先级:数据库参数 > 代码中的 `DEFAULT_PARAMS`(子类定义)。
缓存策略:按 `index_type` 隔离TTL 300 秒。
运行时覆盖:`run.index_lookback_days` 可覆盖 `lookback_days_recency`
### 3.4 WBI — 老客挽回指数
业务含义:衡量老客流失风险,分数越高越需要挽回干预。
执行流程:
1. 获取 site_id / tenant_id
2. `_load_params()` → 合并数据库参数与 `DEFAULT_PARAMS`
3. `_build_member_activity()` → 构建会员活动特征(来自 MemberIndexBaseTask
4. 对每个会员计算 `MemberWinbackData`
- `overdue_old`:逾期衰减分(基于理想到访间隔 vs 实际间隔)
- `drop_old`:消费下降分
- `recharge_old`:充值衰减分
- `value_old`:价值分(消费+余额)
5. 加权求和:`raw_score = w_over * overdue + w_drop * drop + w_re * recharge + w_value * value`
6. `batch_normalize_to_display()` → 归一化到 010
默认参数(`DEFAULT_PARAMS`
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `lookback_days_recency` | 60 | 近期活跃度回溯天数 |
| `visit_lookback_days` | 180 | 到访历史回溯天数 |
| `percentile_lower` / `upper` | 5 / 95 | 分位点截断 |
| `compression_mode` | 0 | 压缩模式0=none |
| `use_smoothing` | 1 | 启用 EWMA 平滑 |
| `ewma_alpha` | 0.2 | EWMA 平滑系数 |
| `new_visit_threshold` | 2 | 新客到访次数阈值 |
| `new_days_threshold` | 30 | 新客天数阈值 |
| `overdue_alpha` | 2.0 | 逾期衰减指数 |
| `overdue_weight_halflife_days` | 30 | 逾期权重半衰期 |
| `h_recharge` | 7 | 充值衰减半衰期(天) |
| `amount_base_M0` | 300 | 消费基准金额 |
| `balance_base_B0` | 500 | 余额基准金额 |
| `w_over` / `w_drop` / `w_re` / `w_value` | 2.0 / 1.0 / 0.4 / 1.2 | 四维权重 |
| `enable_stop_high_balance_exception` | 0 | STOP 高余额例外(默认关闭) |
| `high_balance_threshold` | 1000 | 高余额阈值 |
### 3.5 NCI — 新客转化指数
业务含义:衡量新客转化为忠实客户的潜力,分数越高越值得重点跟进。
执行流程与 WBI 类似,但评分维度不同:
1. `_build_member_activity()` → 构建会员活动特征
2. 对每个新客计算:
- `welcome`:欢迎窗口内的互动分
- `need`:需求紧迫度(基于到访间隔)
- `recharge`:充值行为分
- `value`:价值分
3. 加权求和:`raw_score = w_welcome * welcome + w_need * need + w_re * recharge + w_value * value`
4. 归一化到 010
默认参数(`DEFAULT_PARAMS`
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `no_touch_days_new` | 3 | 新客免打扰天数 |
| `t2_target_days` | 7 | 第二次到访目标天数 |
| `salvage_start` / `salvage_end` | 30 / 60 | 挽救窗口(天) |
| `welcome_window_days` | 3 | 欢迎窗口天数 |
| `active_new_visit_threshold_14d` | 2 | 14 天内活跃新客到访阈值 |
| `active_new_recency_days` | 7 | 活跃新客近期天数 |
| `active_new_penalty` | 0.2 | 活跃新客惩罚系数 |
| `w_welcome` / `w_need` / `w_re` / `w_value` | 1.0 / 1.6 / 0.8 / 1.0 | 四维权重 |
### 3.6 RelationIndexTask — 关系指数RS/OS/MS/ML
业务含义:衡量助教与会员之间的关系强度,一个任务产出四个子指数。
执行流程:
1. 提取服务记录 `_extract_service_records()`
2. 合并会话 `_group_and_merge_sessions()`(按 `session_merge_hours` 合并相邻服务)
3. 分别计算四个子指数:
- **RSRelation Strength**:关系强度,基于服务频次 + 时长衰减
- **OSOwnership Score**:归属度,基于 RS 占比判断主教练/共管/无归属
- **MSMomentum Score**:动量分,短期 vs 长期趋势对比
- **MLMoney Link**:资金关联度,基于人工台账的充值归因
4. 各子指数独立归一化到 010
RS 默认参数:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `lookback_days` | 60 | 回溯天数 |
| `session_merge_hours` | 4 | 会话合并间隔(小时) |
| `incentive_weight` | 1.5 | 激励权重 |
| `halflife_session` | 14.0 | 会话频次半衰期(天) |
| `halflife_last` | 10.0 | 最近一次服务半衰期(天) |
| `weight_f` / `weight_d` | 1.0 / 0.7 | 频次/时长权重 |
| `gate_alpha` | 0.6 | 门控系数 |
OS 默认参数:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `min_rs_raw_for_ownership` | 0.05 | 参与归属判定的最低 RS |
| `min_total_rs_raw` | 0.10 | 总 RS 最低阈值 |
| `ownership_main_threshold` | 0.60 | 主教练占比阈值 |
| `ownership_comanage_threshold` | 0.35 | 共管占比阈值 |
| `ownership_gap_threshold` | 0.15 | 归属差距阈值 |
MS 默认参数:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `halflife_short` | 7.0 | 短期半衰期(天) |
| `halflife_long` | 30.0 | 长期半衰期(天) |
ML 默认参数:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `amount_base` | 500.0 | 充值基准金额 |
| `halflife_recharge` | 21.0 | 充值衰减半衰期(天) |
### 3.7 MlManualImportTask — ML 人工台账导入
业务含义:从 Excel 文件导入人工分配的充值归因数据,作为 ML 指数的数据源。
执行流程:
1. `_resolve_file_path()` → 解析 Excel 文件路径
2. `_read_excel_rows()` → 读取 Excel 数据
3. `_normalize_row()` → 标准化每行数据
4. `_build_source_row()` → 构建源数据行 → 写入 `dws_ml_manual_source`
5. `_build_alloc_rows()` → 构建分配行 → UPSERT 到 `dws_member_assistant_relation_index`
6.`resolve_scope()` 确定导入范围P30 桶),幂等删除后重新插入
导入范围:按 `(site_id, biz_date)` 解析为 P30 桶30 天滚动窗口),同一桶内的数据整体替换。
---
## 4. 任务调度与编排系统
### 4.1 架构概览
```
CLI (cli/main.py)
├── 传统模式 ──→ TaskExecutor.run_tasks(task_codes)
└── Flow 模式 ──→ FlowRunner.run(flow, processing_mode, ...)
├── _resolve_tasks(layers) → 层→任务映射(拓扑排序)
├── TaskExecutor.run_tasks(auto_tasks) → 增量 ETL
└── _run_verification() → 后置校验(可选)
```
核心组件:
| 组件 | 文件 | 职责 |
|------|------|------|
| `TaskRegistry` | `orchestration/task_registry.py` | 任务注册与工厂,维护 task_code → TaskMeta 映射 |
| `TaskExecutor` | `orchestration/task_executor.py` | 单任务执行生命周期(游标、运行记录、抓取/入库) |
| `FlowRunner` | `orchestration/flow_runner.py` | Flow 编排(层→任务映射、拓扑排序、校验编排) |
| `CursorManager` | `orchestration/cursor_manager.py` | 游标管理(`meta.etl_cursor` |
| `RunTracker` | `orchestration/run_tracker.py` | 运行记录(`meta.etl_run` |
| `ETLScheduler` | `orchestration/scheduler.py` | 薄包装层(已弃用,兼容旧调用方) |
| `topological_sort` | `orchestration/topological_sort.py` | 任务依赖拓扑排序Kahn's algorithm |
### 4.2 两种执行模式
#### 4.2.1 传统模式(`--tasks`
直接指定任务代码列表,由 `TaskExecutor.run_tasks()` 顺序执行。
```bash
python -m cli.main --tasks ODS_MEMBER,ODS_ORDER,DWD_LOAD_FROM_ODS
```
流程:
1. 解析 `--tasks` 为任务代码列表
2. `TaskExecutor.run_tasks(task_codes, data_source)` 逐个执行
3. 每个任务走 `run_single_task()` 完整生命周期
#### 4.2.2 Flow 模式(`--flow` / `--layers`
指定 Flow 类型或层组合,由 `FlowRunner` 自动编排多层 ETL。
```bash
# 使用 --flow 快捷别名
python -m cli.main --flow api_full --processing-mode increment_verify
# 使用 --layers 自由组合
python -m cli.main --layers ODS,DWD,DWS --processing-mode increment_only
```
> `--layers` 和 `--flow` 互斥。
### 4.3 Flow 定义7 种 Flow
| Flow 名称 | 包含层 | 典型用途 |
|-----------|--------|----------|
| `api_ods` | ODS | 仅从 API 抓取到 ODS |
| `api_ods_dwd` | ODS → DWD | 抓取 + 清洗装载 |
| `api_full` | ODS → DWD → DWS → INDEX | 全链路 |
| `ods_dwd` | DWD | 仅 ODS→DWD 装载 |
| `dwd_dws` | DWS | 仅 DWD→DWS 汇总 |
| `dwd_dws_index` | DWS → INDEX | DWS 汇总 + 指数计算 |
| `dwd_index` | INDEX | 仅指数计算 |
### 4.4 四种处理模式
| 模式 | 说明 |
|------|------|
| `increment_only` | 仅增量处理(默认) |
| `verify_only` | 跳过增量,直接校验数据一致性并自动补齐 |
| `increment_verify` | 先增量处理,再校验补齐 |
| `full_window` | 用 API 返回数据的实际时间范围处理全部层ODS→DWD→DWS→INDEX跳过 MAX(fetched_at) 兜底和校验 |
`verify_only` 模式可选 `--fetch-before-verify`:校验前先从 API 获取最新数据。
`full_window` 模式ODS 层直接使用基础窗口CLI 指定或默认 24h 回溯),不走 `_get_max_fetched_at` 兜底逻辑。适用于需要全量重跑某时间段数据的场景API 返回数据即为真实来源,无游标偏移风险。
### 4.5 三种数据源模式DataSource
| 模式 | 说明 | 旧参数映射 |
|------|------|-----------|
| `online` | 仅在线抓取API → JSON 落盘) | `FETCH_ONLY` |
| `offline` | 仅本地入库JSON 回放 → DB | `INGEST_ONLY` |
| `hybrid` | 抓取 + 入库(默认) | `FULL` |
### 4.6 TaskExecutor 单任务执行生命周期
`TaskExecutor.run_single_task()` 是单任务执行的核心方法:
```
run_single_task(task_code, run_uuid, store_id, data_source)
├── 工具类任务? → _run_utility_task() → 直接执行,跳过游标和运行记录
├── _load_task_config() → 从 meta.etl_task_config 加载任务配置
│ └── 未启用/不存在 → 返回 SKIP
├── cursor_mgr.get_or_create() → 获取/创建游标
├── run_tracker.create_run() → 创建运行记录(状态=RUNNING
├── ODS 任务?
│ ├── data_source 包含 fetch
│ │ └── _execute_ods_record_and_load() → RecordingAPIClient 抓取 + 直接入库
│ └── 否则
│ └── _execute_ingest() → LocalJsonClient 回放入库
├── 非 ODS 任务DWD/DWS/INDEX
│ ├── data_source 包含 fetch
│ │ └── _execute_fetch() → RecordingAPIClient 抓取落盘
│ │ └── data_source == online → 仅抓取,不入库,直接返回
│ ├── data_source 包含 ingest
│ │ └── _execute_ingest() → LocalJsonClient 回放 → task.execute()
│ └── 对于 DWS/INDEX 任务:直接走 task.execute(cursor_data)
├── run_tracker.update_run() → 更新运行记录(状态/计数/窗口)
└── cursor_mgr.advance() → 成功后推进游标水位
```
关键设计:
- ODS 任务在 `execute()` 内完成 DB upsert走特殊路径
- DWS/INDEX 任务的 `requires_db_config=False`,被标记为工具类任务,跳过游标和运行记录管理
- `RecordingAPIClient` 包装原始 API 客户端,抓取时同时落盘 JSON
- `LocalJsonClient` 从本地 JSON 文件回放数据,用于离线模式
### 4.7 FlowRunner 层→任务解析
`_resolve_tasks(layers)` 将层列表解析为具体任务代码,并执行拓扑排序确保依赖顺序:
| 层 | 解析优先级 |
|----|-----------|
| ODS | 配置 `run.ods_tasks` > `TaskRegistry.get_tasks_by_layer("ODS")` |
| DWD | 固定 `DWD_LOAD_FROM_ODS` |
| DWS | 配置 `run.dws_tasks` > `TaskRegistry.get_tasks_by_layer("DWS")` |
| INDEX | 配置 `run.index_tasks` > `TaskRegistry.get_tasks_by_layer("INDEX")` |
> 已移除所有硬编码回退列表,统一走 `TaskRegistry.get_tasks_by_layer()` 获取任务。空 Registry + 无配置时记录警告并返回空列表。
> DWS/INDEX 层支持轻量级校验(跳过完整性校验或仅执行行数校验,返回 SKIPPED 状态)。
### 4.8 TaskRegistry 任务元数据
每个任务注册时携带 `TaskMeta`
```python
@dataclass
class TaskMeta:
task_class: type
requires_db_config: bool = True # False → 工具类任务,跳过游标/运行记录
layer: str | None = None # "ODS" / "DWD" / "DWS" / "INDEX" / None
task_type: str = "etl" # "etl" / "utility" / "verification"
depends_on: list[str] = field(default_factory=list) # 依赖的任务代码列表
```
**任务依赖声明**:通过 `depends_on` 字段声明任务间的依赖关系,`_resolve_tasks()` 会对任务列表执行拓扑排序Kahn's algorithm确保被依赖任务先于依赖方执行。循环依赖会抛出 `ValueError`,缺失依赖(不在当前执行列表中)会记录警告但继续执行。
已知依赖关系:
- `DWS_ASSISTANT_FINANCE``DWS_ASSISTANT_SALARY`
- `DWS_ASSISTANT_MONTHLY``DWS_ASSISTANT_DAILY`
- `DWS_MAINTENANCE` → 所有其他 DWS 任务
- `DWS_WINBACK_INDEX` / `DWS_NEWCONV_INDEX``DWS_MEMBER_VISIT`, `DWS_MEMBER_CONSUMPTION`
- `DWS_RELATION_INDEX``DWS_ASSISTANT_DAILY`
当前注册统计:
- ODS 层:动态注册(由 `ODS_TASK_CLASSES` 字典生成)
- DWD 层2 个DWD_LOAD_FROM_ODS, DWD_QUALITY_CHECK
- DWS 层13 个(含统一维护任务 DWS_MAINTENANCE原 3 个 MV 刷新/清理任务已合并)
- INDEX 层4 个WBI, NCI, Relation, ML 导入)
- 工具类7 个手动入库、Schema 初始化、JSON 归档、截止检查、种子配置)
- 校验类1 个DATA_INTEGRITY_CHECK
### 4.9 游标管理CursorManager
存储在 `meta.etl_cursor` 表中,每个 `(task_id, store_id)` 一条记录:
| 字段 | 说明 |
|------|------|
| `last_start` | 上次窗口开始时间 |
| `last_end` | 上次窗口结束时间 |
| `last_id` | 上次处理的最大 ID可选 |
| `last_run_id` | 关联的运行记录 ID |
| `updated_at` | 更新时间 |
推进逻辑:任务成功后,`advance()``last_start/last_end` 更新为本次窗口范围,`last_id``GREATEST(旧值, 新值)`
### 4.10 运行记录RunTracker
存储在 `meta.etl_run` 表中,每次任务执行创建一条记录:
| 字段 | 说明 |
|------|------|
| `run_uuid` | 本次运行唯一标识 |
| `task_id` / `store_id` | 任务和门店 |
| `status` | SUCC / FAIL / PARTIAL |
| `window_start` / `window_end` / `window_minutes` | 时间窗口 |
| `fetched_count` / `loaded_count` / `updated_count` / `skipped_count` / `error_count` | 计数统计 |
| `export_dir` / `log_path` | 输出目录和日志路径 |
| `request_params` | API 请求参数JSONB |
| `manifest` / `extra` | 扩展信息JSONB |
---
## 5. CLI 入口与参数
### 5.1 入口文件
`apps/etl/connectors/feiqiu/cli/main.py`
### 5.2 完整参数列表
#### 基本参数
| 参数 | 类型 | 说明 |
|------|------|------|
| `--store-id` | int | 门店 ID |
| `--tasks` | str | 任务列表,逗号分隔(传统模式) |
| `--dry-run` | flag | 试运行(不提交) |
#### Flow 参数
| 参数 | 可选值 | 默认值 | 说明 |
|------|--------|--------|------|
| `--flow` | api_ods, api_ods_dwd, api_full, ods_dwd, dwd_dws, dwd_dws_index, dwd_index | — | Flow 类型 |
| `--layers` | ODS,DWD,DWS,INDEX 的任意组合 | — | ETL 层自由组合(与 `--flow` 互斥) |
| `--processing-mode` | increment_only, verify_only, increment_verify, full_window | increment_only | 处理模式 |
| `--fetch-before-verify` | flag | — | 校验前先从 API 获取数据 |
| `--verify-tables` | str | — | 仅校验指定表(逗号分隔) |
| `--window-split` | none, day, week, month | none | 时间窗口切分 |
| `--lookback-hours` | int | 24 | 回溯小时数 |
| `--overlap-seconds` | int | 3600 | 冗余秒数 |
#### 数据源参数
| 参数 | 可选值 | 默认值 | 说明 |
|------|--------|--------|------|
| `--data-source` | online, offline, hybrid | hybrid | 数据源模式 |
| `--pipeline-flow` | FULL, FETCH_ONLY, INGEST_ONLY | — | [已弃用] 请使用 --data-source |
| `--fetch-root` | str | — | 抓取 JSON 输出根目录 |
| `--ingest-source` | str | — | 本地清洗入库源目录 |
| `--write-pretty-json` | flag | — | 抓取 JSON 美化输出 |
#### 时间窗口参数
| 参数 | 类型 | 说明 |
|------|------|------|
| `--window-start` | str | 固定时间窗口开始(优先级高于游标) |
| `--window-end` | str | 固定时间窗口结束 |
| `--force-window-override` | flag | 强制使用 window_start/end不走 MAX(fetched_at) 兜底 |
| `--window-split-unit` | str | 窗口切分单位day/week/month/none |
| `--window-split-days` | 1/10/30 | 按天切分的天数 |
| `--window-compensation-hours` | int | 窗口前后补偿小时数 |
#### 数据库 / API / 目录参数
| 参数 | 说明 |
|------|------|
| `--pg-dsn` / `--pg-host` / `--pg-port` / `--pg-name` / `--pg-user` / `--pg-password` | PostgreSQL 连接参数 |
| `--api-base` / `--api-token` / `--api-timeout` / `--api-page-size` / `--api-retry-max` | API 连接参数 |
| `--export-root` / `--log-root` | 输出目录 |
| `--idle-start` / `--idle-end` | 闲时窗口HH:MM |
| `--allow-empty-advance` | 允许空结果推进窗口 |
### 5.3 典型使用示例
```bash
# 传统模式:指定任务
python -m cli.main --tasks ODS_MEMBER,ODS_ORDER --store-id 1
# Flow 全链路增量
python -m cli.main --flow api_full --processing-mode increment_only
# Flow 仅指数计算
python -m cli.main --flow dwd_index
# Flow DWS + INDEX
python -m cli.main --flow dwd_dws_index
# 使用 --layers 自由组合
python -m cli.main --layers ODS,DWD --store-id 1
# 仅执行 DWS 层
python -m cli.main --layers DWS
# 指定时间窗口
python -m cli.main --flow api_ods_dwd --window-start "2026-02-01" --window-end "2026-02-02"
# 校验并修复(先获取 API 数据)
python -m cli.main --flow api_full --processing-mode verify_only --fetch-before-verify
# 增量 + 校验
python -m cli.main --flow api_full --processing-mode increment_verify
# 离线模式(从本地 JSON 入库)
python -m cli.main --tasks DWD_LOAD_FROM_ODS --data-source offline --ingest-source ./data/json
```
### 5.4 配置优先级
```
代码默认值 < 根 .env < 应用 .env.local < 环境变量 < CLI 参数
```
时间窗口优先级:
```
--window-start/--window-endCLI 显式指定)
> run.window_override.start/end配置文件
> cursor.last_end - overlap_seconds游标推进
> now - window_minutes默认回溯
```
---
## 附录 AODS 层软删除与 Early-Cutoff 保护
### A.1 快照软删除机制
ODS 层通过快照对比实现软删除:将本次 API 返回的业务 ID 集合与数据库中已有 ID 对比,缺失的 ID 插入一条 `is_delete=1` 的新版本行INSERT 而非 UPDATE保留历史版本
三种快照模式(`SnapshotMode`
| 模式 | 行为 | 适用场景 |
|------|------|----------|
| `NONE` | 不做快照对比 | 流水类数据(支付、退款等) |
| `FULL_TABLE` | 对比全表所有记录 | 维度类数据(助教档案、会员卡等) |
| `WINDOW` | 仅对比时间窗口内的记录 | 按时间分布的流水(台费、服务记录等) |
运行时配置参数:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `run.snapshot_missing_delete` | `true` | 总开关,设为 `false` 可紧急关闭所有软删除 |
| `run.snapshot_allow_empty_delete` | `false` | API 返回空结果时是否仍触发软删除 |
| `run.snapshot_protect_early_cutoff` | `true` | Early-cutoff 保护开关(仅 WINDOW 模式) |
### A.2 Early-Cutoff 保护2026-02-18 新增)
问题场景:请求时间窗口 `2026-01-01 ~ 2026-02-18`,但 API 实际只返回 `2026-02-01` 起的数据(上游截断了更早的数据)。若直接用请求窗口做快照对比,`01-01 ~ 01-31` 的数据会被误标为软删除。
保护机制:在 `SnapshotMode.WINDOW` 模式下,收集每个 segment 内 API 返回记录中 `snapshot_time_column` 的实际最小值,将软删除的窗口起点从 `seg_start` 收窄至 `max(seg_start, actual_earliest)`
```
请求窗口: [--- 01-01 -------- 02-18 ---]
API 实际返回: [-- 02-01 -- 02-18 --]
↑ actual_earliest
软删除范围: [-- 02-01 -- 02-18 --] ← 收窄后
↑ 01-01~01-31 不受影响
```
触发条件:
- `run.snapshot_protect_early_cutoff = true`(默认)
- 任务的 `snapshot_mode = WINDOW`
- API 返回数据中 `snapshot_time_column` 的最小值晚于请求的 `seg_start`
关闭方式:设置 `run.snapshot_protect_early_cutoff = false`
---
## 附录 B窗口分段机制
`utils/windowing.py` 提供窗口拆分能力:
| 切分单位 | 行为 |
|----------|------|
| `none` | 不切分,整个窗口作为一段 |
| `day` | 按天切分(可配置 `split_days`=1/10/30 |
| `week` | 按 7 天切分 |
| `month` | 按自然月切分 |
补偿机制:`compensation_hours` 在窗口前后各扩展指定小时数,避免边界数据遗漏。
`BaseTask.execute()` 中的窗口分段仅在 `run.window_override` 存在时生效(`override_only=True`),否则整个窗口作为一段执行。
---
## 附录 C跨层强制全量更新2026-02-18 新增)
### C.1 参数
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `run.force_full_update` | `false` | 跳过所有变更检测,无条件写入 |
### C.2 适用场景
表结构发生变化(新增/修改字段)后,历史数据需要按新结构全量刷新。正常流程中 content_hash 去重和 IS DISTINCT FROM 对比会跳过"看起来没变"的行,导致新字段无法回填。
### C.3 各层行为
| 层 | 正常模式 | `force_full_update = true` |
|----|----------|---------------------------|
| ODS | content_hash 相同则跳过ON CONFLICT 带 WHERE IS DISTINCT FROM | 跳过 hash 去重ON CONFLICT 无条件 DO UPDATE SET |
| DWD 维度表 | `_is_row_changed()` 逐列对比,无变更则跳过 | 跳过变更检测,所有已存在行强制关闭旧版 + 插入新版 |
| DWD 事实表 | ON CONFLICT 带 WHERE IS DISTINCT FROM | ON CONFLICT 无条件 DO UPDATE SET |
### C.4 注意事项
- 该参数会显著增加数据库写入量,仅在结构变更后的一次性刷新中使用
- 刷新完成后应立即关闭(恢复为 `false`
- 可通过 CLI 临时覆盖:`--set run.force_full_update=true`