Files

20 KiB
Raw Permalink Blame History

设计文档

概览

scripts/ops/gen_full_dataflow_doc.py 进行重构,将其从单体运维脚本升级为"Python 数据采集 + Kiro Agent 语义分析"的双层架构。

核心设计原则:Python 脚本负责机械性数据准备API 调用、JSON 展开、DB 查询Kiro Agent 负责需要理解代码和业务上下文的语义工作(映射计算、字段作用推断、统计总结编排、报告组装)。

改进要点:

  1. JSON 层级完整展开:递归遍历 JSON 树,用 . 路径和 [] 数组标记展示完整层级,遍历所有记录拼合最全字段集
  2. 数据库驱动的表结构ODS/DWD 表结构从 information_schema + pg_description 查询,不依赖 DDL 文件
  3. 字段作用说明:由 Kiro Agent 结合 DDL COMMENT、数据样本、ETL 源码和映射关系推断
  4. 详细统计总结:由 Kiro Agent 编排有业务语义的字段统计和上下游映射总结
  5. CLI 任务化:支持 --date-from--date-to--limit--tables 参数,落盘到 SYSTEM_ANALYZE_ROOT
  6. Kiro Hook 集成userTriggered 类型 Hook 手动触发,先执行数据采集脚本,再由 Agent 完成语义分析

架构

graph TB
    subgraph "触发方式"
        CLI["CLI 命令行<br/>python scripts/ops/analyze_dataflow.py"]
        HOOK["Kiro Hook<br/>userTriggered 手动触发"]
    end

    subgraph "第一阶段Python 数据采集"
        ANALYZER["dataflow_analyzer.py<br/>核心采集模块"]
        CLI_ENTRY["analyze_dataflow.py<br/>CLI 入口 (argparse)"]
    end

    subgraph "数据源"
        FEIQIU_API["飞球 SaaS API"]
        PG_INFO["PostgreSQL<br/>information_schema.columns"]
        PG_COMMENT["PostgreSQL<br/>pg_catalog.pg_description"]
    end

    subgraph "中间产物 (SYSTEM_ANALYZE_ROOT/)"
        JSON_DUMP["api_samples/<br/>各表 JSON 原始数据"]
        JSON_TREE["json_trees/<br/>各表展开后的字段结构 (JSON)"]
        DB_SCHEMA["db_schemas/<br/>ODS/DWD 表结构 (JSON)"]
    end

    subgraph "第二阶段Kiro Agent 语义分析"
        MAPPING["映射计算<br/>JSON→ODS, ODS→DWD"]
        PURPOSE["字段作用推断<br/>COMMENT + 源码 + 样本"]
        SUMMARY["统计总结编排"]
        REPORT["Markdown 报告组装"]
    end

    subgraph "参考源码"
        ETL_SRC["ETL 源码<br/>loaders/ tasks/ models/ scd/"]
    end

    subgraph "最终输出"
        MD_FILE["Markdown 报告<br/>SYSTEM_ANALYZE_ROOT/dataflow_YYYY-MM-DD_HHMMSS.md"]
    end

    CLI --> CLI_ENTRY
    HOOK --> CLI_ENTRY
    CLI_ENTRY --> ANALYZER
    ANALYZER --> FEIQIU_API
    ANALYZER --> PG_INFO
    ANALYZER --> PG_COMMENT
    ANALYZER --> JSON_DUMP
    ANALYZER --> JSON_TREE
    ANALYZER --> DB_SCHEMA

    JSON_TREE --> MAPPING
    DB_SCHEMA --> MAPPING
    DB_SCHEMA --> PURPOSE
    JSON_DUMP --> PURPOSE
    ETL_SRC --> PURPOSE
    ETL_SRC --> MAPPING
    MAPPING --> SUMMARY
    PURPOSE --> SUMMARY
    SUMMARY --> REPORT
    REPORT --> MD_FILE

组件与接口

1. 核心采集模块 scripts/ops/dataflow_analyzer.py

gen_full_dataflow_doc.py 重构提取,专注于机械性数据采集,不做语义推断。

from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from collections import OrderedDict


@dataclass
class AnalyzerConfig:
    """采集配置,由 CLI 参数或 Hook 构造"""
    date_from: date | None = None
    date_to: date | None = None
    limit: int = 200                    # 每端点最大记录数
    tables: list[str] | None = None     # 指定表名None=全部
    output_dir: Path = Path("docs/reports")  # 落盘目录
    pg_dsn: str = ""
    api_base: str = ""
    api_token: str = ""
    store_id: str = ""


@dataclass
class FieldInfo:
    """JSON 字段信息(递归展开后)"""
    path: str           # 完整路径,如 "data.settleList[].amount"
    json_type: str      # "string" | "integer" | "number" | "boolean" | "object" | "array" | "null"
    sample: str         # 样本值(截断到 60 字符)
    depth: int          # 层级深度0 为顶层)
    occurrence: int     # 在所有记录中出现的次数
    total_records: int  # 总记录数


@dataclass
class ColumnInfo:
    """数据库列信息"""
    name: str
    data_type: str
    is_nullable: bool
    column_default: str | None
    comment: str | None       # DDL COMMENT 注释
    ordinal_position: int


@dataclass
class TableCollectionResult:
    """单张表的采集结果"""
    table_name: str
    task_code: str
    description: str
    endpoint: str
    record_count: int
    json_fields: OrderedDict[str, FieldInfo]   # path -> FieldInfo
    ods_columns: list[ColumnInfo]
    dwd_columns: list[ColumnInfo]
    raw_records_path: Path | None              # JSON 原始数据文件路径
    error: str | None = None


def flatten_json_tree(
    records: list[dict],
) -> OrderedDict[str, FieldInfo]:
    """
    递归展开 JSON 记录的完整层级结构。

    算法:
    1. 对每条记录递归遍历所有嵌套层级
    2. 用 '.' 分隔符拼接路径,数组用 '[]' 标记
    3. 遍历所有记录拼合最全字段集
    4. 统计每个字段的出现频率

    返回 path -> FieldInfo 的有序字典(按首次出现顺序)。
    """
    ...


def _recurse_json(
    obj: Any,
    prefix: str,
    depth: int,
    field_map: dict[str, FieldInfo],
    total_records: int,
):
    """
    递归遍历 JSON 值,填充 field_map。

    - dict: 遍历每个 key路径追加 ".key"
    - list: 路径追加 "[]",遍历每个元素
    - 标量: 记录类型、样本值、出现次数
    """
    ...


def query_table_columns(
    conn, schema: str, table: str,
) -> list[ColumnInfo]:
    """
    从 information_schema.columns + pg_description 查询表结构。

    SQL:
        SELECT c.column_name, c.data_type, c.is_nullable,
               c.column_default, c.ordinal_position,
               pgd.description AS column_comment
        FROM information_schema.columns c
        LEFT JOIN pg_catalog.pg_statio_all_tables st
            ON st.schemaname = c.table_schema
            AND st.relname = c.table_name
        LEFT JOIN pg_catalog.pg_description pgd
            ON pgd.objoid = st.relid
            AND pgd.objsubid = c.ordinal_position
        WHERE c.table_schema = %s AND c.table_name = %s
        ORDER BY c.ordinal_position;

    返回所有列(含版本控制列如 valid_from, valid_to, is_current, fetched_at    """
    ...


def collect_all_tables(
    config: AnalyzerConfig,
) -> list[TableCollectionResult]:
    """
    执行完整数据采集流程:
    1. 根据 config.tables 过滤要分析的表
    2. 对每张表:调用 API 获取 JSON → flatten_json_tree 展开
    3. 对每张表query_table_columns 查询 ODS 和 DWD 表结构
    4. 将原始 JSON 和结构化结果落盘到 config.output_dir
    5. 返回所有表的采集结果

    错误处理:单表失败不中断,记录 error 字段继续处理其余表。
    """
    ...


def dump_collection_results(
    results: list[TableCollectionResult],
    output_dir: Path,
) -> dict[str, Path]:
    """
    将采集结果序列化为 JSON 文件落盘。

    输出结构:
        {output_dir}/
            api_samples/{table}.json        — API 原始记录
            json_trees/{table}.json         — 展开后的字段结构
            db_schemas/ods_{table}.json     — ODS 表结构
            db_schemas/dwd_{table}.json     — DWD 表结构
            collection_manifest.json        — 采集清单(表名、记录数、时间戳)

    返回 {类别: 目录路径} 的字典。
    """
    ...

2. CLI 入口 scripts/ops/analyze_dataflow.py

"""
数据流结构分析 — CLI 入口

用法:
    python scripts/ops/analyze_dataflow.py
    python scripts/ops/analyze_dataflow.py --date-from 2025-01-01 --date-to 2025-01-15
    python scripts/ops/analyze_dataflow.py --limit 100 --tables settlement_records,payment_transactions
"""
import argparse
from pathlib import Path

def build_parser() -> argparse.ArgumentParser:
    """
    构造 CLI 参数解析器。

    参数:
        --date-from   数据获取起始日期 (YYYY-MM-DD)
        --date-to     数据获取截止日期 (YYYY-MM-DD)
        --limit       每端点最大记录数 (默认 200)
        --tables      要分析的表名列表 (逗号分隔,缺省=全部)
    """
    ...

def resolve_output_dir() -> Path:
    """
    确定输出目录:
    1. 优先读取环境变量 SYSTEM_ANALYZE_ROOT
    2. 回退到 docs/reports/
    3. 在根目录下创建按日期组织的子目录
    """
    ...

def main():
    """
    1. 解析 CLI 参数
    2. 加载环境变量(.env 分层叠加)
    3. 构造 AnalyzerConfig
    4. 调用 collect_all_tables() 执行采集
    5. 调用 dump_collection_results() 落盘
    6. 输出采集摘要到 stdout
    """
    ...

3. Kiro Hook .kiro/hooks/dataflow-analyze.kiro.hook

{
  "enabled": true,
  "name": "数据流结构分析",
  "description": "手动触发数据流结构分析:先执行 Python 脚本采集 API JSON 和 DB 表结构,再由 Kiro Agent 基于采集数据执行语义分析(映射计算、字段作用推断、统计总结),生成 Markdown 报告。",
  "version": "1",
  "when": {
    "type": "userTriggered"
  },
  "then": {
    "type": "prompt",
    "prompt": "执行数据流结构分析:\n1. 先运行 `python scripts/ops/analyze_dataflow.py` 完成数据采集\n2. 读取采集结果SYSTEM_ANALYZE_ROOT 或 docs/reports/ 下的 JSON 文件)\n3. 读取 ETL 源码loaders/、tasks/、models/、scd/ 等模块)理解数据流转逻辑\n4. 为每个字段推断作用说明(优先使用 DDL COMMENT结合源码和样本\n5. 计算 JSON→ODS 和 ODS→DWD 映射关系\n6. 编排统计总结(字段覆盖率、类型分布、上下游映射)\n7. 组装最终 Markdown 报告并保存"
  }
}

Hook 执行流程说明:

  • type: "userTriggered" — 开发者在 Kiro 中手动触发
  • type: "prompt" — 触发后由 Kiro Agent 执行 prompt 中描述的完整流程
  • Agent 先调用 Python 脚本完成数据采集,再基于采集数据执行语义分析
  • 当前仅覆盖飞球连接器;架构预留多连接器扩展(通过 --tables 参数或连接器发现机制)

4. 中间数据格式

Python 脚本采集后落盘的 JSON 文件,供 Kiro Agent 消费。

json_trees/{table}.json — 展开后的字段结构:

{
  "table": "settlement_records",
  "total_records": 200,
  "fields": [
    {
      "path": "data.settleList[].amount",
      "json_type": "number",
      "sample": "128.50",
      "depth": 2,
      "occurrence": 198,
      "total_records": 200
    }
  ]
}

db_schemas/ods_{table}.json — ODS 表结构:

{
  "schema": "ods",
  "table": "settlement_records",
  "columns": [
    {
      "name": "amount",
      "data_type": "numeric",
      "is_nullable": true,
      "column_default": null,
      "comment": "结算金额",
      "ordinal_position": 5
    }
  ]
}

collection_manifest.json — 采集清单:

{
  "timestamp": "2025-01-15T14:30:22+08:00",
  "config": {
    "date_from": "2025-01-01",
    "date_to": "2025-01-15",
    "limit": 200,
    "tables": null
  },
  "tables": [
    {
      "table": "settlement_records",
      "task_code": "ODS_SETTLEMENT_RECORDS",
      "record_count": 200,
      "ods_column_count": 25,
      "dwd_column_count": 18,
      "error": null
    }
  ]
}

数据模型

FieldInfoJSON 字段信息)

字段 类型 说明
path str 完整层级路径,如 data.settleList[].amount
json_type str JSON 类型string / integer / number / boolean / object / array / null
sample str 样本值(截断到 60 字符)
depth int 层级深度0 为顶层)
occurrence int 在所有记录中出现的次数
total_records int 总记录数

ColumnInfo数据库列信息

字段 类型 说明
name str 列名
data_type str PostgreSQL 数据类型
is_nullable bool 是否可空
column_default str | None 默认值
comment str | None DDL COMMENT 注释(来自 pg_description
ordinal_position int 列序号

TableCollectionResult单表采集结果

字段 类型 说明
table_name str 表名
task_code str ETL 任务编码,如 ODS_SETTLEMENT_RECORDS
description str 中文描述
endpoint str API 端点路径
record_count int 获取的记录数
json_fields OrderedDict[str, FieldInfo] 展开后的 JSON 字段结构
ods_columns list[ColumnInfo] ODS 表结构
dwd_columns list[ColumnInfo] DWD 表结构
raw_records_path Path | None 原始 JSON 文件路径
error str | None 错误信息None 表示成功)

AnalyzerConfig采集配置

字段 类型 默认值 说明
date_from date | None None 数据获取起始日期
date_to date | None None 数据获取截止日期
limit int 200 每端点最大记录数
tables list[str] | None None 指定表名列表None=全部
output_dir Path docs/reports/ 落盘目录
pg_dsn str "" PostgreSQL 连接串
api_base str "" API 基础 URL
api_token str "" API 认证令牌
store_id str "" 门店 ID

数据库查询 SQL

-- 查询表结构(含 COMMENT 注释)
SELECT
    c.column_name,
    c.data_type,
    c.is_nullable,
    c.column_default,
    c.ordinal_position,
    pgd.description AS column_comment
FROM information_schema.columns c
LEFT JOIN pg_catalog.pg_statio_all_tables st
    ON st.schemaname = c.table_schema AND st.relname = c.table_name
LEFT JOIN pg_catalog.pg_description pgd
    ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
WHERE c.table_schema = %s AND c.table_name = %s
ORDER BY c.ordinal_position;

Kiro Agent 语义分析输出格式

Agent 在消费中间数据后,生成的最终 Markdown 报告遵循以下表格结构:

API 源字段表格(每张表一个):

# JSON 路径 类型 层级 出现率 示例值 → ODS 列 字段作用 处理

ODS 表格(每张表一个):

# 列名 数据类型 可空 COMMENT ← JSON 来源 → DWD 列 字段作用

DWD 表格(每张表一个):

# 列名 数据类型 可空 COMMENT ← ODS 来源 字段作用 来源类型

"来源类型"列标注:直接映射 / ETL 派生 / SCD2 版本控制 / 元数据

正确性属性

正确性属性是一种在系统所有合法执行路径上都应成立的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

以下属性基于需求文档中的验收标准推导,聚焦于 Python 数据采集模块(dataflow_analyzer.py)中可自动化测试的核心逻辑。

注:需求 3字段作用说明、需求 4统计总结的验收标准由 Kiro Agent 执行语义分析,不属于可自动化测试的代码逻辑,因此不生成正确性属性。需求 6Kiro Hook为平台集成配置同样不生成属性。

Property 1: JSON 递归展开路径正确性

对于任意嵌套 JSON 对象(含 dict、list、标量的任意组合flatten_json_tree 的输出应满足:

  • 每个叶子节点的路径使用 . 分隔层级
  • 数组层级使用 [] 标记
  • 每个 FieldInfo 的 depth 等于路径中 . 分隔符的数量(即实际嵌套深度)
  • 输出中不遗漏任何叶子节点

Validates: Requirements 1.1, 1.3, 1.4

Property 2: 多记录字段合并完整性与出现频率准确性

对于任意一组 JSON 记录列表,flatten_json_tree 的输出应满足:

  • 合并后的字段路径集合是所有单条记录字段路径集合的并集(不遗漏任何记录中出现过的字段)
  • 每个字段的 occurrence 等于实际包含该字段的记录数
  • 每个字段的 occurrencetotal_records
  • total_records 等于输入记录列表的长度

Validates: Requirements 1.2, 1.5

Property 3: 输出文件名格式正确性

对于任意合法的日期时间值,生成的输出文件名应匹配模式 dataflow_YYYY-MM-DD_HHMMSS.md,且文件名中的日期时间与输入时间一致。

Validates: Requirements 5.6

错误处理

数据采集阶段Python 脚本)

错误场景 处理策略 对应需求
API 请求失败(超时/HTTP 错误) 记录错误到 TableCollectionResult.error,跳过该表继续处理其余表 2.4
数据库连接失败 记录错误信息ODS/DWD 列信息置空,不中断其余表分析 2.4
表不存在information_schema 查询返回空) 记录警告,该表的 ods_columns/dwd_columns 为空列表 2.4
SYSTEM_ANALYZE_ROOT 未配置 回退到 docs/reports/ 默认目录 5.5
SYSTEM_ANALYZE_ROOT 目录不存在 自动创建目录(mkdir -p 5.4
JSON 记录为空列表 json_fields 为空 OrderedDictrecord_count=0正常落盘 1.2
API 返回非 JSON 响应 捕获 json.JSONDecodeError,记录错误,跳过该表
CLI 参数格式错误(如日期格式不合法) argparse 自动报错退出,显示用法帮助 5.1

Kiro Agent 语义分析阶段

错误场景 处理策略
采集数据文件缺失或损坏 Agent 报告错误,建议重新运行数据采集脚本
ETL 源码结构变更导致映射推断失败 Agent 在报告中标注"映射待确认",不阻塞其余字段分析
某张表采集失败error 非空) Agent 在报告中标注该表采集失败原因,继续处理其余表

测试策略

测试框架

  • 单元测试:pytest
  • 属性测试:hypothesispytest 插件模式)
  • 测试位置:tests/test_dataflow_analyzer.pyMonorepo 级属性测试目录)

属性测试

使用 hypothesis 生成随机嵌套 JSON 结构,验证 flatten_json_tree 的核心正确性属性。

每个属性测试至少运行 100 次迭代。

属性 测试方法 生成器
Property 1: JSON 递归展开路径正确性 生成任意嵌套 JSONdict/list/标量组合),调用 flatten_json_tree,验证路径格式、[] 标记、depth 一致性 hypothesis.strategies 递归策略生成嵌套 JSON
Property 2: 多记录字段合并完整性 生成随机记录列表(部分字段随机缺失),调用 flatten_json_tree,验证并集完整性和 occurrence 准确性 st.lists(st.dictionaries(...))
Property 3: 输出文件名格式 生成随机 datetime调用文件名生成函数验证格式匹配 st.datetimes()

标注格式:

# Feature: dataflow-structure-audit, Property 1: JSON 递归展开路径正确性
# Validates: Requirements 1.1, 1.3, 1.4
@given(...)
def test_flatten_json_tree_path_correctness(...):
    ...

单元测试

针对具体示例和边界情况:

测试场景 验证内容 对应需求
空 JSON 记录列表 flatten_json_tree([]) 返回空 OrderedDict 1.2 边界
单层 JSON无嵌套 路径无 . 分隔depth=0 1.1
深层嵌套5+ 层) 路径正确拼接depth 正确 1.1, 1.4
数组内嵌套对象 items[].name 格式正确 1.3
SYSTEM_ANALYZE_ROOT 环境变量存在/缺失 输出目录正确回退 5.4, 5.5
CLI 参数解析 --date-from--date-to--limit--tables 正确解析 5.1, 5.2, 5.3
数据库连接失败时的错误隔离 单表失败不影响其余表 2.4
Hook 配置文件格式 JSON 结构正确type 为 userTriggered 6.1