Files
Neo-ZQYY/scripts/ops/gen_dataflow_doc.py

341 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
从源代码和 DDL 中提取 API → ODS → DWD 数据流映射,生成 Markdown 文档。
用法: python scripts/ops/gen_dataflow_doc.py
输出: $FULL_DATAFLOW_DOC_ROOT/dataflow_api_ods_dwd.md由 .env 配置)
"""
import re
import ast
import sys
import os
from pathlib import Path
from collections import OrderedDict
ROOT = Path(__file__).resolve().parents[2]
ETL = ROOT / "apps" / "etl" / "pipelines" / "feiqiu"
DB = ROOT / "db" / "etl_feiqiu" / "schemas"
from _env_paths import get_output_path as _get_path
OUT = _get_path("FULL_DATAFLOW_DOC_ROOT") / "dataflow_api_ods_dwd.md"
# ── 1. 从 DDL 解析表结构 ──────────────────────────────────────────
def parse_ddl_tables(sql_path: Path, schema: str) -> dict[str, list[dict]]:
"""解析 CREATE TABLE 语句,返回 {schema.table: [{col, type}, ...]}"""
text = sql_path.read_text(encoding="utf-8")
tables: dict[str, list[dict]] = {}
# 匹配 CREATE TABLE IF NOT EXISTS table_name (...)
pattern = re.compile(
r"CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+"
r"(?:(\w+)\.)?(\w+)\s*\((.*?)\)\s*;",
re.DOTALL | re.IGNORECASE,
)
for m in pattern.finditer(text):
s = m.group(1) or schema
tname = m.group(2)
body = m.group(3)
cols = []
for line in body.split("\n"):
line = line.strip().rstrip(",")
if not line or line.upper().startswith("PRIMARY") or line.startswith("--"):
continue
# 跳过约束行
if re.match(r"^(CONSTRAINT|UNIQUE|CHECK|FOREIGN|EXCLUDE)\b", line, re.I):
continue
parts = line.split()
if len(parts) >= 2:
col_name = parts[0].strip('"')
col_type = parts[1]
# 合并类型修饰符
if len(parts) > 2 and parts[2].startswith("("):
col_type += parts[2]
cols.append({"col": col_name, "type": col_type})
full = f"{s}.{tname}"
tables[full] = cols
return tables
# ── 2. 从 Python 源码解析 TABLE_MAP ──────────────────────────────
def parse_table_map(py_path: Path) -> dict[str, str]:
"""解析 TABLE_MAP: dict[str, str] = {...}"""
text = py_path.read_text(encoding="utf-8")
# 找到 TABLE_MAP 字典
match = re.search(
r"TABLE_MAP\s*(?::\s*dict\[.*?\])?\s*=\s*\{(.*?)\}",
text,
re.DOTALL,
)
if not match:
return {}
body = match.group(1)
result = {}
for m in re.finditer(r'"([^"]+)"\s*:\s*"([^"]+)"', body):
result[m.group(1)] = m.group(2)
return result
# ── 3. 从 Python 源码解析 FACT_MAPPINGS ──────────────────────────
def parse_fact_mappings(py_path: Path) -> dict[str, list[tuple]]:
"""解析 FACT_MAPPINGS 字典,返回 {dwd_table: [(dwd_col, ods_expr, cast), ...]}"""
text = py_path.read_text(encoding="utf-8")
# 找到 FACT_MAPPINGS 块
start = text.find("FACT_MAPPINGS")
if start < 0:
return {}
# 找到第一个 { 后的内容
brace_start = text.find("{", start)
if brace_start < 0:
return {}
# 手动匹配大括号
depth = 0
end = brace_start
for i in range(brace_start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
end = i + 1
break
block = text[brace_start:end]
result = {}
# 匹配每个表的映射列表
table_pattern = re.compile(r'"([^"]+)"\s*:\s*\[', re.DOTALL)
for tm in table_pattern.finditer(block):
table_name = tm.group(1)
list_start = tm.end()
# 找到对应的 ]
bracket_depth = 1
list_end = list_start
for i in range(list_start, len(block)):
if block[i] == "[":
bracket_depth += 1
elif block[i] == "]":
bracket_depth -= 1
if bracket_depth == 0:
list_end = i
break
list_body = block[list_start:list_end]
# 匹配 (dwd_col, ods_expr, cast|None)
tuples = []
tuple_pattern = re.compile(
r'\(\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*(?:"([^"]+)"|None)\s*\)'
)
for tp in tuple_pattern.finditer(list_body):
tuples.append((tp.group(1), tp.group(2), tp.group(3)))
result[table_name] = tuples
return result
# ── 4. 从 Python 源码解析 ODS_TASK_SPECS ─────────────────────────
def parse_ods_specs(py_path: Path) -> list[dict]:
"""解析 ODS_TASK_SPECS提取 code, table_name, endpoint, list_key, description"""
text = py_path.read_text(encoding="utf-8")
specs = []
# 匹配每个 OdsTaskSpec(...)
pattern = re.compile(r"OdsTaskSpec\s*\((.*?)\)\s*,", re.DOTALL)
for m in pattern.finditer(text):
body = m.group(1)
spec = {}
for key in ("code", "table_name", "endpoint", "list_key", "description"):
km = re.search(rf'{key}\s*=\s*"([^"]*)"', body)
if km:
spec[key] = km.group(1)
if "code" in spec:
specs.append(spec)
return specs
# ── 5. 生成文档 ──────────────────────────────────────────────────
def generate_doc():
ods_ddl = parse_ddl_tables(DB / "ods.sql", "ods")
dwd_ddl = parse_ddl_tables(DB / "dwd.sql", "dwd")
dwd_task_py = ETL / "tasks" / "dwd" / "dwd_load_task.py"
table_map = parse_table_map(dwd_task_py)
fact_mappings = parse_fact_mappings(dwd_task_py)
ods_specs = parse_ods_specs(ETL / "tasks" / "ods" / "ods_tasks.py")
# ODS 表 → API 端点映射
ods_to_api: dict[str, dict] = {}
for spec in ods_specs:
tn = spec.get("table_name", "")
ods_to_api[tn] = spec
lines = []
lines.append("# API → ODS → DWD 数据流对比文档")
lines.append("")
lines.append("> 自动生成于 `scripts/ops/gen_dataflow_doc.py`,基于 DDL 和 ETL 源码解析。")
lines.append("")
lines.append("## 概览")
lines.append("")
lines.append(f"- ODS 表数量: {len(ods_ddl)}")
lines.append(f"- DWD 表数量: {len(dwd_ddl)}")
lines.append(f"- TABLE_MAP 映射条目: {len(table_map)}")
lines.append(f"- ODS 任务数量: {len(ods_specs)}")
lines.append("")
# ── 按 ODS 表分组 ──
# 先建立 ODS 表 → DWD 表列表的反向映射
ods_to_dwd: dict[str, list[str]] = {}
for dwd_t, ods_t in table_map.items():
ods_to_dwd.setdefault(ods_t, []).append(dwd_t)
# 收集所有涉及的 ODS 表(去重、排序)
all_ods = sorted(set(list(ods_to_dwd.keys()) + [s.get("table_name", "") for s in ods_specs]))
lines.append("## 目录")
lines.append("")
for i, ods_t in enumerate(all_ods, 1):
anchor = ods_t.replace(".", "").replace("_", "-")
short = ods_t.split(".")[-1] if "." in ods_t else ods_t
lines.append(f"{i}. [{short}](#{anchor})")
lines.append("")
lines.append("---")
lines.append("")
# ── 逐表详情 ──
for ods_t in all_ods:
short = ods_t.split(".")[-1] if "." in ods_t else ods_t
lines.append(f"## {short}")
lines.append("")
# API 信息
api_info = ods_to_api.get(ods_t, {})
if api_info:
lines.append("### API 端点")
lines.append("")
lines.append(f"- 任务编码: `{api_info.get('code', 'N/A')}`")
lines.append(f"- 端点: `{api_info.get('endpoint', 'N/A')}`")
lk = api_info.get("list_key")
if lk:
lines.append(f"- 数据路径: `data.{lk}`")
desc = api_info.get("description", "")
if desc:
lines.append(f"- 说明: {desc}")
lines.append("")
# ODS 表字段
ods_cols = ods_ddl.get(ods_t, [])
if ods_cols:
lines.append(f"### ODS 表: `{ods_t}` ({len(ods_cols)} 列)")
lines.append("")
lines.append("| # | 列名 | 类型 |")
lines.append("|---|------|------|")
for idx, c in enumerate(ods_cols, 1):
lines.append(f"| {idx} | `{c['col']}` | {c['type']} |")
lines.append("")
# DWD 表
dwd_tables = ods_to_dwd.get(ods_t, [])
if dwd_tables:
for dwd_t in sorted(dwd_tables):
dwd_cols = dwd_ddl.get(dwd_t, [])
is_dim = "dim_" in dwd_t
is_ex = dwd_t.endswith("_ex")
table_type = "维度" if is_dim else "事实"
if is_ex:
table_type += "(扩展)"
mappings = fact_mappings.get(dwd_t, [])
lines.append(f"### DWD 表: `{dwd_t}` — {table_type} ({len(dwd_cols)} 列)")
lines.append("")
# 字段对比表
lines.append("| # | DWD 列名 | DWD 类型 | ODS 来源表达式 | 转换 | 备注 |")
lines.append("|---|----------|----------|----------------|------|------|")
# 建立映射查找
mapping_dict = {m[0]: (m[1], m[2]) for m in mappings}
for idx, c in enumerate(dwd_cols, 1):
col_name = c["col"]
col_type = c["type"]
# SCD2 列
scd2_cols = {"scd2_start_time", "scd2_end_time", "scd2_is_current", "scd2_version"}
if col_name.lower().replace("scd2_", "scd2_") in scd2_cols or col_name.lower() in scd2_cols:
lines.append(f"| {idx} | `{col_name}` | {col_type} | — | — | DWD 慢变元数据 |")
continue
if col_name in mapping_dict:
ods_expr, cast = mapping_dict[col_name]
cast_str = f"CAST → {cast}" if cast else "直接映射"
# 判断是否为 JSONB 提取
note = ""
if "->>" in ods_expr:
note = "JSONB 提取"
elif "CASE" in ods_expr.upper():
note = "派生计算"
elif ods_expr != col_name:
note = "字段重命名"
lines.append(f"| {idx} | `{col_name}` | {col_type} | `{ods_expr}` | {cast_str} | {note} |")
else:
# 同名直传
ods_col_names = {oc["col"].lower() for oc in ods_cols}
if col_name.lower() in ods_col_names:
lines.append(f"| {idx} | `{col_name}` | {col_type} | `{col_name}` | 直接映射 | 同名直传 |")
else:
lines.append(f"| {idx} | `{col_name}` | {col_type} | — | — | 未在 FACT_MAPPINGS 中显式映射 |")
lines.append("")
else:
lines.append(f"*该 ODS 表暂无 DWD 映射(仅用于 DWS 或其他下游)*")
lines.append("")
lines.append("---")
lines.append("")
# ── 附录ETL 元数据列说明 ──
lines.append("## 附录ETL 元数据列")
lines.append("")
lines.append("所有 ODS 表均包含以下 ETL 元数据列,不映射到 DWD")
lines.append("")
lines.append("| 列名 | 类型 | 说明 |")
lines.append("|------|------|------|")
lines.append("| `content_hash` | TEXT | 记录内容哈希,用于去重和变更检测 |")
lines.append("| `source_file` | TEXT | 原始导出文件名,用于数据追溯 |")
lines.append("| `source_endpoint` | TEXT | 采集来源接口/文件路径 |")
lines.append("| `fetched_at` | TIMESTAMPTZ | 采集/入库时间戳 |")
lines.append("| `payload` | JSONB | 完整原始 JSON 记录快照 |")
lines.append("")
lines.append("## 附录DWD 维度表 SCD2 列")
lines.append("")
lines.append("所有 DWD 维度表(`dim_*`)均包含以下 SCD2 慢变维度列:")
lines.append("")
lines.append("| 列名 | 类型 | 说明 |")
lines.append("|------|------|------|")
lines.append("| `scd2_start_time` | TIMESTAMPTZ | 版本生效起点 |")
lines.append("| `scd2_end_time` | TIMESTAMPTZ | 版本失效时间9999-12-31 = 当前) |")
lines.append("| `scd2_is_current` | INT | 当前版本标记1=当前0=历史) |")
lines.append("| `scd2_version` | INT | 版本号(自增) |")
lines.append("")
lines.append("## 附录DWD 事实表增量策略")
lines.append("")
lines.append("事实表按时间窗口增量写入,优先使用以下业务时间列进行过滤(按优先级排序):")
lines.append("")
lines.append("1. `pay_time` — 支付时间")
lines.append("2. `create_time` — 创建时间")
lines.append("3. `update_time` — 更新时间")
lines.append("4. `occur_time` — 发生时间")
lines.append("5. `settle_time` — 结算时间")
lines.append("6. `start_use_time` — 开始使用时间")
lines.append("7. `fetched_at` — 入库时间(兜底)")
lines.append("")
# 写入文件
OUT.parent.mkdir(parents=True, exist_ok=True)
OUT.write_text("\n".join(lines), encoding="utf-8")
print(f"文档已生成: {OUT}")
print(f" ODS 表: {len(ods_ddl)}, DWD 表: {len(dwd_ddl)}")
print(f" TABLE_MAP: {len(table_map)} 条, FACT_MAPPINGS: {len(fact_mappings)}")
print(f" ODS 任务: {len(ods_specs)}")
if __name__ == "__main__":
generate_doc()