Neo-ZQYY/scripts/ops/dataflow_analyzer.py
"""
数据流结构分析 — 核心采集模块
从上游 SaaS API 采集 JSON 数据、递归展开 JSON 层级结构、
查询 PostgreSQL 表结构,输出结构化中间数据供 Kiro Agent 消费。
"""
from __future__ import annotations
import logging
from collections import OrderedDict
import json
from dataclasses import asdict, dataclass, field
from datetime import date, datetime
from pathlib import Path
from typing import Any
@dataclass
class AnalyzerConfig:
"""采集配置,由 CLI 参数或 Hook 构造"""
date_from: date | None = None
date_to: date | None = None
limit: int = 200
tables: list[str] | None = None
# Must be passed explicitly by the caller (read from the SYSTEM_ANALYZE_ROOT environment variable)
output_dir: Path = field(default_factory=lambda: Path(""))
pg_dsn: str = ""
api_base: str = ""
api_token: str = ""
store_id: str = ""
@dataclass
class FieldInfo:
"""JSON 字段信息(递归展开后)"""
path: str # 完整路径,如 "data.settleList[].amount"
json_type: str # "string" | "integer" | "number" | "boolean" | "object" | "array" | "null"
sample: str # 样本值(截断到 60 字符)
depth: int # 层级深度0 为顶层)
occurrence: int # 在所有记录中出现的次数
total_records: int # 总记录数
# 多示例值:最多保留 MAX_SAMPLES 个不同值(用于枚举检测和报告展示)
samples: list[str] = field(default_factory=list)
@dataclass
class ColumnInfo:
"""数据库列信息"""
name: str
data_type: str
is_nullable: bool
column_default: str | None
comment: str | None # DDL COMMENT (from pg_description)
ordinal_position: int
@dataclass
class TableCollectionResult:
"""单张表的采集结果"""
table_name: str
task_code: str
description: str
endpoint: str
record_count: int
json_fields: OrderedDict[str, FieldInfo] = field(default_factory=OrderedDict)
ods_columns: list[ColumnInfo] = field(default_factory=list)
dwd_columns: list[ColumnInfo] = field(default_factory=list)
# Structures of all related DWD tables: {dwd_short_name -> [ColumnInfo]}
dwd_tables: dict[str, list[ColumnInfo]] = field(default_factory=dict)
raw_records_path: Path | None = None
error: str | None = None
# --- JSON type mapping ---
_JSON_TYPE_MAP: dict[type, str] = {
dict: "object",
list: "array",
str: "string",
int: "integer",
float: "number",
bool: "boolean",
}
def _json_type_name(value: Any) -> str:
"""将 Python 值映射为 JSON 类型名称。"""
if value is None:
return "null"
# bool must be checked before int (bool is a subclass of int)
if isinstance(value, bool):
return "boolean"
return _JSON_TYPE_MAP.get(type(value), "string")
# Maximum number of distinct sample values kept per field
MAX_SAMPLES = 8
def _truncate_sample(value: Any, max_len: int = 60) -> str:
"""将值转为字符串并截断到 max_len 字符。"""
s = str(value)
return s[:max_len] if len(s) > max_len else s
def _recurse_json(
obj: Any,
prefix: str,
depth: int,
field_map: dict[str, FieldInfo],
total_records: int,
) -> None:
"""
递归遍历 JSON 值,填充 field_map。
- dict: 遍历每个 key路径追加 ".key"
- list: 路径追加 "[]",遍历每个元素
- 标量: 记录类型、样本值、出现次数
"""
if isinstance(obj, dict):
for key, val in obj.items():
child_path = f"{prefix}.{key}" if prefix else key
# The depth passed down is derived from the path: the number of '.' separators
# after "[]" is stripped (top-level fields have depth 0; a field under "data" has depth 1)
_recurse_json(val, child_path, child_path.replace("[]", "").count("."), field_map, total_records)
elif isinstance(obj, list):
arr_path = f"{prefix}[]" if prefix else "[]"
for item in obj:
_recurse_json(item, arr_path, depth, field_map, total_records)
else:
# Scalar leaf node: record it in field_map
if not prefix:
return
actual_depth = prefix.replace("[]", "").count(".")
if prefix in field_map:
fi = field_map[prefix]
fi.occurrence += 1
# If the type was previously null, overwrite it with the new non-null type
if fi.json_type == "null" and obj is not None:
fi.json_type = _json_type_name(obj)
fi.sample = _truncate_sample(obj)
# Collect multiple sample values (deduplicated, capped at MAX_SAMPLES)
if obj is not None:
s = _truncate_sample(obj)
if s and len(fi.samples) < MAX_SAMPLES and s not in fi.samples:
fi.samples.append(s)
else:
sample_str = _truncate_sample(obj)
field_map[prefix] = FieldInfo(
path=prefix,
json_type=_json_type_name(obj),
sample=sample_str,
depth=actual_depth,
occurrence=1,
total_records=total_records,
samples=[sample_str] if (obj is not None and sample_str) else [],
)
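# Path-notation sketch (illustrative input): nested dicts join with '.',
# arrays are marked with "[]", and depth counts '.' separators after "[]"
# has been stripped.
#
#   >>> fm: dict[str, FieldInfo] = {}
#   >>> _recurse_json({"data": {"settleList": [{"amount": 10}]}}, "", 0, fm, 1)
#   >>> fm["data.settleList[].amount"].depth
#   2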
def flatten_json_tree(
records: list[dict],
) -> OrderedDict[str, FieldInfo]:
"""
递归展开 JSON 记录的完整层级结构。
算法:
1. 对每条记录递归遍历所有嵌套层级
2. 用 '.' 分隔符拼接路径,数组用 '[]' 标记
3. 遍历所有记录拼合最全字段集
4. 统计每个字段的出现频率
返回 path -> FieldInfo 的有序字典(按首次出现顺序)。
"""
total = len(records)
if total == 0:
return OrderedDict()
# First pass: collect all field paths and samples (occurrence is accumulated per record)
global_map: dict[str, FieldInfo] = {}
for record in records:
# Track the paths of each record separately to avoid double counting within a single record
per_record_map: dict[str, FieldInfo] = {}
_recurse_json(record, "", 0, per_record_map, total)
for path, fi in per_record_map.items():
if path in global_map:
global_map[path].occurrence += 1
# Overwrite a null type with a non-null one
if global_map[path].json_type == "null" and fi.json_type != "null":
global_map[path].json_type = fi.json_type
global_map[path].sample = fi.sample
# Merge sample values
for s in fi.samples:
if s and len(global_map[path].samples) < MAX_SAMPLES and s not in global_map[path].samples:
global_map[path].samples.append(s)
else:
fi.occurrence = 1
fi.total_records = total
global_map[path] = fi
# Build an OrderedDict in order of first appearance (dict preserves insertion order on Python 3.7+)
result = OrderedDict()
for path, fi in global_map.items():
fi.total_records = total
result[path] = fi
return result
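# Illustrative usage (hypothetical records): occurrence counts how many
# records contain each path, and a "null" type is upgraded once a later
# record supplies a concrete value.
#
#   >>> fields = flatten_json_tree([
#   ...     {"id": 1, "memo": None},
#   ...     {"id": 2, "memo": "cash"},
#   ... ])
#   >>> fields["id"].occurrence, fields["memo"].json_type
#   (2, 'string')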
logger = logging.getLogger(__name__)
def query_table_columns(
conn,
schema: str,
table: str,
) -> list[ColumnInfo]:
"""
从 information_schema.columns + pg_description 查询表结构。
返回所有列(含版本控制列如 valid_from, valid_to, is_current, fetched_at
连接失败或表不存在时返回空列表并记录错误。
"""
sql = """
SELECT c.column_name, c.data_type, c.is_nullable,
c.column_default, c.ordinal_position,
pgd.description AS column_comment
FROM information_schema.columns c
LEFT JOIN pg_catalog.pg_statio_all_tables st
ON st.schemaname = c.table_schema
AND st.relname = c.table_name
LEFT JOIN pg_catalog.pg_description pgd
ON pgd.objoid = st.relid
AND pgd.objsubid = c.ordinal_position
WHERE c.table_schema = %s AND c.table_name = %s
ORDER BY c.ordinal_position;
"""
try:
with conn.cursor() as cur:
cur.execute(sql, (schema, table))
rows = cur.fetchall()
except Exception:
logger.error("查询表结构失败: %s.%s", schema, table, exc_info=True)
return []
if not rows:
logger.warning("表不存在或无列: %s.%s", schema, table)
return []
columns: list[ColumnInfo] = []
for row in rows:
col_name, data_type, is_nullable_str, col_default, ordinal, comment = row
columns.append(
ColumnInfo(
name=col_name,
data_type=data_type,
is_nullable=is_nullable_str == "YES",
column_default=col_default,
comment=comment,
ordinal_position=ordinal,
)
)
return columns
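# Usage sketch (requires a live PostgreSQL connection; the DSN and schema
# contents below are hypothetical):
#
#   >>> import psycopg2
#   >>> conn = psycopg2.connect("postgresql://user:pass@localhost:5432/dw")
#   >>> cols = query_table_columns(conn, "ods", "settlement_records")
#   >>> [(c.name, c.data_type, c.is_nullable) for c in cols[:2]]  # doctest: +SKIP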
def collect_all_tables(
config: AnalyzerConfig,
specs: list[dict] | None = None,
fetch_fn=None,
) -> list[TableCollectionResult]:
"""
执行完整数据采集流程编排。
参数:
config: 采集配置
specs: ODS_SPECS 列表,每项包含 code/table/endpoint/description 等字段。
缺省时使用本模块的 ODS_SPECS。
fetch_fn: 可选的自定义 fetch 函数,签名 (spec, limit) -> list[dict]。
缺省时使用本模块的 fetch_records(spec, config)。
流程:
1. 根据 config.tables 过滤 specs
2. 建立数据库连接(可选)
3. 逐表API 采集 → JSON 展开 → ODS/DWD 表结构查询
4. 单表失败不中断,记录 error 继续
5. 关闭数据库连接,返回结果列表
"""
# Import psycopg2 lazily to avoid a hard module-level dependency
try:
import psycopg2
except ImportError:
psycopg2 = None # type: ignore[assignment]
logger.warning("psycopg2 未安装,将跳过数据库表结构查询")
# Default to this module's ODS_SPECS
if specs is None:
specs = ODS_SPECS
# ── 1. Filter specs ──
if config.tables:
table_set = {t.strip().lower() for t in config.tables}
filtered = [s for s in specs if s["table"].lower() in table_set]
else:
filtered = list(specs)
if not filtered:
logger.warning("过滤后无可分析的表config.tables=%s", config.tables)
return []
# ── 2. Open the database connection ──
conn = None
if psycopg2 and config.pg_dsn:
try:
conn = psycopg2.connect(config.pg_dsn)
except Exception:
logger.error("数据库连接失败: %s", config.pg_dsn, exc_info=True)
# ── 2b. Parse TABLE_MAP (used to look up all related DWD tables) ──
_table_map = parse_table_map()
# ── 3. Collect table by table ──
results: list[TableCollectionResult] = []
for spec in filtered:
table_name = spec["table"]
task_code = spec.get("code", "")
description = spec.get("description", "")
endpoint = spec.get("endpoint", "")
try:
# 3a. API collection
if fetch_fn is not None:
records = fetch_fn(spec, config.limit)
else:
# Use this module's fetch_records
records = fetch_records(spec, config)
# 3b. JSON flattening
json_fields = flatten_json_tree(records)
# 3c. ODS/DWD schema queries
# Query all related DWD tables via TABLE_MAP (one ODS table may map to several DWD tables)
ods_cols: list[ColumnInfo] = []
dwd_cols: list[ColumnInfo] = []
dwd_tables_dict: dict[str, list[ColumnInfo]] = {}
if conn is not None:
ods_cols = query_table_columns(conn, "ods", table_name)
# Query all DWD tables mapped to this ODS table
full_ods = f"ods.{table_name}"
dwd_table_names = [
dwd_t for dwd_t, ods_t in _table_map.items()
if ods_t == full_ods
]
for dwd_full in sorted(dwd_table_names):
dwd_short = dwd_full.split(".")[-1] if "." in dwd_full else dwd_full
cols = query_table_columns(conn, "dwd", dwd_short)
if cols:
dwd_tables_dict[dwd_short] = cols
dwd_cols.extend(cols)
results.append(
TableCollectionResult(
table_name=table_name,
task_code=task_code,
description=description,
endpoint=endpoint,
record_count=len(records),
json_fields=json_fields,
ods_columns=ods_cols,
dwd_columns=dwd_cols,
dwd_tables=dwd_tables_dict,
)
)
logger.info(
"采集完成: %s%d 条记录, %d 个 JSON 字段, ODS %d 列, DWD %d",
table_name, len(records), len(json_fields),
len(ods_cols), len(dwd_cols),
)
except Exception as exc:
# A single table failure does not abort the run
logger.error("Collection failed for %s: %s", table_name, exc, exc_info=True)
results.append(
TableCollectionResult(
table_name=table_name,
task_code=task_code,
description=description,
endpoint=endpoint,
record_count=0,
error=str(exc),
)
)
# ── 4. Close the database connection ──
if conn is not None:
try:
conn.close()
except Exception:
logger.warning("关闭数据库连接失败", exc_info=True)
return results
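# Offline sketch (assumed stub data; no API or database is touched because a
# fetch_fn is injected and pg_dsn is left empty):
#
#   >>> def fake_fetch(spec, limit):
#   ...     return [{"id": 1, "amount": 10.5}]
#   >>> cfg = AnalyzerConfig(limit=10, tables=["settlement_records"],
#   ...                      output_dir=Path("/tmp/out"), pg_dsn="",
#   ...                      api_base="", api_token="", store_id="")
#   >>> res = collect_all_tables(cfg, fetch_fn=fake_fetch)
#   >>> res[0].record_count, list(res[0].json_fields)
#   (1, ['id', 'amount'])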
def dump_collection_results(
results: list[TableCollectionResult],
output_dir: Path,
) -> dict[str, Path]:
"""
将采集结果序列化为 JSON 文件落盘。
输出结构:
{output_dir}/
json_trees/{table}.json — 展开后的字段结构
db_schemas/ods_{table}.json — ODS 表结构
db_schemas/dwd_{table}.json — DWD 表结构(每张 DWD 表独立文件)
field_mappings/{table}.json — 三层字段映射JSON→ODS→DWD含锚点
collection_manifest.json — 采集清单(表名、记录数、时间戳)
返回 {类别: 目录路径} 的字典。
"""
# CHANGE 2026-02-21 | Remove and recreate the old subdirectories so Windows file locks do not break writes
import shutil as _shutil, time as _time
_sub_dirs = ["json_trees", "db_schemas", "field_mappings"]
for _name in _sub_dirs:
_d = output_dir / _name
if _d.exists():
try:
_shutil.rmtree(_d)
except (PermissionError, OSError):
# Windows file lock: cannot delete or traverse; skip (a fallback name is used below)
pass
# After rmtree on Windows the handle may not be released yet; wait before mkdir
_time.sleep(1)
def _ensure_writable_dir(base: Path, name: str) -> Path:
"""确保目录可写,如果被锁则用带后缀的备选名"""
d = base / name
for _attempt in range(3):
try:
d.mkdir(parents=True, exist_ok=True)
_test = d / ".write_test"
_test.write_text("ok", encoding="utf-8")
_test.unlink()
return d
except (FileNotFoundError, PermissionError, OSError):
_time.sleep(1)
# The old directory is unusable; use a new suffixed directory
d = base / f"{name}_new"
d.mkdir(parents=True, exist_ok=True)
print(f" [警告] {name}/ 被锁定,使用备选目录 {d.name}/")
return d
json_trees_dir = _ensure_writable_dir(output_dir, "json_trees")
db_schemas_dir = _ensure_writable_dir(output_dir, "db_schemas")
field_mappings_dir = _ensure_writable_dir(output_dir, "field_mappings")
# Parse TABLE_MAP / FACT_MAPPINGS (used to build the field mappings)
table_map = parse_table_map()
fact_mappings_data = parse_fact_mappings()
# Collect all DWD table structures (used by build_field_mappings)
all_dwd_cols: dict[str, list[ColumnInfo]] = {}
for r in results:
for dwd_short, cols in r.dwd_tables.items():
all_dwd_cols[dwd_short] = cols
# ── Write per-table files ──
for r in results:
# json_trees/{table}.json - flattened field structure
tree_data = {
"table": r.table_name,
"total_records": r.record_count,
"fields": [
{**asdict(fi), "samples": fi.samples}
for fi in r.json_fields.values()
],
}
_write_json(json_trees_dir / f"{r.table_name}.json", tree_data)
# db_schemas/ods_{table}.json - ODS table structure
ods_data = {
"schema": "ods",
"table": r.table_name,
"columns": [asdict(c) for c in r.ods_columns],
}
_write_json(db_schemas_dir / f"ods_{r.table_name}.json", ods_data)
# db_schemas/dwd_{dwd_short}.json - one file per DWD table
for dwd_short, cols in r.dwd_tables.items():
dwd_data = {
"schema": "dwd",
"table": dwd_short,
"ods_source": r.table_name,
"columns": [asdict(c) for c in cols],
}
_write_json(db_schemas_dir / f"dwd_{dwd_short}.json", dwd_data)
# field_mappings/{table}.json - three-layer field mapping
if r.error is None:
mapping = build_field_mappings(r, table_map, fact_mappings_data, all_dwd_cols)
_write_json(field_mappings_dir / f"{r.table_name}.json", mapping)
# ── collection_manifest.json - collection manifest ──
manifest = {
"timestamp": datetime.now().astimezone().isoformat(),
"table_map": table_map,
"tables": [
{
"table": r.table_name,
"task_code": r.task_code,
"description": r.description,
"record_count": r.record_count,
"json_field_count": len(r.json_fields),
"ods_column_count": len(r.ods_columns),
"dwd_tables": list(r.dwd_tables.keys()),
"dwd_column_count": sum(len(cols) for cols in r.dwd_tables.values()),
"error": r.error,
}
for r in results
],
}
_write_json(output_dir / "collection_manifest.json", manifest)
# ── BD_manual business descriptions ──
dump_bd_descriptions(results, output_dir)
return {
"json_trees": json_trees_dir,
"db_schemas": db_schemas_dir,
"field_mappings": field_mappings_dir,
"bd_descriptions": output_dir / "bd_descriptions",
"manifest": output_dir,
}
def _write_json(path: Path, data: Any) -> None:
"""UTF-8 编码写入 JSON 文件ensure_ascii=Falseindent=2。"""
content = json.dumps(data, ensure_ascii=False, indent=2, default=str)
try:
path.write_text(content, encoding="utf-8")
except PermissionError:
# CHANGE 2026-02-21 | Retry around Windows file locks: delete first, then rewrite
import time
time.sleep(1)
try:
path.unlink(missing_ok=True)
except PermissionError:
pass
path.write_text(content, encoding="utf-8")
# ══════════════════════════════════════════════════════════════════
# ODS task specifications (migrated from gen_full_dataflow_doc.py)
# ══════════════════════════════════════════════════════════════════
# Format: code, table, endpoint, data_path, list_key, time_fields,
# requires_window, extra_params, description
# Note: the extra_params of ODS_STORE_GOODS contains {"siteId": ["__STORE_ID__"]},
# which fetch_records replaces dynamically with config.store_id.
ODS_SPECS: list[dict] = [
{
"code": "ODS_ASSISTANT_ACCOUNT",
"table": "assistant_accounts_master",
"dwd_table": "dim_assistant",
"endpoint": "/PersonnelManagement/SearchAssistantInfo",
"data_path": ("data",),
"list_key": "assistantInfos",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "助教账号档案",
},
{
"code": "ODS_SETTLEMENT_RECORDS",
"table": "settlement_records",
"dwd_table": "dwd_settlement_head",
"endpoint": "/Site/GetAllOrderSettleList",
"data_path": ("data",),
"list_key": "settleList",
"time_fields": ("rangeStartTime", "rangeEndTime"),
"requires_window": True,
"extra_params": {},
"description": "结账记录",
},
{
"code": "ODS_TABLE_USE",
"table": "table_fee_transactions",
"dwd_table": "dwd_table_fee_log",
"endpoint": "/Site/GetSiteTableOrderDetails",
"data_path": ("data",),
"list_key": "siteTableUseDetailsList",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "台费计费流水",
},
{
"code": "ODS_ASSISTANT_LEDGER",
"table": "assistant_service_records",
"dwd_table": "dwd_assistant_service_log",
"endpoint": "/AssistantPerformance/GetOrderAssistantDetails",
"data_path": ("data",),
"list_key": "orderAssistantDetails",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "助教服务流水",
},
{
"code": "ODS_STORE_GOODS_SALES",
"table": "store_goods_sales_records",
"dwd_table": "dwd_store_goods_sale",
"endpoint": "/TenantGoods/GetGoodsSalesList",
"data_path": ("data",),
"list_key": "orderGoodsLedgers",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "门店商品销售流水",
},
{
"code": "ODS_PAYMENT",
"table": "payment_transactions",
"dwd_table": "dwd_payment",
"endpoint": "/PayLog/GetPayLogListPage",
"data_path": ("data",),
"list_key": None,
"time_fields": ("StartPayTime", "EndPayTime"),
"requires_window": False,
"extra_params": {},
"description": "支付流水",
},
{
"code": "ODS_REFUND",
"table": "refund_transactions",
"dwd_table": "dwd_refund",
"endpoint": "/Order/GetRefundPayLogList",
"data_path": ("data",),
"list_key": None,
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "退款流水",
},
{
"code": "ODS_PLATFORM_COUPON",
"table": "platform_coupon_redemption_records",
"dwd_table": "dwd_platform_coupon_redemption",
"endpoint": "/Promotion/GetOfflineCouponConsumePageList",
"data_path": ("data",),
"list_key": None,
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "平台/团购券核销",
},
{
"code": "ODS_MEMBER",
"table": "member_profiles",
"dwd_table": "dim_member",
"endpoint": "/MemberProfile/GetTenantMemberList",
"data_path": ("data",),
"list_key": "tenantMemberInfos",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "会员档案",
},
{
"code": "ODS_MEMBER_CARD",
"table": "member_stored_value_cards",
"dwd_table": "dim_member_card_account",
"endpoint": "/MemberProfile/GetTenantMemberCardList",
"data_path": ("data",),
"list_key": "tenantMemberCards",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "会员储值卡",
},
{
"code": "ODS_MEMBER_BALANCE",
"table": "member_balance_changes",
"dwd_table": "dwd_member_balance_change",
"endpoint": "/MemberProfile/GetMemberCardBalanceChange",
"data_path": ("data",),
"list_key": "tenantMemberCardLogs",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "会员余额变动",
},
{
"code": "ODS_RECHARGE_SETTLE",
"table": "recharge_settlements",
"dwd_table": "dwd_recharge_order",
"endpoint": "/Site/GetRechargeSettleList",
"data_path": ("data",),
"list_key": "settleList",
"time_fields": ("rangeStartTime", "rangeEndTime"),
"requires_window": True,
"extra_params": {},
"description": "充值结算",
},
{
"code": "ODS_GROUP_PACKAGE",
"table": "group_buy_packages",
"dwd_table": "dim_groupbuy_package",
"endpoint": "/PackageCoupon/QueryPackageCouponList",
"data_path": ("data",),
"list_key": "packageCouponList",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "团购套餐定义",
},
{
"code": "ODS_GROUP_BUY_REDEMPTION",
"table": "group_buy_redemption_records",
"dwd_table": "dwd_groupbuy_redemption",
"endpoint": "/Site/GetSiteTableUseDetails",
"data_path": ("data",),
"list_key": "siteTableUseDetailsList",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "团购套餐核销",
},
{
"code": "ODS_INVENTORY_STOCK",
"table": "goods_stock_summary",
"dwd_table": None,
"endpoint": "/TenantGoods/GetGoodsStockReport",
"data_path": ("data",),
"list_key": None,
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "库存汇总",
},
{
"code": "ODS_INVENTORY_CHANGE",
"table": "goods_stock_movements",
"dwd_table": None,
"endpoint": "/GoodsStockManage/QueryGoodsOutboundReceipt",
"data_path": ("data",),
"list_key": "queryDeliveryRecordsList",
"time_fields": ("startTime", "endTime"),
"requires_window": True,
"extra_params": {},
"description": "库存变化记录",
},
{
"code": "ODS_TABLES",
"table": "site_tables_master",
"dwd_table": "dim_table",
"endpoint": "/Table/GetSiteTables",
"data_path": ("data",),
"list_key": "siteTables",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "台桌维表",
},
{
"code": "ODS_GOODS_CATEGORY",
"table": "stock_goods_category_tree",
"dwd_table": "dim_goods_category",
"endpoint": "/TenantGoodsCategory/QueryPrimarySecondaryCategory",
"data_path": ("data",),
"list_key": "goodsCategoryList",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "库存商品分类树",
},
{
"code": "ODS_STORE_GOODS",
"table": "store_goods_master",
"dwd_table": "dim_store_goods",
"endpoint": "/TenantGoods/GetGoodsInventoryList",
"data_path": ("data",),
"list_key": "orderGoodsList",
"time_fields": None,
"requires_window": False,
# STORE_ID placeholder; fetch_records replaces it dynamically with config.store_id
"extra_params": {"siteId": ["__STORE_ID__"]},
"description": "门店商品档案",
},
{
"code": "ODS_TABLE_FEE_DISCOUNT",
"table": "table_fee_discount_records",
"dwd_table": "dwd_table_fee_adjust",
"endpoint": "/Site/GetTaiFeeAdjustList",
"data_path": ("data",),
"list_key": "taiFeeAdjustInfos",
"time_fields": ("startTime", "endTime"),
"requires_window": False,
"extra_params": {},
"description": "台费折扣/调账",
},
{
"code": "ODS_TENANT_GOODS",
"table": "tenant_goods_master",
"dwd_table": "dim_tenant_goods",
"endpoint": "/TenantGoods/QueryTenantGoods",
"data_path": ("data",),
"list_key": "tenantGoodsList",
"time_fields": None,
"requires_window": False,
"extra_params": {},
"description": "租户商品档案",
},
]
# Default list_key candidates (kept consistent with APIClient)
DEFAULT_LIST_KEYS: tuple[str, ...] = (
"list", "rows", "records", "items", "dataList", "data_list",
"tenantMemberInfos", "tenantMemberCardLogs", "tenantMemberCards",
"settleList", "orderAssistantDetails", "assistantInfos", "siteTables",
"taiFeeAdjustInfos", "siteTableUseDetailsList", "tenantGoodsList",
"packageCouponList", "queryDeliveryRecordsList", "goodsCategoryList",
"orderGoodsList", "orderGoodsLedgers",
)
# ══════════════════════════════════════════════════════════════════
# API calls (migrated from gen_full_dataflow_doc.py, adapted to AnalyzerConfig)
# ══════════════════════════════════════════════════════════════════
def _build_headers(config: AnalyzerConfig) -> dict[str, str]:
"""根据 config 构造浏览器风格请求头。"""
return {
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json",
"Origin": "https://pc.ficoo.vip",
"Referer": "https://pc.ficoo.vip/",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36"
),
"Authorization": f"Bearer {config.api_token}" if config.api_token else "",
}
def api_post(endpoint: str, payload: dict, config: AnalyzerConfig) -> dict:
"""发送 POST 请求到 API。"""
import requests
url = f"{config.api_base.rstrip('/')}/{endpoint.lstrip('/')}"
headers = _build_headers(config)
resp = requests.post(url, json=payload, headers=headers, timeout=20)
resp.raise_for_status()
data = resp.json()
code = data.get("code")
if code not in (0, "0", None):
msg = data.get("msg") or data.get("message") or ""
raise ValueError(f"API 错误 code={code} msg={msg} endpoint={endpoint}")
return data
def extract_list(payload: dict, data_path: tuple, list_key: str | None) -> list:
"""从 API 响应中提取记录列表。"""
cur = payload
for key in data_path:
if isinstance(cur, dict):
cur = cur.get(key)
else:
cur = None
if cur is None:
break
if isinstance(cur, list):
return cur
if isinstance(cur, dict):
if list_key and isinstance(cur.get(list_key), list):
return cur[list_key]
for k in DEFAULT_LIST_KEYS:
if isinstance(cur.get(k), list):
return cur[k]
for v in cur.values():
if isinstance(v, list):
return v
return []
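# Illustrative extraction (the payload shape is hypothetical but mirrors the
# ODS_SPECS convention of data_path=("data",) plus a named list_key):
#
#   >>> payload = {"code": 0, "data": {"settleList": [{"orderId": "A1"}]}}
#   >>> extract_list(payload, ("data",), "settleList")
#   [{'orderId': 'A1'}]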
def _resolve_extra_params(extra_params: dict, config: AnalyzerConfig) -> dict:
"""将 extra_params 中的 __STORE_ID__ 占位符替换为 config.store_id。"""
if not extra_params:
return extra_params
resolved = {}
for k, v in extra_params.items():
if isinstance(v, list):
resolved[k] = [
config.store_id if item == "__STORE_ID__" else item
for item in v
]
elif v == "__STORE_ID__":
resolved[k] = config.store_id
else:
resolved[k] = v
return resolved
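# Placeholder-substitution sketch (the store id below is hypothetical):
#
#   >>> cfg = AnalyzerConfig(store_id="9001")
#   >>> _resolve_extra_params({"siteId": ["__STORE_ID__"]}, cfg)
#   {'siteId': ['9001']}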
def fetch_records(spec: dict, config: AnalyzerConfig) -> list[dict]:
"""
获取 API 记录。
- 有时间字段的表:从今天往回 10 天一批,不够则继续扩展,最多 10 次重试
- 无时间字段的表:单次请求
参数:
spec: ODS_SPECS 中的单项配置
config: AnalyzerConfig提供 api_base/api_token/store_id/limit
"""
from datetime import timedelta
from zoneinfo import ZoneInfo
endpoint = spec["endpoint"]
data_path = spec["data_path"]
list_key = spec["list_key"]
time_fields = spec["time_fields"]
extra_params = _resolve_extra_params(spec.get("extra_params", {}), config)
target_count = config.limit
tz = ZoneInfo("Asia/Shanghai")
all_records: list[dict] = []
if time_fields:
# With a time window: expand backwards from today
start_key, end_key = time_fields
now = datetime.now(tz)
end_dt = now
batch_days = 10
max_retries = 10
for attempt in range(max_retries):
start_dt = end_dt - timedelta(days=batch_days)
params = {
"siteId": config.store_id,
"page": 1,
"limit": target_count,
start_key: start_dt.strftime("%Y-%m-%d %H:%M:%S"),
end_key: end_dt.strftime("%Y-%m-%d %H:%M:%S"),
**extra_params,
}
try:
resp = api_post(endpoint, params, config)
records = extract_list(resp, data_path, list_key)
all_records.extend(records)
except Exception as e:
logger.warning(
"API 请求失败 %s attempt=%d: %s", endpoint, attempt + 1, e
)
if len(all_records) >= target_count:
break
# Keep extending the window further back
end_dt = start_dt
else:
# No time window: a single request
params = {
"siteId": config.store_id,
"page": 1,
"limit": target_count,
**extra_params,
}
try:
resp = api_post(endpoint, params, config)
all_records = extract_list(resp, data_path, list_key)
except Exception as e:
logger.warning("API 请求失败 %s: %s", endpoint, e)
return all_records[:target_count]
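# Window-expansion sketch (dates are illustrative; no API call is made): each
# attempt queries a 10-day window and then slides the window further back.
#
#   >>> from datetime import datetime, timedelta
#   >>> end = datetime(2026, 2, 21)
#   >>> windows = []
#   >>> for _ in range(3):
#   ...     start = end - timedelta(days=10)
#   ...     windows.append((start.date().isoformat(), end.date().isoformat()))
#   ...     end = start
#   >>> windows[0], windows[-1]
#   (('2026-02-11', '2026-02-21'), ('2026-01-22', '2026-02-01'))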
# ══════════════════════════════════════════════════════════════════
# ETL source parsing (TABLE_MAP / FACT_MAPPINGS)
# ══════════════════════════════════════════════════════════════════
import re
# Default path of the DWD load-task source (absolute, so it resolves even when cwd is not the project root)
# CHANGE 2026-02-21 | relative path → absolute path, same origin as _env_paths
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
_DWD_TASK_PY = _PROJECT_ROOT / "apps" / "etl" / "connectors" / "feiqiu" / "tasks" / "dwd" / "dwd_load_task.py"
def parse_table_map(py_path: Path | None = None) -> dict[str, str]:
"""
从 dwd_load_task.py 解析 TABLE_MAP: {dwd_table -> ods_table}。
返回如 {"dwd.dim_assistant": "ods.assistant_accounts_master", ...}
"""
py_path = py_path or _DWD_TASK_PY
if not py_path.exists():
logger.warning("TABLE_MAP 源文件不存在: %s", py_path)
return {}
text = py_path.read_text(encoding="utf-8")
match = re.search(
r"TABLE_MAP\s*(?::\s*dict\[.*?\])?\s*=\s*\{(.*?)\}",
text, re.DOTALL,
)
if not match:
return {}
body = match.group(1)
result = {}
for m in re.finditer(r'"([^"]+)"\s*:\s*"([^"]+)"', body):
result[m.group(1)] = m.group(2)
return result
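# Parsing sketch: the regex above expects dwd_load_task.py to contain an
# assignment roughly of the shape below (contents assumed; only the first
# pair is documented) and returns its string pairs as a dict:
#
#   TABLE_MAP: dict[str, str] = {
#       "dwd.dim_assistant": "ods.assistant_accounts_master",
#       "dwd.dwd_settlement_head": "ods.settlement_records",
#   }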
def parse_fact_mappings(py_path: Path | None = None) -> dict[str, list[tuple]]:
"""
从 dwd_load_task.py 解析 FACT_MAPPINGS: {dwd_table -> [(dwd_col, ods_expr, cast), ...]}。
显式映射字段重命名、JSONB 提取、CAST 转换等)。
"""
py_path = py_path or _DWD_TASK_PY
if not py_path.exists():
logger.warning("FACT_MAPPINGS 源文件不存在: %s", py_path)
return {}
text = py_path.read_text(encoding="utf-8")
start = text.find("FACT_MAPPINGS")
if start < 0:
return {}
brace_start = text.find("{", start)
if brace_start < 0:
return {}
depth = 0
end = brace_start
for i in range(brace_start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
end = i + 1
break
block = text[brace_start:end]
result: dict[str, list[tuple]] = {}
table_pattern = re.compile(r'"([^"]+)"\s*:\s*\[', re.DOTALL)
for tm in table_pattern.finditer(block):
table_name = tm.group(1)
list_start = tm.end()
bracket_depth = 1
list_end = list_start
for i in range(list_start, len(block)):
if block[i] == "[":
bracket_depth += 1
elif block[i] == "]":
bracket_depth -= 1
if bracket_depth == 0:
list_end = i
break
list_body = block[list_start:list_end]
tuples = []
tuple_pattern = re.compile(
r'\(\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*(?:"([^"]+)"|None)\s*\)'
)
for tp in tuple_pattern.finditer(list_body):
tuples.append((tp.group(1), tp.group(2), tp.group(3)))
result[table_name] = tuples
return result
# ══════════════════════════════════════════════════════════════════
# BD_manual document parsing: extract field-level business descriptions
# ══════════════════════════════════════════════════════════════════
# Root directory of the BD_manual documents (absolute path, same origin as _DWD_TASK_PY)
# CHANGE 2026-02-21 | relative path → absolute path, so it resolves even when cwd is not the project root
_BD_DOCS_ROOT = _PROJECT_ROOT / "apps" / "etl" / "connectors" / "feiqiu" / "docs" / "database"
def parse_bd_manual_fields(doc_path: Path) -> dict[str, str]:
"""
从 BD_manual Markdown 文档中解析字段说明表格。
返回 {字段名(小写) -> 说明文本}。
支持 ODS/main/ 和 DWD/main/ 下的 BD_manual_*.md 格式。
"""
if not doc_path.exists():
return {}
text = doc_path.read_text(encoding="utf-8")
result: dict[str, str] = {}
# 查找 "## 字段说明" 后的表格
in_table = False
header_found = False
desc_col_idx = -1
name_col_idx = -1
for line in text.splitlines():
stripped = line.strip()
if stripped.startswith("## 字段说明"):
in_table = True
continue
if in_table and stripped.startswith("##"):
# Reached the next section: stop
break
if not in_table:
continue
if not stripped.startswith("|"):
continue
cols = [c.strip() for c in stripped.split("|")]
# Drop the leading/trailing empty cells (lines start and end with |)
cols = cols[1:-1] if len(cols) > 2 else cols
if not header_found:
# Look for the header row
for i, c in enumerate(cols):
if "字段名" in c or "字段" == c:
name_col_idx = i
if "说明" in c:
desc_col_idx = i
if name_col_idx >= 0 and desc_col_idx >= 0:
header_found = True
continue
# Skip the separator row
if all(c.replace("-", "").replace(":", "").strip() == "" for c in cols):
continue
if name_col_idx < len(cols) and desc_col_idx < len(cols):
field_name = cols[name_col_idx].strip().strip("`")
desc = cols[desc_col_idx].strip()
if field_name and desc:
result[field_name.lower()] = desc
return result
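# The parser expects a "## 字段说明" section containing a Markdown table of
# roughly this shape (the row content is illustrative):
#
#   ## 字段说明
#   | 字段名 | 类型 | 说明 |
#   |--------|------|------|
#   | `id`   | BIGINT | 助教账号主键 ID |
#
# which yields {"id": "助教账号主键 ID"}.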
def load_bd_descriptions(table_name: str) -> dict[str, dict[str, str]]:
"""
加载指定 ODS 表及其关联 DWD 表的 BD_manual 业务描述。
返回:
{
"ods": {字段名 -> 说明},
"dwd": {
"dim_assistant": {字段名 -> 说明},
...
}
}
"""
result: dict[str, dict[str, str]] = {"ods": {}, "dwd": {}}
# ODS BD_manual
ods_doc = _BD_DOCS_ROOT / "ODS" / "main" / f"BD_manual_{table_name}.md"
result["ods"] = parse_bd_manual_fields(ods_doc)
# DWD BD_manual: load every DWD doc here; the caller filters by the DWD tables mapped (via TABLE_MAP) to this ODS table
dwd_dir = _BD_DOCS_ROOT / "DWD" / "main"
if dwd_dir.exists():
for f in sorted(dwd_dir.iterdir()):
if f.name.startswith("BD_manual_") and f.suffix == ".md":
dwd_short = f.stem.replace("BD_manual_", "")
result["dwd"][dwd_short] = parse_bd_manual_fields(f)
return result
def dump_bd_descriptions(
results: list[TableCollectionResult],
output_dir: Path,
) -> None:
"""
为每张 ODS 表解析 BD_manual 文档,输出 bd_descriptions/{table}.json。
结构:
{
"ods_table": "assistant_accounts_master",
"ods_fields": {"id": "助教账号主键 ID...", ...},
"dwd_fields": {
"dim_assistant": {"assistant_id": "助教唯一标识 ID", ...},
...
}
}
"""
bd_dir = output_dir / "bd_descriptions"
bd_dir.mkdir(parents=True, exist_ok=True)
for r in results:
descs = load_bd_descriptions(r.table_name)
data = {
"ods_table": r.table_name,
"ods_fields": descs["ods"],
"dwd_fields": {
dwd_short: descs["dwd"].get(dwd_short, {})
for dwd_short in r.dwd_tables.keys()
},
}
_write_json(bd_dir / f"{r.table_name}.json", data)
# ══════════════════════════════════════════════════════════════════
# Three-layer field-mapping construction (JSON → ODS → DWD, with anchor IDs)
# ══════════════════════════════════════════════════════════════════
SCD2_COLS = {"scd2_start_time", "scd2_end_time", "scd2_is_current", "scd2_version"}
def build_field_mappings(
result: TableCollectionResult,
table_map: dict[str, str],
fact_mappings: dict[str, list[tuple]],
all_dwd_cols: dict[str, list[ColumnInfo]],
) -> dict:
"""
为单张 ODS 表构建完整的三层字段映射关系。
返回结构:
{
"ods_table": "assistant_accounts_master",
"anchors": {
"api": "api-assistant-accounts-master",
"ods": "ods-assistant-accounts-master",
"dwd": {"dim_assistant": "dwd-dim-assistant", ...}
},
"json_to_ods": [
{"json_path": "id", "ods_col": "id", "match_type": "exact", ...},
...
],
"ods_to_dwd": {
"id": [
{"dwd_table": "dim_assistant", "dwd_col": "assistant_id", "cast": null, "note": "字段重命名"},
...
],
...
},
"dwd_to_ods": {
"dim_assistant": [
{"dwd_col": "assistant_id", "type": "BIGINT", "ods_source": "id", "mapping_type": "直接", "note": "字段重命名"},
...
],
...
}
}
"""
ods_table = result.table_name
full_ods = f"ods.{ods_table}"
# Anchor ID generation (consistent with the legacy document format)
anchor_base = ods_table.replace("_", "-")
anchors = {
"api": f"api-{anchor_base}",
"ods": f"ods-{anchor_base}",
"dwd": {},
}
# Find all DWD tables mapped to this ODS table
dwd_tables_for_ods = sorted(
[dwd_t for dwd_t, ods_t in table_map.items() if ods_t == full_ods]
)
for dwd_full in dwd_tables_for_ods:
dwd_short = dwd_full.split(".")[-1] if "." in dwd_full else dwd_full
anchors["dwd"][dwd_short] = f"dwd-{dwd_short.replace('_', '-')}"
# ── JSON → ODS mapping ──
ods_col_set = {c.name.lower() for c in result.ods_columns}
json_to_ods = []
for path, fi in result.json_fields.items():
# Leaf field name (nesting prefix and "[]" removed)
leaf = path.split(".")[-1].replace("[]", "")
leaf_lower = leaf.lower()
if leaf_lower in ods_col_set:
match_type = "exact" if leaf in ods_col_set else "case_insensitive"
json_to_ods.append({
"json_path": path,
"ods_col": leaf_lower,
"match_type": match_type,
"json_type": fi.json_type,
"occurrence_pct": round(fi.occurrence / fi.total_records * 100, 1) if fi.total_records > 0 else 0,
})
else:
json_to_ods.append({
"json_path": path,
"ods_col": None,
"match_type": "unmapped",
"json_type": fi.json_type,
"occurrence_pct": round(fi.occurrence / fi.total_records * 100, 1) if fi.total_records > 0 else 0,
})
# ── ODS → DWD mapping (aggregate all downstream DWD columns per ODS column) ──
ods_to_dwd: dict[str, list[dict]] = {}
dwd_to_ods: dict[str, list[dict]] = {}
for dwd_full in dwd_tables_for_ods:
dwd_short = dwd_full.split(".")[-1] if "." in dwd_full else dwd_full
dwd_cols = all_dwd_cols.get(dwd_short, [])
if not dwd_cols:
continue
mappings_list = fact_mappings.get(dwd_full, [])
mapping_dict = {m[0].lower(): (m[1], m[2]) for m in mappings_list}
is_dim = "dim_" in dwd_short
is_ex = dwd_short.endswith("_ex")
table_type = "维度" if is_dim else "事实"
if is_ex:
table_type += "(扩展)"
dwd_to_ods[dwd_short] = []
for c in dwd_cols:
col_lower = c.name.lower()
if col_lower in SCD2_COLS:
dwd_to_ods[dwd_short].append({
"dwd_col": c.name, "type": c.data_type,
"ods_source": "", "mapping_type": "SCD2",
"note": "SCD2 元数据",
})
continue
if col_lower in mapping_dict:
ods_expr, cast = mapping_dict[col_lower]
note = ""
if "->>" in ods_expr:
note = "JSONB 提取"
elif "CASE" in ods_expr.upper():
note = "派生计算"
elif ods_expr.lower() != col_lower:
note = "字段重命名"
dwd_to_ods[dwd_short].append({
"dwd_col": c.name, "type": c.data_type,
"ods_source": ods_expr, "mapping_type": f"{cast}" if cast else "直接",
"note": note,
})
# Reverse direction: ODS column → DWD column
ods_key = ods_expr.lower() if "->>" not in ods_expr and "CASE" not in ods_expr.upper() else None
if ods_key and ods_key in ods_col_set:
ods_to_dwd.setdefault(ods_key, []).append({
"dwd_table": dwd_short, "dwd_col": c.name,
"cast": cast, "note": note,
})
elif col_lower in ods_col_set:
dwd_to_ods[dwd_short].append({
"dwd_col": c.name, "type": c.data_type,
"ods_source": c.name, "mapping_type": "直接",
"note": "同名直传",
})
ods_to_dwd.setdefault(col_lower, []).append({
"dwd_table": dwd_short, "dwd_col": c.name,
"cast": None, "note": "同名直传",
})
else:
dwd_to_ods[dwd_short].append({
"dwd_col": c.name, "type": c.data_type,
"ods_source": "", "mapping_type": "",
"note": "未显式映射",
})
return {
"ods_table": ods_table,
"anchors": anchors,
"json_to_ods": json_to_ods,
"ods_to_dwd": ods_to_dwd,
"dwd_to_ods": dwd_to_ods,
}