# --- Web-viewer chrome accidentally captured above the real file header;
# --- commented out so the module remains importable:
# Files
# Neo-ZQYY/scripts/ops/gen_full_dataflow_doc.py
# 1131 lines
# 43 KiB
# Python
# Raw Permalink Blame History
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other
# characters. If you think that this is intentional, you can safely ignore
# this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
全链路数据流文档生成器API JSON → ODS → DWD。
从真实 API 获取 JSON 样本,结合 DDL 和 ETL 源码,生成带跨层跳转链接的 Markdown 文档。
用法: python scripts/ops/gen_full_dataflow_doc.py
输出: $FULL_DATAFLOW_DOC_ROOT/dataflow_api_ods_dwd.md由 .env 配置)
$API_SAMPLE_CACHE_ROOT/*.jsonAPI 原始响应缓存)
"""
import json
import os
import re
import sys
import time
from collections import OrderedDict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import requests
from dotenv import load_dotenv
ROOT = Path(__file__).resolve().parents[2]  # repo root (two levels above scripts/ops/)
ETL = ROOT / "apps" / "etl" / "pipelines" / "feiqiu"  # ETL pipeline package
DB = ROOT / "db" / "etl_feiqiu" / "schemas"  # DDL files (ods.sql / dwd.sql)
# Output paths come from .env (a missing key raises KeyError).
from _env_paths import get_output_path as _get_path
OUT = _get_path("FULL_DATAFLOW_DOC_ROOT") / "dataflow_api_ods_dwd.md"
SAMPLE_DIR = _get_path("API_SAMPLE_CACHE_ROOT")
TZ = ZoneInfo("Asia/Shanghai")  # all timestamps in the document use this zone
# ── Load environment variables ──────────────────────────────────────
# Pipeline-local .env is loaded with override=True first; the repo .env is
# loaded afterwards without override, so pipeline values take precedence.
load_dotenv(ETL / ".env", override=True)
load_dotenv(ROOT / ".env")
API_BASE = os.environ.get("API_BASE", "").rstrip("/")
API_TOKEN = os.environ.get("API_TOKEN", "")
STORE_ID = os.environ.get("STORE_ID", "2790685415443269")
API_TIMEOUT = int(os.environ.get("API_TIMEOUT", "20"))  # seconds per request
# ETL metadata columns (not sourced from the API payload).
ETL_META_COLS = {
    "content_hash", "source_file", "source_endpoint",
    "fetched_at", "payload", "record_index",
}
# ── ODS task specs (key information extracted from ODS_TASK_SPECS) ─
# Per entry: code, table_name, endpoint, data_path, list_key, time_fields,
# requires_window, extra_params, description.
# - data_path: keys walked into the response before list extraction.
# - list_key: explicit list key; None lets extract_list fall back to
#   DEFAULT_LIST_KEYS and then to the first list value found.
# - time_fields: (start, end) request-parameter names; None = no window.
ODS_SPECS = [
    {
        "code": "ODS_ASSISTANT_ACCOUNT",
        "table": "assistant_accounts_master",
        "endpoint": "/PersonnelManagement/SearchAssistantInfo",
        "data_path": ("data",),
        "list_key": "assistantInfos",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "助教账号档案",
    },
    {
        "code": "ODS_SETTLEMENT_RECORDS",
        "table": "settlement_records",
        "endpoint": "/Site/GetAllOrderSettleList",
        "data_path": ("data",),
        "list_key": "settleList",
        "time_fields": ("rangeStartTime", "rangeEndTime"),
        "requires_window": True,
        "extra_params": {},
        "description": "结账记录",
    },
    {
        "code": "ODS_TABLE_USE",
        "table": "table_fee_transactions",
        "endpoint": "/Site/GetSiteTableOrderDetails",
        "data_path": ("data",),
        "list_key": "siteTableUseDetailsList",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "台费计费流水",
    },
    {
        "code": "ODS_ASSISTANT_LEDGER",
        "table": "assistant_service_records",
        "endpoint": "/AssistantPerformance/GetOrderAssistantDetails",
        "data_path": ("data",),
        "list_key": "orderAssistantDetails",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "助教服务流水",
    },
    {
        "code": "ODS_STORE_GOODS_SALES",
        "table": "store_goods_sales_records",
        "endpoint": "/TenantGoods/GetGoodsSalesList",
        "data_path": ("data",),
        "list_key": "orderGoodsLedgers",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "门店商品销售流水",
    },
    {
        "code": "ODS_PAYMENT",
        "table": "payment_transactions",
        "endpoint": "/PayLog/GetPayLogListPage",
        "data_path": ("data",),
        "list_key": None,
        "time_fields": ("StartPayTime", "EndPayTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "支付流水",
    },
    {
        "code": "ODS_REFUND",
        "table": "refund_transactions",
        "endpoint": "/Order/GetRefundPayLogList",
        "data_path": ("data",),
        "list_key": None,
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "退款流水",
    },
    {
        "code": "ODS_PLATFORM_COUPON",
        "table": "platform_coupon_redemption_records",
        "endpoint": "/Promotion/GetOfflineCouponConsumePageList",
        "data_path": ("data",),
        "list_key": None,
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "平台/团购券核销",
    },
    {
        "code": "ODS_MEMBER",
        "table": "member_profiles",
        "endpoint": "/MemberProfile/GetTenantMemberList",
        "data_path": ("data",),
        "list_key": "tenantMemberInfos",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "会员档案",
    },
    {
        "code": "ODS_MEMBER_CARD",
        "table": "member_stored_value_cards",
        "endpoint": "/MemberProfile/GetTenantMemberCardList",
        "data_path": ("data",),
        "list_key": "tenantMemberCards",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "会员储值卡",
    },
    {
        "code": "ODS_MEMBER_BALANCE",
        "table": "member_balance_changes",
        "endpoint": "/MemberProfile/GetMemberCardBalanceChange",
        "data_path": ("data",),
        "list_key": "tenantMemberCardLogs",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "会员余额变动",
    },
    {
        "code": "ODS_RECHARGE_SETTLE",
        "table": "recharge_settlements",
        "endpoint": "/Site/GetRechargeSettleList",
        "data_path": ("data",),
        "list_key": "settleList",
        "time_fields": ("rangeStartTime", "rangeEndTime"),
        "requires_window": True,
        "extra_params": {},
        "description": "充值结算",
    },
    {
        "code": "ODS_GROUP_PACKAGE",
        "table": "group_buy_packages",
        "endpoint": "/PackageCoupon/QueryPackageCouponList",
        "data_path": ("data",),
        "list_key": "packageCouponList",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "团购套餐定义",
    },
    {
        "code": "ODS_GROUP_BUY_REDEMPTION",
        "table": "group_buy_redemption_records",
        "endpoint": "/Site/GetSiteTableUseDetails",
        "data_path": ("data",),
        "list_key": "siteTableUseDetailsList",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "团购套餐核销",
    },
    {
        "code": "ODS_INVENTORY_STOCK",
        "table": "goods_stock_summary",
        "endpoint": "/TenantGoods/GetGoodsStockReport",
        "data_path": ("data",),
        "list_key": None,
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "库存汇总",
    },
    {
        "code": "ODS_INVENTORY_CHANGE",
        "table": "goods_stock_movements",
        "endpoint": "/GoodsStockManage/QueryGoodsOutboundReceipt",
        "data_path": ("data",),
        "list_key": "queryDeliveryRecordsList",
        "time_fields": ("startTime", "endTime"),
        "requires_window": True,
        "extra_params": {},
        "description": "库存变化记录",
    },
    {
        "code": "ODS_TABLES",
        "table": "site_tables_master",
        "endpoint": "/Table/GetSiteTables",
        "data_path": ("data",),
        "list_key": "siteTables",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "台桌维表",
    },
    {
        "code": "ODS_GOODS_CATEGORY",
        "table": "stock_goods_category_tree",
        "endpoint": "/TenantGoodsCategory/QueryPrimarySecondaryCategory",
        "data_path": ("data",),
        "list_key": "goodsCategoryList",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "库存商品分类树",
    },
    {
        "code": "ODS_STORE_GOODS",
        "table": "store_goods_master",
        "endpoint": "/TenantGoods/GetGoodsInventoryList",
        "data_path": ("data",),
        "list_key": "orderGoodsList",
        "time_fields": None,
        "requires_window": False,
        # NOTE: this endpoint expects siteId as a list, unlike the scalar
        # siteId that fetch_records puts in the base params.
        "extra_params": {"siteId": [STORE_ID]},
        "description": "门店商品档案",
    },
    {
        "code": "ODS_TABLE_FEE_DISCOUNT",
        "table": "table_fee_discount_records",
        "endpoint": "/Site/GetTaiFeeAdjustList",
        "data_path": ("data",),
        "list_key": "taiFeeAdjustInfos",
        "time_fields": ("startTime", "endTime"),
        "requires_window": False,
        "extra_params": {},
        "description": "台费折扣/调账",
    },
    {
        "code": "ODS_TENANT_GOODS",
        "table": "tenant_goods_master",
        "endpoint": "/TenantGoods/QueryTenantGoods",
        "data_path": ("data",),
        "list_key": "tenantGoodsList",
        "time_fields": None,
        "requires_window": False,
        "extra_params": {},
        "description": "租户商品档案",
    },
]
# ── Browser-style request headers ──────────────────────────────────
HEADERS = {
    "Accept": "application/json, text/plain, */*",
    "Content-Type": "application/json",
    "Origin": "https://pc.ficoo.vip",
    "Referer": "https://pc.ficoo.vip/",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36"
    ),
}
# Fix: only attach Authorization when a token is configured. The previous
# version always sent the header, with an empty value when API_TOKEN was
# unset, and an empty "Authorization:" header can be rejected by strict
# gateways. (main() exits early without a token anyway, so behavior of the
# normal path is unchanged.)
if API_TOKEN:
    HEADERS["Authorization"] = f"Bearer {API_TOKEN}"
# Default list_key candidates, tried in order (kept in sync with APIClient).
DEFAULT_LIST_KEYS = (
    "list", "rows", "records", "items", "dataList", "data_list",
    "tenantMemberInfos", "tenantMemberCardLogs", "tenantMemberCards",
    "settleList", "orderAssistantDetails", "assistantInfos", "siteTables",
    "taiFeeAdjustInfos", "siteTableUseDetailsList", "tenantGoodsList",
    "packageCouponList", "queryDeliveryRecordsList", "goodsCategoryList",
    "orderGoodsList", "orderGoodsLedgers",
)
# ══════════════════════════════════════════════════════════════════
# 1. API 请求
# ══════════════════════════════════════════════════════════════════
def api_post(endpoint: str, payload: dict) -> dict:
    """POST *payload* as JSON to *endpoint* under API_BASE and return the body.

    Raises an HTTP error for transport-level failures and ValueError when
    the service reports a business code other than 0, "0", or a missing
    ``code`` field.
    """
    url = "/".join((API_BASE, endpoint.lstrip("/")))
    response = requests.post(url, json=payload, headers=HEADERS, timeout=API_TIMEOUT)
    response.raise_for_status()
    body = response.json()
    code = body.get("code")
    if code in (0, "0", None):
        return body
    msg = body.get("msg") or body.get("message") or ""
    raise ValueError(f"API 错误 code={code} msg={msg} endpoint={endpoint}")
def extract_list(payload: dict, data_path: tuple, list_key: str | None) -> list:
    """Extract the record list from an API response.

    Walks ``data_path`` into the payload, then tries, in order: the node
    itself (if already a list), the explicit ``list_key``, each
    DEFAULT_LIST_KEYS candidate, and finally the first list-valued entry.
    Returns [] when nothing list-like is found.
    """
    node = payload
    for step in data_path:
        node = node.get(step) if isinstance(node, dict) else None
        if node is None:
            break
    if isinstance(node, list):
        return node
    if not isinstance(node, dict):
        return []
    candidates = ([list_key] if list_key else []) + list(DEFAULT_LIST_KEYS)
    for key in candidates:
        value = node.get(key)
        if isinstance(value, list):
            return value
    # Last resort: first list value in insertion order.
    for value in node.values():
        if isinstance(value, list):
            return value
    return []
def fetch_records(spec: dict, target_count: int = 200) -> list[dict]:
    """Fetch sample records for one ODS spec.

    Tables without time fields get a single request of ``target_count``
    rows. Tables with time fields are queried in 10-day windows walking
    backwards from now, up to 10 attempts, until enough rows are gathered.
    Request failures are printed and swallowed; at most ``target_count``
    records are returned.
    """
    endpoint = spec["endpoint"]
    data_path = spec["data_path"]
    list_key = spec["list_key"]
    time_fields = spec["time_fields"]
    extra_params = spec.get("extra_params", {})
    base_params = {"siteId": STORE_ID, "page": 1, "limit": target_count}

    if not time_fields:
        # No window: a single best-effort request.
        try:
            resp = api_post(endpoint, {**base_params, **extra_params})
            return extract_list(resp, data_path, list_key)[:target_count]
        except Exception as e:
            print(f" 警告: API 请求失败 {endpoint}: {e}")
            return []

    # Windowed: walk backwards from now in 10-day batches.
    start_key, end_key = time_fields
    window_end = datetime.now(TZ)
    collected: list[dict] = []
    for attempt in range(10):  # max 10 windows
        window_start = window_end - timedelta(days=10)
        params = {
            **base_params,
            start_key: window_start.strftime("%Y-%m-%d %H:%M:%S"),
            end_key: window_end.strftime("%Y-%m-%d %H:%M:%S"),
            **extra_params,
        }
        try:
            resp = api_post(endpoint, params)
            collected.extend(extract_list(resp, data_path, list_key))
        except Exception as e:
            print(f" 警告: API 请求失败 {endpoint} attempt={attempt+1}: {e}")
        if len(collected) >= target_count:
            break
        window_end = window_start  # extend further into the past
    return collected[:target_count]
# ══════════════════════════════════════════════════════════════════
# 2. JSON 字段分析
# ══════════════════════════════════════════════════════════════════
def merge_record_layers(record: dict) -> dict:
    """Mirror the ETL's merge_record_layers: flatten nested layers.

    Successive dict-valued ``data`` layers and one dict-valued
    ``settleList`` layer are merged upward; on key conflicts the outer
    (already-flattened) value wins. The original nested keys remain.
    """
    flattened = dict(record)
    layer = flattened.get("data")
    while isinstance(layer, dict):
        combined = dict(layer)
        combined.update(flattened)  # outer keys take precedence
        flattened = combined
        layer = layer.get("data")
    settle = flattened.get("settleList")
    if isinstance(settle, dict):
        combined = dict(settle)
        combined.update(flattened)
        flattened = combined
    return flattened
def analyze_json_fields(records: list[dict]) -> OrderedDict[str, dict]:
    """Merge field information across multiple JSON records.

    Returns an OrderedDict mapping field name to
    {"type": str, "sample": str, "nested": bool}, in first-seen order.
    A field first seen as null is upgraded once a non-null value appears.
    """
    fields: OrderedDict[str, dict] = OrderedDict()
    for record in records:
        for key, value in merge_record_layers(record).items():
            known = fields.get(key)
            if known is None:
                fields[key] = {
                    "type": _json_type(value),
                    "sample": _sample_value(value),
                    "nested": isinstance(value, (dict, list)),
                }
            elif known["type"] == "null" and value is not None:
                # Upgrade the placeholder entry with the first real value.
                known["type"] = _json_type(value)
                known["sample"] = _sample_value(value)
                known["nested"] = isinstance(value, (dict, list))
    return fields
def _json_type(v: Any) -> str:
    """Map a Python value to its JSON type name.

    ``bool`` must be tested before ``int`` (bool is an int subclass);
    the check order below preserves that. Unknown types fall back to
    the Python type name.
    """
    if v is None:
        return "null"
    dispatch = (
        (bool, "boolean"),
        (int, "integer"),
        (float, "number"),
        (str, "string"),
        (list, "array"),
        (dict, "object"),
    )
    for py_type, json_name in dispatch:
        if isinstance(v, py_type):
            return json_name
    return type(v).__name__
def _sample_value(v: Any, max_len: int = 40) -> str:
    """Render a short preview of *v*, truncated to *max_len* chars + '...'.

    Dicts/lists are JSON-encoded (non-ASCII kept); everything else uses str().
    """
    if v is None:
        return "null"
    if isinstance(v, (dict, list)):
        text = json.dumps(v, ensure_ascii=False)
    else:
        text = str(v)
    if len(text) > max_len:
        return text[:max_len] + "..."
    return text
# ══════════════════════════════════════════════════════════════════
# 3. DDL 解析
# ══════════════════════════════════════════════════════════════════
def parse_ddl_tables(sql_path: Path, schema: str) -> dict[str, list[dict]]:
    """Parse CREATE TABLE statements from *sql_path*.

    Returns {table_name: [{"col": name, "type": type}, ...]} with table
    names stripped of any schema prefix. ``schema`` is accepted for call
    compatibility but is not used for filtering (the regex matches any
    schema prefix). Constraint lines, comments, and PRIMARY KEY clauses
    are skipped; a type's parenthesized argument (e.g. NUMERIC (10,2)
    split across tokens) is re-attached.
    """
    ddl_text = sql_path.read_text(encoding="utf-8")
    create_re = re.compile(
        r"CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+"
        r"(?:(\w+)\.)?(\w+)\s*\((.*?)\)\s*;",
        re.DOTALL | re.IGNORECASE,
    )
    constraint_re = re.compile(r"^(CONSTRAINT|UNIQUE|CHECK|FOREIGN|EXCLUDE)\b", re.I)
    tables: dict[str, list[dict]] = {}
    for match in create_re.finditer(ddl_text):
        columns = []
        for raw in match.group(3).split("\n"):
            entry = raw.strip().rstrip(",")
            if not entry or entry.upper().startswith("PRIMARY") or entry.startswith("--"):
                continue
            if constraint_re.match(entry):
                continue
            tokens = entry.split()
            if len(tokens) < 2:
                continue
            col_type = tokens[1]
            if len(tokens) > 2 and tokens[2].startswith("("):
                col_type += tokens[2]
            columns.append({"col": tokens[0].strip('"'), "type": col_type})
        tables[match.group(2)] = columns
    return tables
# ══════════════════════════════════════════════════════════════════
# 4. TABLE_MAP / FACT_MAPPINGS 解析(复用 gen_dataflow_doc.py 逻辑)
# ══════════════════════════════════════════════════════════════════
def parse_table_map(py_path: Path) -> dict[str, str]:
    """Extract TABLE_MAP ({dwd_table: ods_table}) from a Python source file.

    Matches the first ``TABLE_MAP = { ... }`` literal (optionally annotated
    with ``: dict[...]``) and pulls out every ``"key": "value"`` string pair.
    Returns {} when no TABLE_MAP assignment is found.
    """
    source = py_path.read_text(encoding="utf-8")
    found = re.search(
        r"TABLE_MAP\s*(?::\s*dict\[.*?\])?\s*=\s*\{(.*?)\}",
        source, re.DOTALL,
    )
    if found is None:
        return {}
    return {
        pair.group(1): pair.group(2)
        for pair in re.finditer(r'"([^"]+)"\s*:\s*"([^"]+)"', found.group(1))
    }
def parse_fact_mappings(py_path: Path) -> dict[str, list[tuple]]:
    """Parse FACT_MAPPINGS from a Python source file.

    Returns {dwd_table: [(dwd_col, ods_expr, cast), ...]}; ``cast`` is
    None when the source tuple had a literal ``None`` in that position.
    Uses manual brace/bracket depth counting (not a single regex) because
    the FACT_MAPPINGS literal nests dicts, lists, and tuples.
    """
    text = py_path.read_text(encoding="utf-8")
    start = text.find("FACT_MAPPINGS")
    if start < 0:
        return {}
    brace_start = text.find("{", start)
    if brace_start < 0:
        return {}
    # Scan forward to the matching closing brace of the dict literal.
    depth = 0
    end = brace_start
    for i in range(brace_start, len(text)):
        if text[i] == "{":
            depth += 1
        elif text[i] == "}":
            depth -= 1
            if depth == 0:
                end = i + 1
                break
    block = text[brace_start:end]
    result = {}
    # Top-level entries look like `"dwd.table": [ ... ]`.
    table_pattern = re.compile(r'"([^"]+)"\s*:\s*\[', re.DOTALL)
    for tm in table_pattern.finditer(block):
        table_name = tm.group(1)
        list_start = tm.end()
        # Find the matching `]` for this table's mapping list.
        bracket_depth = 1
        list_end = list_start
        for i in range(list_start, len(block)):
            if block[i] == "[":
                bracket_depth += 1
            elif block[i] == "]":
                bracket_depth -= 1
                if bracket_depth == 0:
                    list_end = i
                    break
        list_body = block[list_start:list_end]
        tuples = []
        # Tuples are ("dwd_col", "ods_expr", "cast") or (..., None);
        # group(3) is None when the bare token None matched.
        tuple_pattern = re.compile(
            r'\(\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*(?:"([^"]+)"|None)\s*\)'
        )
        for tp in tuple_pattern.finditer(list_body):
            tuples.append((tp.group(1), tp.group(2), tp.group(3)))
        result[table_name] = tuples
    return result
# ══════════════════════════════════════════════════════════════════
# 5. 映射计算
# ══════════════════════════════════════════════════════════════════
def make_anchor(prefix: str, table: str) -> str:
    """Build a Markdown anchor id: prefix + table with '.'/'_' turned into '-'."""
    normalized = table.replace(".", "-").replace("_", "-")
    return "-".join((prefix, normalized))
def compute_api_to_ods_mapping(
    api_fields: OrderedDict[str, dict],
    ods_cols: list[dict],
) -> list[dict]:
    """Compute the API JSON field → ODS column mapping.

    Matching is case-insensitive (as in the ETL). Flattened-source objects
    get an explanatory note instead of a target column; unmatched API
    fields are marked as payload-only; ODS columns with no API counterpart
    are appended at the end as ETL-derived.
    """
    # Case-insensitive lookup of ODS business columns (meta columns excluded).
    lower_to_ods = {
        c["col"].lower(): c["col"]
        for c in ods_cols
        if c["col"].lower() not in ETL_META_COLS
    }
    flatten_sources = ("data", "settlelist", "siteprofile", "tableprofile")
    rows = []
    matched = set()
    for api_key, info in api_fields.items():
        key_lower = api_key.lower()
        api_type = info["type"]
        if api_type == "object" and key_lower in flatten_sources:
            # Objects that the ETL flattens (or ignores) map no column directly.
            note = ("嵌套对象,展平后各字段独立映射"
                    if key_lower in ("data", "settlelist")
                    else "嵌套对象,不直接映射")
            rows.append({"api_field": api_key, "api_type": api_type,
                         "ods_col": "", "note": note})
        elif key_lower in lower_to_ods:
            ods_col = lower_to_ods[key_lower]
            matched.add(key_lower)
            rows.append({
                "api_field": api_key,
                "api_type": api_type,
                "ods_col": ods_col,
                "note": "同名映射" if api_key == ods_col else "大小写不敏感匹配",
            })
        else:
            rows.append({"api_field": api_key, "api_type": api_type,
                         "ods_col": "", "note": "仅存于 payload JSONB"})
    # ODS business columns never seen in the API sample → ETL-derived.
    api_lowers = {k.lower() for k in api_fields}
    for c in ods_cols:
        col_lower = c["col"].lower()
        if col_lower in matched or col_lower in ETL_META_COLS:
            continue
        if col_lower in api_lowers:
            # Already represented in api_fields (e.g. via nested flattening).
            continue
        rows.append({"api_field": "", "api_type": "",
                     "ods_col": c["col"], "note": "ETL 派生/嵌套提取"})
    return rows
def build_ods_to_dwd_info(
    ods_table: str,
    ods_cols: list[dict],
    table_map: dict[str, str],
    fact_mappings: dict[str, list[tuple]],
    dwd_ddl: dict[str, list[dict]],
) -> list[dict]:
    """Describe how one ODS table feeds its downstream DWD tables.

    Returns one entry per mapped DWD table (sorted by name, skipping those
    without DDL): {dwd_table, dwd_short, table_type,
    cols_info: [{col, type, ods_source, cast, note}]}.
    """
    qualified = f"ods.{ods_table}"
    downstream = sorted(t for t, src in table_map.items() if src == qualified)
    if not downstream:
        return []
    ods_lower = {c["col"].lower() for c in ods_cols}
    scd2_cols = {"scd2_start_time", "scd2_end_time", "scd2_is_current", "scd2_version"}
    results = []
    for dwd_full in downstream:
        dwd_short = dwd_full.rsplit(".", 1)[-1]
        dwd_cols = dwd_ddl.get(dwd_short, [])
        if not dwd_cols:
            continue
        # Explicit column mappings from FACT_MAPPINGS, keyed case-insensitively.
        explicit = {
            name.lower(): (expr, cast)
            for name, expr, cast in fact_mappings.get(dwd_full, [])
        }
        cols_info = []
        for c in dwd_cols:
            name, ctype = c["col"], c["type"]
            lower = name.lower()
            if lower in scd2_cols:
                info = {"col": name, "type": ctype,
                        "ods_source": "", "cast": "", "note": "SCD2 元数据"}
            elif lower in explicit:
                expr, cast = explicit[lower]
                if "->>" in expr:
                    note = "JSONB 提取"
                elif "CASE" in expr.upper():
                    note = "派生计算"
                elif expr.lower() != lower:
                    note = "字段重命名"
                else:
                    note = ""
                info = {"col": name, "type": ctype, "ods_source": expr,
                        "cast": f"{cast}" if cast else "直接", "note": note}
            elif lower in ods_lower:
                # Same-named column carried through without an explicit mapping.
                info = {"col": name, "type": ctype,
                        "ods_source": name, "cast": "直接", "note": "同名直传"}
            else:
                info = {"col": name, "type": ctype,
                        "ods_source": "", "cast": "", "note": "未显式映射"}
            cols_info.append(info)
        table_type = "维度" if "dim_" in dwd_short else "事实"
        if dwd_short.endswith("_ex"):
            table_type += "(扩展)"
        results.append({
            "dwd_table": dwd_full,
            "dwd_short": dwd_short,
            "table_type": table_type,
            "cols_info": cols_info,
        })
    return results
# ══════════════════════════════════════════════════════════════════
# 6. Markdown 文档生成
# ══════════════════════════════════════════════════════════════════
def generate_document(
    specs: list[dict],
    api_data: dict[str, list[dict]],        # table -> fetched sample records
    api_fields: dict[str, OrderedDict],     # table -> analyzed field info
    ods_ddl: dict[str, list[dict]],         # ODS table -> column defs
    dwd_ddl: dict[str, list[dict]],         # DWD table -> column defs
    table_map: dict[str, str],              # dwd table -> source ods table
    fact_mappings: dict[str, list[tuple]],  # dwd table -> (col, expr, cast)
) -> str:
    """Render the complete API → ODS → DWD dataflow Markdown document.

    For each spec, emits three linked sections: the API JSON field table,
    the ODS column table, and one table per downstream DWD table; closes
    with a shared appendix. Returns the document as one newline-joined
    string.
    """
    lines: list[str] = []
    # ── Document header ──
    lines.append("# API → ODS → DWD 全链路数据流文档")
    lines.append("")
    lines.append(f"> 自动生成于 `scripts/ops/gen_full_dataflow_doc.py`,基于真实 API 响应 + DDL + ETL 源码。")
    lines.append(f"> 生成时间:{datetime.now(TZ).strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("")
    # ── Overview ──
    api_ok = sum(1 for v in api_data.values() if v)  # specs with ≥1 record
    lines.append("## 概览")
    lines.append("")
    lines.append(f"- API 端点数量: {len(specs)}")
    lines.append(f"- 成功获取数据: {api_ok}/{len(specs)}")
    lines.append(f"- ODS 表数量: {len(ods_ddl)}")
    lines.append(f"- DWD 表数量: {len(dwd_ddl)}")
    lines.append(f"- TABLE_MAP 映射: {len(table_map)}")
    lines.append("")
    # ── Legend ──
    lines.append("## 图例")
    lines.append("")
    lines.append("- 🔗 跳转链接:点击可在 JSON / ODS / DWD 层之间跳转")
    lines.append("- ✅ 已映射到下游")
    lines.append("- ⚠️ 仅存于 payload未入 ODS 列")
    lines.append("- 🔄 大小写不敏感匹配")
    lines.append("- 📦 嵌套对象")
    lines.append("")
    # ── Table of contents ──
    lines.append("## 目录")
    lines.append("")
    for i, spec in enumerate(specs, 1):
        table = spec["table"]
        desc = spec["description"]
        anchor = make_anchor("api", table)
        lines.append(f"{i}. [{desc} (`{table}`)](#{anchor})")
    lines.append("")
    lines.append("---")
    lines.append("")
    # ── Per-table detail ──
    for spec in specs:
        table = spec["table"]
        desc = spec["description"]
        code = spec["code"]
        endpoint = spec["endpoint"]
        records = api_data.get(table, [])
        fields = api_fields.get(table, OrderedDict())
        ods_cols = ods_ddl.get(table, [])
        api_anchor = make_anchor("api", table)
        ods_anchor = make_anchor("ods", table)
        # ════════════════════════════════════════════════════════
        # API JSON layer
        # ════════════════════════════════════════════════════════
        lines.append(f'<a id="{api_anchor}"></a>')
        lines.append("")
        lines.append(f"## {desc} (`{table}`)")
        lines.append("")
        lines.append(f"- 任务编码: `{code}`")
        lines.append(f"- API 端点: `{endpoint}`")
        if spec["time_fields"]:
            lines.append(f"- 时间字段: `{spec['time_fields'][0]}` / `{spec['time_fields'][1]}`")
        lines.append(f"- 获取记录数: {len(records)}")
        lines.append("")
        if not fields:
            lines.append("*未获取到 API 数据,跳过 JSON 字段分析。*")
            lines.append("")
        else:
            # Compute the API→ODS mapping for this table.
            api_ods_map = compute_api_to_ods_mapping(fields, ods_cols)
            lines.append(f"### API 源字段({len(fields)} 个) [🔗 ODS](#{ods_anchor})")
            lines.append("")
            lines.append("| # | JSON 字段 | 类型 | 示例值 | → ODS 列 | 说明 |")
            lines.append("|---|----------|------|--------|----------|------|")
            idx = 0  # row counter; rows without an API field are skipped
            for m in api_ods_map:
                if m["api_field"] == "":
                    continue
                idx += 1
                api_f = m["api_field"]
                api_t = m["api_type"]
                ods_c = m["ods_col"]
                note = m["note"]
                sample = fields.get(api_f, {}).get("sample", "")
                # Escape pipe characters so samples don't break the table.
                sample = sample.replace("|", "\\|")
                if ods_c != "":
                    ods_link = f"[`{ods_c}`](#{ods_anchor})"
                    icon = "🔄" if "不敏感" in note else ""
                else:
                    ods_link = ""
                    icon = "📦" if "嵌套" in note else "⚠️"
                lines.append(f"| {idx} | `{api_f}` | {api_t} | {sample} | {ods_link} | {icon} {note} |")
            # Mapping statistics (objects excluded from the unmapped count).
            mapped = sum(1 for m in api_ods_map if m["ods_col"] != "" and m["api_field"] != "")
            unmapped = sum(1 for m in api_ods_map if m["ods_col"] == "" and m["api_field"] != ""
                           and m["api_type"] not in ("object",))
            lines.append("")
            # NOTE(review): the text between the two counts appears to have
            # lost a full-width separator — confirm the intended wording.
            lines.append(f"> 映射统计:{mapped} 个字段映射到 ODS{unmapped} 个仅存于 payload。")
            lines.append("")
        # ════════════════════════════════════════════════════════
        # ODS layer
        # ════════════════════════════════════════════════════════
        if ods_cols:
            # Build the downstream DWD mapping info for this ODS table.
            dwd_info_list = build_ods_to_dwd_info(table, ods_cols, table_map, fact_mappings, dwd_ddl)
            # Reverse lookup ODS column → DWD columns (for downstream links).
            ods_to_dwd_cols: dict[str, list[tuple[str, str]]] = {}  # ods_col_lower -> [(dwd_table, dwd_col)]
            for dwd_info in dwd_info_list:
                for ci in dwd_info["cols_info"]:
                    src = ci["ods_source"]
                    if src != "":
                        # Reduce a JSONB expression etc. to its bare column name.
                        src_lower = src.split("->")[0].strip().strip('"').lower()
                        key = src_lower
                        if key not in ods_to_dwd_cols:
                            ods_to_dwd_cols[key] = []
                        ods_to_dwd_cols[key].append((dwd_info["dwd_table"], ci["col"]))
            lines.append(f'<a id="{ods_anchor}"></a>')
            lines.append("")
            lines.append(f"### ODS: `ods.{table}` ({len(ods_cols)} 列) [🔗 API](#{api_anchor})")
            lines.append("")
            # Separate business columns from ETL metadata columns.
            biz_cols = [c for c in ods_cols if c["col"].lower() not in ETL_META_COLS]
            meta_cols = [c for c in ods_cols if c["col"].lower() in ETL_META_COLS]
            lines.append("| # | ODS 列名 | 类型 | ← JSON 源 | → DWD 目标 |")
            lines.append("|---|---------|------|-----------|-----------|")
            for idx, c in enumerate(biz_cols, 1):
                col_name = c["col"]
                col_type = c["type"]
                # Upstream JSON link (case-insensitive match on field name).
                json_match = None
                if fields:
                    for api_key in fields:
                        if api_key.lower() == col_name.lower():
                            json_match = api_key
                            break
                if json_match:
                    json_link = f"[`{json_match}`](#{api_anchor})"
                else:
                    json_link = ""
                # Downstream DWD links.
                dwd_targets = ods_to_dwd_cols.get(col_name.lower(), [])
                if dwd_targets:
                    dwd_links = []
                    for dwd_t, dwd_c in dwd_targets[:3]:  # show at most 3
                        dwd_short = dwd_t.split(".")[-1] if "." in dwd_t else dwd_t
                        dwd_a = make_anchor("dwd", dwd_short)
                        dwd_links.append(f"[`{dwd_c}`](#{dwd_a})")
                    dwd_link = ", ".join(dwd_links)
                    if len(dwd_targets) > 3:
                        dwd_link += f" +{len(dwd_targets)-3}"
                else:
                    dwd_link = ""
                lines.append(f"| {idx} | `{col_name}` | {col_type} | {json_link} | {dwd_link} |")
            if meta_cols:
                lines.append("")
                lines.append(f"*ETL 元数据列({len(meta_cols)} 个):" +
                             ", ".join(f"`{c['col']}`" for c in meta_cols) + "*")
            lines.append("")
        # ════════════════════════════════════════════════════════
        # DWD layer
        # ════════════════════════════════════════════════════════
        dwd_info_list = build_ods_to_dwd_info(table, ods_cols, table_map, fact_mappings, dwd_ddl)
        if dwd_info_list:
            for dwd_info in dwd_info_list:
                dwd_short = dwd_info["dwd_short"]
                dwd_full = dwd_info["dwd_table"]
                table_type = dwd_info["table_type"]
                cols_info = dwd_info["cols_info"]
                dwd_anchor = make_anchor("dwd", dwd_short)
                lines.append(f'<a id="{dwd_anchor}"></a>')
                lines.append("")
                lines.append(f"### DWD: `{dwd_full}` — {table_type} ({len(cols_info)} 列) "
                             f"[🔗 ODS](#{ods_anchor})")
                lines.append("")
                lines.append("| # | DWD 列名 | 类型 | ← ODS 来源 | 转换 | 说明 |")
                lines.append("|---|---------|------|-----------|------|------|")
                for idx, ci in enumerate(cols_info, 1):
                    col = ci["col"]
                    ctype = ci["type"]
                    src = ci["ods_source"]
                    cast = ci["cast"]
                    note = ci["note"]
                    if src != "":
                        ods_link = f"[`{src}`](#{ods_anchor})"
                    else:
                        ods_link = ""
                    lines.append(f"| {idx} | `{col}` | {ctype} | {ods_link} | {cast} | {note} |")
                lines.append("")
        else:
            lines.append(f"*该 ODS 表暂无 DWD 映射(仅用于 DWS 或其他下游)*")
            lines.append("")
        lines.append("---")
        lines.append("")
    # ── Appendix ──
    lines.append("## 附录")
    lines.append("")
    _append_appendix(lines)
    return "\n".join(lines)
def _append_appendix(lines: list[str]):
    """Append the static appendix sections (ETL meta columns, SCD2 columns,
    fact-table incremental strategy) to *lines* in place.

    NOTE(review): ETL_META_COLS also contains ``record_index``, which this
    table does not list — confirm whether that omission is intentional.
    """
    lines.extend([
        "### ETL 元数据列",
        "",
        "所有 ODS 表均包含以下 ETL 元数据列,不映射到 DWD",
        "",
        "| 列名 | 类型 | 说明 |",
        "|------|------|------|",
        "| `content_hash` | TEXT | 记录内容哈希,用于去重和变更检测 |",
        "| `source_file` | TEXT | 原始导出文件名,用于数据追溯 |",
        "| `source_endpoint` | TEXT | 采集来源接口/文件路径 |",
        "| `fetched_at` | TIMESTAMPTZ | 采集/入库时间戳 |",
        "| `payload` | JSONB | 完整原始 JSON 记录快照 |",
        "",
        "### DWD 维度表 SCD2 列",
        "",
        "| 列名 | 类型 | 说明 |",
        "|------|------|------|",
        "| `scd2_start_time` | TIMESTAMPTZ | 版本生效起点 |",
        "| `scd2_end_time` | TIMESTAMPTZ | 版本失效时间9999-12-31 = 当前) |",
        "| `scd2_is_current` | INT | 当前版本标记1=当前0=历史) |",
        "| `scd2_version` | INT | 版本号(自增) |",
        "",
        "### DWD 事实表增量策略",
        "",
        "事实表按时间窗口增量写入,优先使用以下业务时间列进行过滤:",
        "",
        "1. `pay_time` — 支付时间",
        "2. `create_time` — 创建时间",
        "3. `update_time` — 更新时间",
        "4. `occur_time` — 发生时间",
        "5. `settle_time` — 结算时间",
        "6. `start_use_time` — 开始使用时间",
        "7. `fetched_at` — 入库时间(兜底)",
        "",
    ])
# ══════════════════════════════════════════════════════════════════
# 7. 主流程
# ══════════════════════════════════════════════════════════════════
def main():
    """Entry point: parse DDL + ETL source, sample the API, write the doc.

    Exits with status 1 when API_BASE/API_TOKEN are not configured. API
    samples are cached on disk for 24 hours per table.
    """
    print("=" * 60)
    print("全链路数据流文档生成器")
    print("=" * 60)
    # Fail fast when API credentials are missing.
    if not API_BASE or not API_TOKEN:
        print("错误: 未配置 API_BASE 或 API_TOKEN请检查 .env 文件", file=sys.stderr)
        sys.exit(1)
    print(f"API_BASE: {API_BASE}")
    print(f"STORE_ID: {STORE_ID}")
    print()
    # ── Parse DDL ──
    print("解析 DDL...")
    ods_ddl = parse_ddl_tables(DB / "ods.sql", "ods")
    dwd_ddl = parse_ddl_tables(DB / "dwd.sql", "dwd")
    print(f" ODS 表: {len(ods_ddl)}, DWD 表: {len(dwd_ddl)}")
    # ── Parse TABLE_MAP / FACT_MAPPINGS from the DWD load task ──
    print("解析 TABLE_MAP / FACT_MAPPINGS...")
    dwd_task_py = ETL / "tasks" / "dwd" / "dwd_load_task.py"
    table_map = parse_table_map(dwd_task_py)
    fact_mappings = parse_fact_mappings(dwd_task_py)
    print(f" TABLE_MAP: {len(table_map)} 条, FACT_MAPPINGS: {len(fact_mappings)}")
    # ── Ensure the sample cache directory exists ──
    SAMPLE_DIR.mkdir(parents=True, exist_ok=True)
    # ── Fetch API data (24-hour on-disk cache per table) ──
    print()
    print("从 API 获取数据...")
    api_data: dict[str, list[dict]] = {}
    api_fields: dict[str, OrderedDict] = {}
    for spec in ODS_SPECS:
        table = spec["table"]
        cache_file = SAMPLE_DIR / f"{table}.json"
        # Reuse cached samples younger than 24 hours.
        if cache_file.exists():
            mtime = datetime.fromtimestamp(cache_file.stat().st_mtime, tz=TZ)
            if (datetime.now(TZ) - mtime).total_seconds() < 86400:
                print(f" [{spec['code']}] {table}: 使用缓存 ({cache_file.name})")
                with open(cache_file, "r", encoding="utf-8") as f:
                    records = json.load(f)
                api_data[table] = records
                api_fields[table] = analyze_json_fields(records)
                continue
        print(f" [{spec['code']}] {table}: 请求 API...", end=" ", flush=True)
        try:
            records = fetch_records(spec, target_count=200)
            api_data[table] = records
            print(f"获取 {len(records)}")
            # Fix: only cache non-empty results. fetch_records swallows
            # request errors and returns [], and caching that empty list
            # would pin a transient API failure for the next 24 hours.
            if records:
                with open(cache_file, "w", encoding="utf-8") as f:
                    json.dump(records, f, ensure_ascii=False, indent=2, default=str)
            # Analyze the fields for the document.
            api_fields[table] = analyze_json_fields(records)
            # Throttle between endpoints to avoid rate limiting.
            time.sleep(0.5)
        except Exception as e:
            print(f"失败: {e}")
            api_data[table] = []
            api_fields[table] = OrderedDict()
    # ── Generate the document ──
    print()
    print("生成文档...")
    doc = generate_document(
        specs=ODS_SPECS,
        api_data=api_data,
        api_fields=api_fields,
        ods_ddl=ods_ddl,
        dwd_ddl=dwd_ddl,
        table_map=table_map,
        fact_mappings=fact_mappings,
    )
    OUT.parent.mkdir(parents=True, exist_ok=True)
    OUT.write_text(doc, encoding="utf-8")
    line_count = doc.count("\n") + 1
    print(f"文档已生成: {OUT}")
    print(f" 总行数: {line_count}")
    print(f" API 样本缓存: {SAMPLE_DIR}")
    print()
    # Summary of how many specs yielded data.
    ok = sum(1 for v in api_data.values() if v)
    fail = len(ODS_SPECS) - ok
    print(f"完成: {ok} 个表成功获取数据, {fail} 个未获取")


if __name__ == "__main__":
    main()