# -*- coding: utf-8 -*- """ 精准抓取复杂订单的原始 JSON 数据。 策略: - 无时间窗口 endpoint:全量分页拉取,本地按目标 ID 过滤 - 有时间窗口 endpoint:根据订单时间计算最小窗口 - 输出到 EXPORT_ROOT/complex_order_samples/ 使用方式: cd apps/etl/connectors/feiqiu python C:/NeoZQYY/scripts/ops/fetch_complex_orders.py """ from __future__ import annotations import json import sys from datetime import datetime, timedelta from pathlib import Path from zoneinfo import ZoneInfo # 加载 ETL 配置(cwd 必须是 apps/etl/connectors/feiqiu) sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "apps" / "etl" / "connectors" / "feiqiu")) sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from ops._env_paths import get_output_path from config.settings import AppConfig from api.client import APIClient TZ = ZoneInfo("Asia/Shanghai") # 目标结算单 ID(多台桌 Top 10 + 多助教 1 个,去重) TARGET_SETTLE_IDS = { 3074058525934981, 2990834049419973, 2988030668589701, 2985297809476293, 3038949978147525, 3038784012766405, 2996296638500997, 2973323104635077, 3079943731579077, 3077096251540933, 3090257727786373, # 多助教 } # 目标 order_trade_no(按 trade_no 复杂度 Top 10) TARGET_TRADE_NOS = { 3046483289245381, 3089564402716037, 3083627248651781, 3026717319448517, 3022503741327173, 3010113035667909, 2995425896140997, 2988736415615685, 3092230766020741, 3091871356586309, } # 合并所有需要关注的 ID ALL_SETTLE_IDS = TARGET_SETTLE_IDS.copy() ALL_TRADE_NOS = TARGET_TRADE_NOS.copy() def fetch_all_pages(client: APIClient, endpoint: str, params: dict, list_key: str | None = None) -> list[dict]: """全量分页拉取,返回所有记录。""" records, _ = client.get_paginated( endpoint=endpoint, params=params, page_size=200, list_key=list_key, ) return records def filter_by_settle_or_trade(records: list[dict], settle_ids: set, trade_nos: set) -> list[dict]: """按 order_settle_id 或 order_trade_no 过滤。""" out = [] for r in records: sid = r.get("order_settle_id", 0) tno = r.get("order_trade_no", 0) if sid in settle_ids or tno in trade_nos: out.append(r) return out def filter_by_relate_id(records: list[dict], settle_ids: set) -> list[dict]: """按 relate_id(= order_settle_id)过滤支付/退款。""" return [r for r in records if r.get("relate_id", 0) in settle_ids] def save_json(out_dir: Path, filename: str, data: list[dict], meta: dict | None = None): """保存 JSON 文件。""" payload = {"meta": meta or {}, "count": len(data), "records": data} fp = out_dir / filename fp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8") print(f" → 保存 {fp.name}: {len(data)} 条记录") def main(): cfg = AppConfig.load() site_id = cfg.get("app.store_id") base_url = cfg.get("api.base_url") token = cfg.get("api.token") if not token: raise RuntimeError("API_TOKEN 未配置,请检查 .env") client = APIClient(base_url=base_url, token=token, timeout=cfg.get("api.timeout_sec", 20)) # 输出目录 export_root = get_output_path("EXPORT_ROOT") out_dir = export_root / "complex_order_samples" out_dir.mkdir(parents=True, exist_ok=True) base_params = {"siteId": site_id} # 第一步:先拉台桌使用记录,建立 settle_id ↔ trade_no 映射 print("=== 1/7 拉取台桌使用记录 ===") table_use_all = fetch_all_pages(client, "/Site/GetSiteTableOrderDetails", base_params, "siteTableUseDetailsList") print(f" 总记录: {len(table_use_all)}") # 从台桌使用记录中补充 settle_id ↔ trade_no 映射 settle_to_trades: dict[int, set[int]] = {} trade_to_settle: dict[int, int] = {} for r in table_use_all: sid = r.get("order_settle_id", 0) tno = r.get("order_trade_no", 0) if sid and tno: settle_to_trades.setdefault(sid, set()).add(tno) trade_to_settle[tno] = sid # 扩展:从 trade_no 反查 settle_id,从 settle_id 扩展 trade_no for tno in list(ALL_TRADE_NOS): sid = trade_to_settle.get(tno) if sid: ALL_SETTLE_IDS.add(sid) for sid in list(ALL_SETTLE_IDS): for tno in settle_to_trades.get(sid, set()): ALL_TRADE_NOS.add(tno) print(f" 扩展后目标: {len(ALL_SETTLE_IDS)} settle_ids, {len(ALL_TRADE_NOS)} trade_nos") # 过滤台桌使用记录 table_use = filter_by_settle_or_trade(table_use_all, ALL_SETTLE_IDS, ALL_TRADE_NOS) save_json(out_dir, "table_fee_transactions.json", table_use, {"endpoint": "/Site/GetSiteTableOrderDetails"}) # 2. 台费折扣 print("\n=== 2/7 拉取台费折扣记录 ===") discount_all = fetch_all_pages(client, "/Site/GetTaiFeeAdjustList", base_params, "taiFeeAdjustInfos") print(f" 总记录: {len(discount_all)}") discount = filter_by_settle_or_trade(discount_all, ALL_SETTLE_IDS, ALL_TRADE_NOS) save_json(out_dir, "table_fee_discount_records.json", discount, {"endpoint": "/Site/GetTaiFeeAdjustList"}) # 3. 团购核销 print("\n=== 3/7 拉取团购核销记录 ===") groupbuy_all = fetch_all_pages(client, "/Site/GetSiteTableUseDetails", base_params, "siteTableUseDetailsList") print(f" 总记录: {len(groupbuy_all)}") groupbuy = filter_by_settle_or_trade(groupbuy_all, ALL_SETTLE_IDS, ALL_TRADE_NOS) save_json(out_dir, "group_buy_redemption_records.json", groupbuy, {"endpoint": "/Site/GetSiteTableUseDetails"}) # 4. 商品销售 print("\n=== 4/7 拉取商品销售记录 ===") goods_all = fetch_all_pages(client, "/TenantGoods/GetGoodsSalesList", base_params, "orderGoodsLedgers") print(f" 总记录: {len(goods_all)}") goods = filter_by_settle_or_trade(goods_all, ALL_SETTLE_IDS, ALL_TRADE_NOS) save_json(out_dir, "store_goods_sales_records.json", goods, {"endpoint": "/TenantGoods/GetGoodsSalesList"}) # 5. 助教服务(有时间窗口:startTime/endTime) # 计算最小窗口:从目标订单的 create_time 推算 print("\n=== 5/7 拉取助教服务记录 ===") target_times = [] for r in table_use: ct = r.get("create_time", "") if ct: try: target_times.append(datetime.strptime(ct, "%Y-%m-%d %H:%M:%S")) except ValueError: pass if target_times: win_start = min(target_times) - timedelta(hours=24) win_end = max(target_times) + timedelta(hours=24) else: win_start = datetime(2025, 11, 1) win_end = datetime(2026, 3, 1) assistant_params = { "siteId": site_id, "startTime": win_start.strftime("%Y-%m-%d %H:%M:%S"), "endTime": win_end.strftime("%Y-%m-%d %H:%M:%S"), } assistant_all = fetch_all_pages(client, "/AssistantPerformance/GetOrderAssistantDetails", assistant_params, "orderAssistantDetails") print(f" 时间窗口: {assistant_params['startTime']} ~ {assistant_params['endTime']}") print(f" 总记录: {len(assistant_all)}") assistant = filter_by_settle_or_trade(assistant_all, ALL_SETTLE_IDS, ALL_TRADE_NOS) save_json(out_dir, "assistant_service_records.json", assistant, { "endpoint": "/AssistantPerformance/GetOrderAssistantDetails", "window": {"start": assistant_params["startTime"], "end": assistant_params["endTime"]}, }) # 6. 支付记录(有时间窗口:StartPayTime/EndPayTime) print("\n=== 6/7 拉取支付记录 ===") pay_params = { "siteId": site_id, "StartPayTime": win_start.strftime("%Y-%m-%d %H:%M:%S"), "EndPayTime": win_end.strftime("%Y-%m-%d %H:%M:%S"), } payment_all = fetch_all_pages(client, "/PayLog/GetPayLogListPage", pay_params) print(f" 时间窗口: {pay_params['StartPayTime']} ~ {pay_params['EndPayTime']}") print(f" 总记录: {len(payment_all)}") payment = filter_by_relate_id(payment_all, ALL_SETTLE_IDS) save_json(out_dir, "payment_transactions.json", payment, { "endpoint": "/PayLog/GetPayLogListPage", "window": {"start": pay_params["StartPayTime"], "end": pay_params["EndPayTime"]}, }) # 7. 退款记录 print("\n=== 7/7 拉取退款记录 ===") refund_all = fetch_all_pages(client, "/Order/GetRefundPayLogList", base_params) print(f" 总记录: {len(refund_all)}") refund = filter_by_relate_id(refund_all, ALL_SETTLE_IDS) save_json(out_dir, "refund_transactions.json", refund, {"endpoint": "/Order/GetRefundPayLogList"}) # 汇总 print(f"\n{'=' * 60}") print(f"抓取完成,输出目录: {out_dir}") print(f"目标结算单: {len(ALL_SETTLE_IDS)} 个") print(f"目标台桌订单: {len(ALL_TRADE_NOS)} 个") print(f"文件列表:") for f in sorted(out_dir.glob("*.json")): data = json.loads(f.read_text(encoding="utf-8")) print(f" {f.name}: {data.get('count', '?')} 条") if __name__ == "__main__": main()