Files
ZQYY.FQ-ETL/.kiro/steering/structure.md

7.4 KiB
Raw Permalink Blame History

项目结构

FQ-ETL/                     # 工作区根目录C:\ZQYY\FQ-ETL
├── cli/                    # CLI 入口main.py
├── config/                 # 配置默认值、环境变量解析、AppConfig、调度任务配置
│   └── scheduled_tasks.json
├── api/                    # API 客户端HTTP、本地 JSON 回放、录制)
│   └── endpoint_routing.py # 端点路由映射
├── database/               # 数据库连接、操作、DDL Schema、种子脚本、迁移
│   ├── migrations/         # 迁移脚本(纯 SQL日期前缀命名
│   ├── schema_*.sql        # DDL 定义
│   └── seed_*.sql          # 种子数据
├── tasks/                  # ETL 任务实现(按数据层分目录)
│   ├── base_task.py        # BaseTask 基类,提供 Extract/Transform/Load 模板
│   ├── ods/                # ODS 层抓取任务16 个业务实体 + ods_tasks 工厂)
│   ├── dwd/                # DWD 层装载任务base_dwd_task、维度/事实装载、质量检查)
│   ├── dws/                # DWS 汇总与指数任务
│   │   └── index/          # 指数计算任务(亲密度、新客转化、召回、关系、赢回)
│   ├── utility/            # 工具类任务Schema 初始化、手动入库、完整性检查、DWS 构建等)
│   └── verification/       # ETL 后置校验任务ODS/DWD/DWS/指数校验器)
├── loaders/                # 数据加载器ODS、维度、事实
│   ├── base_loader.py      # BaseLoader 基类,定义 upsert 接口
│   ├── ods/                # 通用 ODS 加载器
│   ├── dimensions/         # SCD2 维度加载器(会员、助教、商品、台桌、套餐)
│   └── facts/              # 事实表加载器(订单、支付、退款、小票、充值等)
├── scd/                    # SCD2缓慢变化维度处理器
├── orchestration/          # 调度器、任务注册表、游标管理、运行记录
│   ├── pipeline_runner.py  # 管线运行器
│   ├── task_executor.py    # 任务执行器
│   ├── task_registry.py    # 任务注册表
│   ├── scheduler.py        # ETL 调度器
│   ├── cursor_manager.py   # 游标(水位)管理
│   └── run_tracker.py      # 运行记录追踪
├── quality/                # 数据质量检查器(余额一致性、完整性)
│   └── integrity_service.py # 完整性检查服务
├── models/                 # 解析器与验证器
├── utils/                  # 工具函数日志、JSON 存储、报告、窗口切分
├── gui/                    # PySide6 桌面 GUI
│   ├── main_window.py
│   ├── widgets/            # UI 面板与组件
│   ├── workers/            # 后台工作线程
│   ├── models/             # GUI 数据模型(任务、调度)
│   ├── utils/              # GUI 专用工具设置、CLI 构建器)
│   └── resources/          # 样式表
├── scripts/                # 运维/工具脚本
│   ├── run_update.py       # 一键增量更新入口ODS → DWD → DWS
│   ├── run_ods.bat         # ODS 批处理入口
│   ├── audit/              # 仓库审计脚本(扫描器、分析器、报告生成)
│   ├── check/              # 数据检查脚本完整性、ODS 缺口、DWD 服务、内容哈希等)
│   ├── db_admin/           # 数据库管理脚本Excel 导入)
│   ├── export/             # 数据导出脚本(指数、团购、亲密度、会员明细等)
│   ├── rebuild/            # 数据重建脚本(全量 ODS→DWD 重建)
│   └── repair/             # 数据修复脚本回填、去重、hash 修复、维度修复、索引调优)
├── tests/                  # 测试套件
│   ├── unit/               # 单元测试FakeDB/FakeAPI无需真实数据库
│   └── integration/        # 集成测试(需要 TEST_DB_DSN 或真实数据库)
├── docs/                   # 文档
│   ├── audit/              # 仓库审计报告(自动生成)
│   ├── bd_manual/          # 业务数据手册DWD/DWS 表说明)
│   │   ├── DWD/            # DWD 层表手册main + Ex 扩展)
│   │   └── dws/            # DWS 层表手册
│   ├── dictionary/         # 数据字典
│   ├── index/              # 指数算法文档
│   ├── requirements/       # 需求文档
│   ├── reports/            # 分析报告
│   ├── data_exports/       # 数据导出文档与 CSV
│   ├── templates/          # 模板文件Excel 等)
│   ├── api-reference/      # API 参考文档(标准化,替代 test-json-doc
│   │   ├── api_registry.json  # API 注册表25 个端点定义)
│   │   ├── endpoints/     # 每个 API 一个 .md 文档25 个)
│   │   └── samples/       # 最新响应样本JSON
│   ├── test-json-doc/      # [已废弃] 旧版 API 测试 JSON 样本与分析
│   └── 开发笔记/           # 开发备忘
├── reports/                # 质检输出JSON已 gitignore
├── export/                 # JSON 落盘与日志(已 gitignore
├── logs/                   # 运行日志(已 gitignore
└── .Deleted/               # 已归档/废弃文件(隐藏目录,已 gitignore

架构模式

  • 任务模式:每个 ETL 任务继承 BaseTaskExtract → Transform → Load 模板方法),在 orchestration/task_registry.py 中注册。
  • 加载器模式:每张目标表对应一个加载器,继承 BaseLoader 并实现 upsert() 方法。维度加载器在 loaders/dimensions/,事实加载器在 loaders/facts/
  • 配置分层DEFAULTS 字典 → .env 覆盖 → CLI 参数覆盖。通过 AppConfig.get("dotted.path") 访问。
  • 管线流程FULL(抓取 + 入库)、FETCH_ONLY(仅抓取)、INGEST_ONLY(仅入库)。由 --pipeline-flow CLI 参数或 PIPELINE_FLOW 环境变量控制。
  • 调度器ETLScheduler 编排任务执行,管理游标(水位),在 etl_admin Schema 中记录运行状态。
  • API 抽象APIClientHTTPLocalJsonClient(离线回放)、RecordingAPIClient(抓取 + 落盘)共享相同接口,任务代码无需关心数据来源。

编码约定

  • 文件编码UTF-8文件头加 # -*- coding: utf-8 -*-
  • 日志格式:通过 utils/logging_utils.py 统一
  • 任务代码:大写蛇形命名(如 DWD_LOAD_FROM_ODSDWS_ASSISTANT_DAILY
  • SQL 文件:纯 SQL不使用 ORM通过 psycopg2 执行
  • 数据库操作:批量 upsert + 冲突处理,显式 commit/rollback
  • 中文注释和文档字符串是正常且预期的