213 lines
8.3 KiB
Python
213 lines
8.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
扫描 EXPORT_ROOT 下所有 ODS JSON 文件,按 order_trade_no 聚合,
计算每个总订单的复杂度并输出 Top 10。

复杂度维度:
- 子台桌使用记录数(table_fee_transactions)
- 台费折扣记录数(table_fee_discount_records)
- 助教服务记录数(assistant_service_records)
- 商品销售记录数(store_goods_sales_records)
- 团购核销记录数(group_buy_redemption_records)
- 支付记录数(payment_transactions,通过 relate_id → order_settle_id → order_trade_no 关联)
- 退款记录数(refund_transactions,通过 relate_id → order_settle_id → order_trade_no 关联)

子记录总数 = 各维度记录数之和
复杂度得分 = 子记录总数 + 2 × 涉及的维度种类数(Top 10 按此得分排序)
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import sys
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||
from _env_paths import get_output_path
|
||
|
||
|
||
def load_records_from_task_dirs(json_root: Path, dir_prefix: str, file_stem: str) -> list[dict]:
    """Load records from every run directory of the matching ODS task directories.

    Scans ``<json_root>/<task_dir>/<run_dir>/<file>.json`` where the task
    directory name starts with ``dir_prefix`` and the file stem starts with
    ``file_stem``; each matching file is parsed with ``_extract_records``.
    Records from *all* runs are returned (not only the latest one) — callers
    are expected to de-duplicate records that appear in several runs.

    Args:
        json_root: Export root directory to scan.
        dir_prefix: Required prefix of the task directory name.
        file_stem: Required prefix of the JSON file stem.

    Returns:
        The extracted records in sorted task/run/file order, or an empty list
        when ``json_root`` is not an existing directory.
    """
    records: list[dict] = []
    if not json_root.is_dir():
        return records
    for task_dir in sorted(json_root.iterdir()):
        if not task_dir.is_dir() or not task_dir.name.startswith(dir_prefix):
            continue
        for run_dir in sorted(task_dir.iterdir()):
            if not run_dir.is_dir():
                continue
            # iterdir() order is arbitrary; sort so that downstream
            # "first record wins" bookkeeping is reproducible between runs.
            for f in sorted(run_dir.iterdir()):
                if f.suffix == ".json" and f.stem.startswith(file_stem) and f.is_file():
                    records.extend(_extract_records(f))
    return records
|
||
|
||
|
||
def load_archive_records(json_root: Path, file_stem: str) -> list[dict]:
    """Load paged records from ``<json_root>/ODS_JSON_ARCHIVE/<run_dir>/*.json``.

    Every JSON file whose stem starts with ``file_stem`` is parsed with
    ``_extract_archive_records``.

    Args:
        json_root: Export root directory that may contain ``ODS_JSON_ARCHIVE``.
        file_stem: Required prefix of the JSON file stem.

    Returns:
        The extracted records in sorted run/file order, or an empty list when
        the archive directory does not exist (or is not a directory).
    """
    archive_dir = json_root / "ODS_JSON_ARCHIVE"
    # is_dir() rather than exists(): a stray file with this name must not crash iterdir().
    if not archive_dir.is_dir():
        return []
    records: list[dict] = []
    # Sorted iteration keeps the record order (and thus downstream results) deterministic.
    for run_dir in sorted(archive_dir.iterdir()):
        if not run_dir.is_dir():
            continue
        for f in sorted(run_dir.iterdir()):
            if f.suffix == ".json" and f.stem.startswith(file_stem) and f.is_file():
                records.extend(_extract_archive_records(f))
    return records
|
||
|
||
|
||
def _extract_records(filepath: Path) -> list[dict]:
    """Extract records from a standard ODS JSON dump (``pages[].response.data``).

    Each page's ``response.data`` object holds the records in a list field
    whose name differs per endpoint, so every list-valued field is collected.
    Pages with a missing/null/non-object ``response`` or ``data`` are skipped,
    as are list entries that are not JSON objects (callers use ``dict.get``).

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The record dicts found, or an empty list (with a warning on stderr)
        when the file cannot be read or parsed, or has an unexpected shape.
    """
    try:
        data = json.loads(filepath.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"warning: skipping unreadable JSON file {filepath}: {exc}", file=sys.stderr)
        return []
    if not isinstance(data, dict):
        return []
    pages = data.get("pages")
    if not isinstance(pages, list):
        return []

    items: list[dict] = []
    for page in pages:
        if not isinstance(page, dict):
            continue
        response = page.get("response")
        resp_data = response.get("data") if isinstance(response, dict) else None
        if not isinstance(resp_data, dict):
            continue
        # The list field name differs per endpoint, so take every list value.
        for value in resp_data.values():
            if isinstance(value, list):
                items.extend(entry for entry in value if isinstance(entry, dict))
    return items
|
||
|
||
|
||
def _extract_archive_records(filepath: Path) -> list[dict]:
    """Extract records from an archive page file shaped like ``{code, data: [...]}``.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The JSON objects found in the top-level ``data`` list. Returns an
        empty list when the file is unreadable or malformed (a warning is
        written to stderr), or when ``data`` is missing or not a list.
        Non-object entries inside ``data`` are skipped.
    """
    try:
        data = json.loads(filepath.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"warning: skipping unreadable JSON file {filepath}: {exc}", file=sys.stderr)
        return []
    # A top-level JSON array/scalar has no .get(); treat it as "no records".
    payload = data.get("data") if isinstance(data, dict) else None
    if not isinstance(payload, list):
        return []
    return [item for item in payload if isinstance(item, dict)]
|
||
|
||
|
||
# Directly linked sources: (task dir prefix, file stem prefix, link field, dimension name)
_DIRECT_SOURCES = [
    ("ODS_TABLE_USE", "table_fee_transactions", "order_trade_no", "台桌使用"),
    ("ODS_TABLE_FEE_DISCOUNT", "table_fee_discount_records", "order_trade_no", "台费折扣"),
    ("ODS_ASSISTANT_LEDGER", "assistant_service_records", "order_trade_no", "助教服务"),
    ("ODS_STORE_GOODS_SALES", "store_goods_sales_records", "order_trade_no", "商品销售"),
    ("ODS_GROUP_BUY_REDEMPTION", "group_buy_redemption_records", "order_trade_no", "团购核销"),
]
# Sources linked indirectly via relate_id -> order_settle_id -> order_trade_no:
# (task dir prefix, file stem prefix, dimension name)
_SETTLE_LINKED_SOURCES = [
    ("ODS_PAYMENT", "payment_transactions", "支付记录"),
    ("ODS_REFUND", "refund_transactions", "退款记录"),
]
# All dimensions in display order, derived from the source tables so they cannot drift apart.
_ALL_DIMS = [src[3] for src in _DIRECT_SOURCES] + [src[2] for src in _SETTLE_LINKED_SOURCES]
# Bonus points per distinct dimension an order touches (rewards "breadth" of complexity).
_BREADTH_WEIGHT = 2
_TOP_N = 10


def _load_source(json_root: Path, dir_prefix: str, file_stem: str) -> list[dict]:
    """Return one source's records from its ODS task dirs followed by the archive dir."""
    return (
        load_records_from_task_dirs(json_root, dir_prefix, file_stem)
        + load_archive_records(json_root, file_stem)
    )


def _iter_unique(records):
    """Yield records from an iterable, skipping repeats of an already-seen ``id``.

    The same record may appear in several runs, so records are de-duplicated
    by their ``id`` field. Records whose ``id`` is missing or null cannot be
    matched across runs and are each treated as distinct.
    """
    seen: set = set()
    for rec in records:
        rec_id = rec.get("id")
        # Tag the key so a null id never collapses all id-less records into one,
        # and object identities can never collide with real record ids.
        key = ("obj", id(rec)) if rec_id is None else ("id", rec_id)
        if key in seen:
            continue
        seen.add(key)
        yield rec


def _score_orders(order_complexity) -> list[tuple]:
    """Score each order and return (trade_no, score, total, breadth, dims) tuples, best first.

    total is the number of child records, breadth the number of dimensions in
    _ALL_DIMS with at least one record, and score = total + _BREADTH_WEIGHT * breadth.
    Orders with equal scores keep their insertion order (list.sort is stable).
    """
    scored = []
    for trade_no, dims in order_complexity.items():
        total = sum(dims.values())
        breadth = sum(1 for d in _ALL_DIMS if dims.get(d, 0) > 0)
        scored.append((trade_no, total + _BREADTH_WEIGHT * breadth, total, breadth, dims))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored


def _print_report(scored: list[tuple], order_info: dict, order_count: int) -> None:
    """Print the Top-N table and summary statistics to stdout."""
    print("=" * 100)
    print(f" 订单复杂度 Top {_TOP_N}(共扫描 {order_count} 个总订单)")
    print("=" * 100)
    for rank, (trade_no, score, total, breadth, dims) in enumerate(scored[:_TOP_N], 1):
        info = order_info.get(trade_no, {})
        print(f"\n{'─' * 80}")
        print(f" #{rank} order_trade_no = {trade_no}")
        # create_time is stored as "" when the record lacks it; show 未知 in that case too.
        print(f" 创建时间: {info.get('create_time') or '未知'}")
        print(f" 复杂度得分: {score} (子记录总数={total}, 涉及维度={breadth})")
        print(" 各维度明细:")
        for d in _ALL_DIMS:
            cnt = dims.get(d, 0)
            if cnt > 0:
                bar = "█" * min(cnt, 40)
                print(f" {d:8s}: {cnt:4d} {bar}")
    print(f"\n{'─' * 80}")
    print("\n统计摘要:")
    print(f" 总订单数: {order_count}")
    if scored:
        avg_score = sum(item[1] for item in scored) / len(scored)
        print(f" 平均复杂度得分: {avg_score:.1f}")
        print(f" 最高复杂度得分: {scored[0][1]}")


def main():
    """Aggregate ODS child records per order_trade_no and print the most complex orders.

    Directly linked sources are counted by their ``order_trade_no`` field and
    also populate an order_settle_id -> order_trade_no map. Payments and
    refunds are then attributed to orders through ``relate_id`` via that map.
    Orders are ranked by ``total records + _BREADTH_WEIGHT * dimensions touched``.
    """
    json_root = get_output_path("EXPORT_ROOT")

    # --- 1. Load child records ---
    # order_trade_no -> {dimension name: count}
    order_complexity: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    # order_trade_no -> basic info taken from the first record seen
    order_info: dict[int, dict] = {}
    # order_settle_id -> order_trade_no, built from all directly linked sources
    settle_to_trade: dict[int, int] = {}

    for dir_prefix, file_stem, key_field, dim_name in _DIRECT_SOURCES:
        recs = _load_source(json_root, dir_prefix, file_stem)
        # Drop records without a usable trade number before de-duplicating.
        for rec in _iter_unique(r for r in recs if r.get(key_field)):
            trade_no = rec[key_field]
            order_complexity[trade_no][dim_name] += 1
            if trade_no not in order_info:
                order_info[trade_no] = {
                    "order_trade_no": trade_no,
                    "create_time": rec.get("create_time", ""),
                    "ledger_name": rec.get("ledger_name", rec.get("tableName", "")),
                }
            settle_id = rec.get("order_settle_id")
            if settle_id:
                settle_to_trade[settle_id] = trade_no

    # Payments/refunds reference order_settle_id through relate_id; this needs the
    # complete settle_to_trade map, so it runs after all direct sources are loaded.
    for dir_prefix, file_stem, dim_name in _SETTLE_LINKED_SOURCES:
        for rec in _iter_unique(_load_source(json_root, dir_prefix, file_stem)):
            relate_id = rec.get("relate_id")
            if not relate_id:
                continue
            trade_no = settle_to_trade.get(relate_id)
            if trade_no:
                order_complexity[trade_no][dim_name] += 1

    # --- 2. Score and rank ---
    scored = _score_orders(order_complexity)

    # --- 3. Report ---
    _print_report(scored, order_info, len(order_complexity))
|
||
|
||
|
||
if "__main__" == __name__:
    # Generate the report only when executed as a script, not when imported.
    main()
|