Files
Neo-ZQYY/apps/etl/connectors/feiqiu/docs/architecture/system_overview.md

6.5 KiB
Raw Permalink Blame History

系统整体架构

技术栈

类别 技术
语言 Python 3.10+
数据库 PostgreSQL远程实例
DB 驱动 psycopg2-binary
HTTP 客户端 requests
日期处理 python-dateutil / tzdata
配置管理 python-dotenv
Excel 导入导出 openpyxl
后端 API FastAPI + uvicorn
管理后台 React + Vite + Ant Designapps/admin-web/
测试 pytest / hypothesis

模块交互关系

┌─────────────────────────────────────────────────────────┐
│                      入口层                              │
│  cli/main.pyCLI                                      │
└──────────┬──────────────────────────────────────────────┘
           │ AppConfig
           ▼
┌─────────────────────────────────────────────────────────┐
│                     编排层                               │
│  orchestration/                                          │
│  ├── flow_runner.py       Flow 运行器(执行流程编排)    │
│  ├── task_executor.py     任务执行器                     │
│  ├── task_registry.py     任务注册表                     │
│  ├── topological_sort.py  任务依赖拓扑排序               │
│  ├── scheduler.py         ETL 调度器(已弃用)           │
│  ├── cursor_manager.py    游标(水位)管理               │
│  └── run_tracker.py       运行记录追踪                   │
└──────────┬──────────────────────────────────────────────┘
           │
           ▼
┌─────────────────────────────────────────────────────────┐
│                     执行层                               │
│  tasks/                                                  │
│  ├── base_task.py         BaseTask 基类                  │
│  ├── ods/                 ODS 抓取任务23 个业务实体)   │
│  ├── dwd/                 DWD 装载任务(维度/事实/质检)  │
│  ├── dws/                 DWS 汇总与指数任务             │
│  │   └── index/           指数计算WBI/NCI/RS/OS/MS/ML│
│  ├── utility/             工具任务Schema 初始化等)     │
│  └── verification/        ETL 后置校验                   │
└──────────┬──────────────────────────────────────────────┘
           │
           ▼
┌──────────────────────┐  ┌───────────────────────────────┐
│      数据源层         │  │         数据装载层             │
│  api/                 │  │  loaders/                      │
│  ├── APIClient        │  │  ├── base_loader.py            │
│  ├── LocalJsonClient  │  │  ├── ods/         ODS 加载器   │
│  └── RecordingClient  │  │  ├── dimensions/  SCD2 维度    │
│                       │  │  └── facts/       事实表       │
└───────────────────────┘  └───────────────────────────────┘
           │                          │
           ▼                          ▼
┌─────────────────────────────────────────────────────────┐
│                   PostgreSQL                             │
│  ods │ dwd │ dws │ meta │ core │ app                     │
└─────────────────────────────────────────────────────────┘

执行链路

系统采用三层架构,执行流程如下:

  1. CLI 层cli/main.py):解析命令行参数 → 生成 AppConfig → 依赖注入
  2. 编排层orchestration/flow_runner.pyFlow 名称或 --layers 组合 → 层 → 任务列表解析 → 拓扑排序;processing_mode 控制增量/校验流程
  3. 执行层orchestration/task_executor.pyDataSource 枚举决定 fetch/ingest 路径,含游标管理、运行记录、失败标记

核心架构模式

任务模式

每个 ETL 任务继承 BaseTask,遵循 Extract → Transform → Load 模板方法,在 orchestration/task_registry.py 中注册。任务代码采用大写蛇形命名(如 DWD_LOAD_FROM_ODS)。

加载器模式

每张目标表对应一个加载器,继承 BaseLoader 并实现 upsert() 方法。维度加载器位于 loaders/dimensions/(走 SCD2事实加载器位于 loaders/facts/(增量写入)。核心是批量 upsert + 冲突处理策略。

配置分层

配置按优先级叠加:config/defaults.py(默认值)→ .env / 环境变量 → CLI 参数覆盖。通过 AppConfig.get("dotted.path") 统一访问。

API 抽象

APIClientHTTP 在线抓取)、LocalJsonClient(离线 JSON 回放)、RecordingAPIClient(抓取 + 落盘)共享相同接口,任务代码无需关心数据来源。

Flow执行流程模式

术语说明:Connector(数据源连接器)指对接的上游 SaaS 平台(如飞球);Flow(执行流程)指 ETL 任务的处理链路。CLI 参数 --flow 传递 Flow ID--pipeline 作为已弃用别名保留),--layers 支持自由组合层级。

通过 --data-source 参数控制:

  • hybrid:在线抓取 + 入库(默认)
  • online:仅在线抓取
  • offline:仅离线入库

调度与水位

FlowRunner 编排任务执行,cursor_manager 管理增量水位,run_trackermeta Schema 中记录运行状态,确保增量正确性和可重复性。--force-full 参数可强制全量重跑,忽略游标。