# -*- coding: utf-8 -*-
"""Fetch ODS-related API endpoints online and persist the responses as JSON files.

The archived files are meant for later offline replay / ingestion.
"""
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

from api.client import APIClient
from models.parsers import TypeParser
from utils.json_store import dump_json, endpoint_to_filename

from .base_task import BaseTask, TaskContext


@dataclass(frozen=True)
class EndpointSpec:
    """Describe how one endpoint is requested and how its response is handled."""

    # API path, e.g. "/Site/GetAllOrderSettleList".
    endpoint: str
    # Which window parameters to send: site | start_end | range | pay | none
    # (see OdsJsonArchiveTask._build_params; unknown styles fall back to start_end).
    window_style: str
    # Key path to the payload inside the response; forwarded to the API client.
    data_path: tuple[str, ...] = ("data",)
    # Optional key of the record list inside the payload; forwarded to the API client.
    list_key: str | None = None
    # Send siteId as a one-element list instead of a scalar.
    site_id_as_list: bool = False
    # Harvest orderSettleId values from this endpoint's records for ticket fetching.
    collect_ticket_ids: bool = False


class OdsJsonArchiveTask(BaseTask):
    """Fetch the endpoints needed by ODS and write them as "simplified JSON".

    Every output file has the shape ``{"code": 0, "data": [...records...]}``.

    Notes:
    - The output format is intended to be compatible with the parsing logic in
      tasks/manual_ingest_task.py;
    - By default one file is written per page, to avoid oversized single files;
    - Settlement tickets (/Order/GetOrderSettleTicketNew) are written one file
      per orderSettleId;
    - A ``manifest.json`` summarising pages/records/errors is written last.
    """

    ENDPOINTS: tuple[EndpointSpec, ...] = (
        EndpointSpec("/MemberProfile/GetTenantMemberList", "site", list_key="tenantMemberInfos"),
        EndpointSpec("/MemberProfile/GetTenantMemberCardList", "site", list_key="tenantMemberCards"),
        EndpointSpec("/MemberProfile/GetMemberCardBalanceChange", "start_end"),
        EndpointSpec("/PersonnelManagement/SearchAssistantInfo", "site", list_key="assistantInfos"),
        EndpointSpec(
            "/AssistantPerformance/GetOrderAssistantDetails",
            "start_end",
            list_key="orderAssistantDetails",
        ),
        EndpointSpec(
            "/AssistantPerformance/GetAbolitionAssistant",
            "start_end",
            list_key="abolitionAssistants",
        ),
        EndpointSpec("/Table/GetSiteTables", "site", list_key="siteTables"),
        EndpointSpec(
            "/TenantGoodsCategory/QueryPrimarySecondaryCategory",
            "site",
            list_key="goodsCategoryList",
        ),
        EndpointSpec("/TenantGoods/QueryTenantGoods", "site", list_key="tenantGoodsList"),
        EndpointSpec(
            "/TenantGoods/GetGoodsInventoryList",
            "site",
            list_key="orderGoodsList",
            # This endpoint requires siteId to be an array: a scalar triggers a
            # server-side exception that comes back as a malformed status line
            # ("HTTP/1.1 1400").
            site_id_as_list=True,
        ),
        EndpointSpec("/TenantGoods/GetGoodsStockReport", "site"),
        EndpointSpec("/TenantGoods/GetGoodsSalesList", "start_end", list_key="orderGoodsLedgers"),
        EndpointSpec(
            "/PackageCoupon/QueryPackageCouponList",
            "site",
            list_key="packageCouponList",
        ),
        EndpointSpec("/Site/GetSiteTableUseDetails", "start_end", list_key="siteTableUseDetailsList"),
        # NOTE(review): shares list_key with GetSiteTableUseDetails — confirm this is intended.
        EndpointSpec("/Site/GetSiteTableOrderDetails", "start_end", list_key="siteTableUseDetailsList"),
        EndpointSpec("/Site/GetTaiFeeAdjustList", "start_end", list_key="taiFeeAdjustInfos"),
        EndpointSpec(
            "/GoodsStockManage/QueryGoodsOutboundReceipt",
            "start_end",
            list_key="queryDeliveryRecordsList",
        ),
        EndpointSpec("/Promotion/GetOfflineCouponConsumePageList", "start_end"),
        EndpointSpec("/Order/GetRefundPayLogList", "start_end"),
        EndpointSpec("/Site/GetAllOrderSettleList", "range", list_key="settleList"),
        EndpointSpec("/Site/GetRechargeSettleList", "range", list_key="settleList"),
        # Pay-log records reference the settlements whose tickets are fetched afterwards.
        EndpointSpec("/PayLog/GetPayLogListPage", "pay", collect_ticket_ids=True),
    )

    TICKET_ENDPOINT = "/Order/GetOrderSettleTicketNew"

    def get_task_code(self) -> str:
        return "ODS_JSON_ARCHIVE"

    def extract(self, context: TaskContext) -> dict:
        """Fetch every endpoint in ENDPOINTS plus per-settlement tickets and write JSON files.

        Args:
            context: task context; ``store_id``, ``window_start`` and ``window_end`` are used.

        Returns:
            ``{"fetched": <records written, tickets included>, "ticket_ids": <ticket candidates>}``.

        Raises:
            TypeError: if the underlying API client is not an online ``APIClient``.

        Failures of individual endpoints or tickets are logged and recorded in
        ``manifest.json`` rather than aborting the whole task.
        """
        base_client = getattr(self.api, "base", None) or self.api
        if not isinstance(base_client, APIClient):
            raise TypeError("ODS_JSON_ARCHIVE 需要 APIClient(在线抓取)")

        out = self._resolve_output_dir()
        write_pretty = bool(self.config.get("io.write_pretty_json", False))
        page_size = int(self.config.get("api.page_size", 200) or 200)
        store_id = int(context.store_id)

        self.logger.info(
            "ODS_JSON_ARCHIVE: 开始抓取,窗口[%s ~ %s] 输出目录=%s",
            context.window_start,
            context.window_end,
            out,
        )

        ticket_ids: set[int] = set()
        per_endpoint: list[dict] = []
        for spec in self.ENDPOINTS:
            summary = self._archive_endpoint(
                base_client, spec, context, store_id, out, page_size, write_pretty, ticket_ids
            )
            per_endpoint.append(summary)

        # Ticket details: one request (and at most one file) per orderSettleId.
        ticket_ids_sorted = sorted(ticket_ids)
        self.logger.info("ODS_JSON_ARCHIVE: 小票候选数=%s", len(ticket_ids_sorted))
        ticket_records, ticket_errors = self._archive_tickets(
            base_client, ticket_ids_sorted, out, write_pretty
        )

        total_records = sum(item["records"] for item in per_endpoint) + ticket_records

        manifest = {
            "task": self.get_task_code(),
            "store_id": store_id,
            "window_start": context.window_start.isoformat(),
            "window_end": context.window_end.isoformat(),
            "page_size": page_size,
            "total_records": total_records,
            "ticket_ids": len(ticket_ids_sorted),
            "ticket_records": ticket_records,
            "ticket_errors": ticket_errors,
            "endpoints": per_endpoint,
        }
        manifest_path = out / "manifest.json"
        dump_json(manifest_path, manifest, pretty=True)

        if hasattr(self.api, "last_dump"):
            try:
                self.api.last_dump = {"file": str(manifest_path), "records": total_records, "pages": None}
            except Exception:  # noqa: BLE001 - best effort; must not fail the task
                self.logger.debug("ODS_JSON_ARCHIVE: failed to set api.last_dump", exc_info=True)

        self.logger.info("ODS_JSON_ARCHIVE: 抓取完成,总记录数=%s(含小票=%s)", total_records, ticket_records)
        return {"fetched": total_records, "ticket_ids": len(ticket_ids_sorted)}

    def _resolve_output_dir(self) -> Path:
        """Return the output directory, creating it if needed.

        ``self.api.output_dir`` wins when set; otherwise the ``pipeline.fetch_root``
        config value is used.
        """
        output_dir = getattr(self.api, "output_dir", None)
        if output_dir:
            out = Path(output_dir)
        else:
            out = Path(self.config.get("pipeline.fetch_root") or self.config["pipeline"]["fetch_root"])
        out.mkdir(parents=True, exist_ok=True)
        return out

    def _archive_endpoint(
        self,
        base_client: APIClient,
        spec: EndpointSpec,
        context: TaskContext,
        store_id: int,
        out: Path,
        page_size: int,
        write_pretty: bool,
        ticket_ids: set[int],
    ) -> dict:
        """Fetch all pages of one endpoint, write one file per page and return a summary.

        When ``spec.collect_ticket_ids`` is set, settlement ids found in the
        records are added to ``ticket_ids``. A failure is logged and reported in
        the summary's ``error`` field; pages written before it are kept and counted.

        Returns:
            ``{"endpoint", "file_stem", "pages", "records", "error"}``.
        """
        self.logger.info("ODS_JSON_ARCHIVE: 抓取 endpoint=%s", spec.endpoint)
        built_params = self._build_params(
            spec.window_style, store_id, context.window_start, context.window_end
        )
        if spec.site_id_as_list:
            built_params["siteId"] = [store_id]
        params = self._merge_common_params(built_params)

        base_filename = endpoint_to_filename(spec.endpoint)
        stem = Path(base_filename).stem
        suffix = Path(base_filename).suffix or ".json"

        endpoint_records = 0
        endpoint_pages = 0
        endpoint_error: str | None = None
        try:
            for page_no, records, _, _ in base_client.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                # Treat a missing page payload as an empty page so that counting
                # and dumping never operate on None.
                records = records or []
                endpoint_pages += 1
                endpoint_records += len(records)
                if spec.collect_ticket_ids:
                    self._collect_ticket_ids(records, ticket_ids)
                out_path = out / f"{stem}__p{int(page_no):04d}{suffix}"
                dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
        except Exception as exc:  # noqa: BLE001
            endpoint_error = f"{type(exc).__name__}: {exc}"
            self.logger.error("ODS_JSON_ARCHIVE: 接口抓取失败 endpoint=%s err=%s", spec.endpoint, endpoint_error)

        if endpoint_error:
            self.logger.warning(
                "ODS_JSON_ARCHIVE: endpoint=%s 完成(失败)pages=%s records=%s err=%s",
                spec.endpoint,
                endpoint_pages,
                endpoint_records,
                endpoint_error,
            )
        else:
            self.logger.info(
                "ODS_JSON_ARCHIVE: endpoint=%s 完成 pages=%s records=%s",
                spec.endpoint,
                endpoint_pages,
                endpoint_records,
            )

        return {
            "endpoint": spec.endpoint,
            "file_stem": stem,
            "pages": endpoint_pages,
            "records": endpoint_records,
            "error": endpoint_error,
        }

    @staticmethod
    def _collect_ticket_ids(records, ticket_ids: set[int]) -> None:
        """Add the settlement id referenced by each record to ``ticket_ids``.

        The id is taken from the first truthy value among ``relateId``,
        ``orderSettleId`` and ``order_settle_id`` and parsed with
        ``TypeParser.parse_int``; falsy parse results are skipped.
        """
        for rec in records:
            row = rec or {}
            relate_id = TypeParser.parse_int(
                row.get("relateId") or row.get("orderSettleId") or row.get("order_settle_id")
            )
            if relate_id:
                ticket_ids.add(relate_id)

    def _archive_tickets(
        self,
        base_client: APIClient,
        ticket_ids_sorted: list[int],
        out: Path,
        write_pretty: bool,
    ) -> tuple[int, int]:
        """Fetch the settlement ticket for each orderSettleId and write one file per id.

        Empty responses write no file. Per-ticket failures are logged and skipped.

        Returns:
            ``(records_written, failed_ticket_count)``.
        """
        ticket_filename = endpoint_to_filename(self.TICKET_ENDPOINT)
        ticket_file_stem = Path(ticket_filename).stem
        ticket_file_suffix = Path(ticket_filename).suffix or ".json"

        ticket_records = 0
        ticket_errors = 0
        for order_settle_id in ticket_ids_sorted:
            params = self._merge_common_params({"orderSettleId": int(order_settle_id)})
            try:
                records, _ = base_client.get_paginated(
                    endpoint=self.TICKET_ENDPOINT,
                    params=params,
                    page_size=None,
                    data_path=("data",),
                    list_key=None,
                )
                if not records:
                    continue
                ticket_records += len(records)
                out_path = out / f"{ticket_file_stem}__{int(order_settle_id)}{ticket_file_suffix}"
                dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
            except Exception as exc:  # noqa: BLE001
                ticket_errors += 1
                self.logger.error(
                    "ODS_JSON_ARCHIVE: 小票抓取失败 orderSettleId=%s err=%s",
                    order_settle_id,
                    exc,
                )
        return ticket_records, ticket_errors

    def _build_params(self, window_style: str, store_id: int, window_start, window_end) -> dict:
        """Build the request parameters for a window style.

        - ``none``: no parameters;
        - ``site``: ``siteId`` only;
        - ``range``: ``siteId`` + ``rangeStartTime``/``rangeEndTime``;
        - ``pay``: ``siteId`` + ``StartPayTime``/``EndPayTime``;
        - anything else (``start_end``): ``siteId`` + ``startTime``/``endTime``.

        Timestamps are formatted with ``TypeParser.format_timestamp`` using ``self.tz``.
        """
        if window_style == "none":
            return {}
        if window_style == "site":
            return {"siteId": store_id}
        if window_style == "range":
            return {
                "siteId": store_id,
                "rangeStartTime": TypeParser.format_timestamp(window_start, self.tz),
                "rangeEndTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        if window_style == "pay":
            return {
                "siteId": store_id,
                "StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
                "EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        # Default: startTime/endTime.
        return {
            "siteId": store_id,
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }