Files
Neo-ZQYY/scripts/ops/find_complex_orders_v2.py

240 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
按 order_settle_id结算单聚合找出多台桌、多助教的复杂订单。
order_settle_id 是一次结算的唯一标识,一次结算可包含:
- 多个台桌使用记录(不同 order_trade_no
- 多个助教服务记录
- 多条台费折扣
- 多条团购核销
- 多笔支付/退款
"""
from __future__ import annotations
import json
import sys
from collections import defaultdict
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from _env_paths import get_output_path
def _extract_records(filepath: Path) -> list[dict]:
try:
data = json.loads(filepath.read_text(encoding="utf-8"))
except Exception:
return []
items = []
for page in data.get("pages", []):
resp_data = page.get("response", {}).get("data", {})
for v in resp_data.values():
if isinstance(v, list):
items.extend(v)
return items
def _extract_archive_records(filepath: Path) -> list[dict]:
try:
data = json.loads(filepath.read_text(encoding="utf-8"))
except Exception:
return []
payload = data.get("data", [])
return payload if isinstance(payload, list) else []
def load_all(json_root: Path, dir_prefix: str, file_stem: str) -> list[dict]:
records = []
for task_dir in sorted(json_root.iterdir()):
if not task_dir.is_dir():
continue
if task_dir.name.startswith(dir_prefix) or task_dir.name == "ODS_JSON_ARCHIVE":
for run_dir in task_dir.iterdir():
if not run_dir.is_dir():
continue
for f in run_dir.iterdir():
if f.stem.startswith(file_stem) and f.suffix == ".json":
if task_dir.name == "ODS_JSON_ARCHIVE":
records.extend(_extract_archive_records(f))
else:
records.extend(_extract_records(f))
return records
def dedup(records: list[dict]) -> list[dict]:
"""按 id 去重,保留首次出现的记录。"""
seen = set()
out = []
for r in records:
rid = r.get("id")
if rid and rid in seen:
continue
if rid:
seen.add(rid)
out.append(r)
return out
def main():
json_root = get_output_path("EXPORT_ROOT")
# 加载台桌使用记录
table_use = dedup(load_all(json_root, "ODS_TABLE_USE", "table_fee_transactions"))
# 加载助教服务记录
assistant = dedup(load_all(json_root, "ODS_ASSISTANT_LEDGER", "assistant_service_records"))
# 加载台费折扣
discount = dedup(load_all(json_root, "ODS_TABLE_FEE_DISCOUNT", "table_fee_discount_records"))
# 加载团购核销
groupbuy = dedup(load_all(json_root, "ODS_GROUP_BUY_REDEMPTION", "group_buy_redemption_records"))
# 加载支付
payments = dedup(load_all(json_root, "ODS_PAYMENT", "payment_transactions"))
# 加载退款
refunds = dedup(load_all(json_root, "ODS_REFUND", "refund_transactions"))
# --- 按 order_settle_id 聚合 ---
# settle_id → 各维度详情
settle_data: dict[int, dict] = defaultdict(lambda: {
"台桌": [], # 不同 order_trade_no 的台桌名
"台桌记录": [],
"助教": [], # 不同助教名
"助教记录": [],
"台费折扣": 0,
"团购核销": 0,
"支付": 0,
"退款": 0,
"create_time": "",
"trade_nos": set(),
})
# 台桌使用 → 按 order_settle_id 聚合
for r in table_use:
sid = r.get("order_settle_id")
if not sid or sid == 0:
continue
d = settle_data[sid]
tno = r.get("order_trade_no", 0)
tname = r.get("ledger_name", "?")
if tno not in d["trade_nos"]:
d["trade_nos"].add(tno)
d["台桌"].append(tname)
d["台桌记录"].append(r)
ct = r.get("create_time", "")
if ct and (not d["create_time"] or ct < d["create_time"]):
d["create_time"] = ct
# 助教服务 → 按 order_settle_id 聚合
for r in assistant:
sid = r.get("order_settle_id")
if not sid or sid == 0:
continue
d = settle_data[sid]
aname = r.get("assistantName", r.get("ledger_name", "?"))
d["助教"].append(aname)
d["助教记录"].append(r)
# 台费折扣
for r in discount:
sid = r.get("order_settle_id")
if sid and sid != 0:
settle_data[sid]["台费折扣"] += 1
# 团购核销
for r in groupbuy:
sid = r.get("order_settle_id")
if sid and sid != 0:
settle_data[sid]["团购核销"] += 1
# 支付relate_id = order_settle_id
for r in payments:
rid = r.get("relate_id")
if rid and rid in settle_data:
settle_data[rid]["支付"] += 1
# 退款
for r in refunds:
rid = r.get("relate_id")
if rid and rid in settle_data:
settle_data[rid]["退款"] += 1
# --- 筛选:多台桌 或 多助教 的结算单 ---
multi_table = []
multi_assistant = []
for sid, d in settle_data.items():
n_tables = len(d["台桌"])
n_assistants = len(set(d["助教"])) # 去重助教名
if n_tables >= 2:
multi_table.append((sid, d, n_tables, n_assistants))
if n_assistants >= 2:
multi_assistant.append((sid, d, n_tables, n_assistants))
multi_table.sort(key=lambda x: x[2], reverse=True)
multi_assistant.sort(key=lambda x: x[3], reverse=True)
# --- 输出:多台桌 ---
print("=" * 100)
print(f" 多台桌结算单 Top 10{len(multi_table)} 个结算单含 ≥2 台桌)")
print("=" * 100)
for i, (sid, d, nt, na) in enumerate(multi_table[:10], 1):
unique_assistants = sorted(set(d["助教"]))
print(f"\n{'' * 80}")
print(f" #{i} order_settle_id = {sid}")
print(f" 创建时间: {d['create_time']}")
print(f" 台桌数: {nt} | 助教数: {len(unique_assistants)} | 台费折扣: {d['台费折扣']} | 团购核销: {d['团购核销']} | 支付: {d['支付']} | 退款: {d['退款']}")
print(f" 台桌列表: {', '.join(d['台桌'])}")
if unique_assistants:
print(f" 助教列表: {', '.join(unique_assistants)}")
# 显示各台桌的金额
for r in d["台桌记录"]:
amt = r.get("ledger_amount", 0)
secs = r.get("real_table_use_seconds", r.get("ledger_count", 0))
hours = secs / 3600 if secs else 0
tno = r.get("order_trade_no", "?")
print(f"{r.get('ledger_name','?'):8s} 金额={amt:>8.2f} 时长={hours:.1f}h trade_no={tno}")
# --- 输出:多助教 ---
print(f"\n\n{'=' * 100}")
print(f" 多助教结算单 Top 10{len(multi_assistant)} 个结算单含 ≥2 位助教)")
print("=" * 100)
for i, (sid, d, nt, na) in enumerate(multi_assistant[:10], 1):
unique_assistants = sorted(set(d["助教"]))
print(f"\n{'' * 80}")
print(f" #{i} order_settle_id = {sid}")
print(f" 创建时间: {d['create_time']}")
print(f" 台桌数: {nt} | 助教数: {len(unique_assistants)} | 台费折扣: {d['台费折扣']} | 团购核销: {d['团购核销']} | 支付: {d['支付']} | 退款: {d['退款']}")
print(f" 台桌列表: {', '.join(d['台桌'])}")
print(f" 助教列表: {', '.join(unique_assistants)}")
# 显示各助教的服务详情
for r in d["助教记录"]:
aname = r.get("assistantName", r.get("ledger_name", "?"))
skill = r.get("skillName", "?")
amt = r.get("ledger_amount", 0)
tname = r.get("tableName", "?")
print(f" → 助教={aname:6s} 技能={skill:6s} 台桌={tname:6s} 金额={amt:>8.2f}")
# --- 输出:同时多台桌+多助教 ---
both = [(sid, d, nt, na) for sid, d, nt, na in multi_table if na >= 2]
both.sort(key=lambda x: x[2] + x[3], reverse=True)
if both:
print(f"\n\n{'=' * 100}")
print(f" 同时多台桌+多助教(共 {len(both)} 个)")
print("=" * 100)
for i, (sid, d, nt, na) in enumerate(both[:10], 1):
unique_assistants = sorted(set(d["助教"]))
print(f"\n{'' * 80}")
print(f" #{i} order_settle_id = {sid}")
print(f" 创建时间: {d['create_time']}")
print(f" 台桌数: {nt} | 助教数: {len(unique_assistants)} | 台费折扣: {d['台费折扣']} | 团购核销: {d['团购核销']} | 支付: {d['支付']} | 退款: {d['退款']}")
print(f" 台桌: {', '.join(d['台桌'])}")
print(f" 助教: {', '.join(unique_assistants)}")
print(f"\n{'' * 80}")
print(f"\n统计摘要:")
print(f" 总结算单数: {len(settle_data)}")
print(f" 含 ≥2 台桌: {len(multi_table)}")
print(f" 含 ≥2 助教: {len(multi_assistant)}")
print(f" 同时多台桌+多助教: {len(both)}")
if __name__ == "__main__":
main()