Files
feiqiu-ETL/etl_billiards/tasks/ods_json_archive_task.py
2026-01-27 22:47:05 +08:00

261 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""在线抓取 ODS 相关接口并落盘为 JSON用于后续离线回放/入库)。"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from api.client import APIClient
from models.parsers import TypeParser
from utils.json_store import dump_json, endpoint_to_filename
from .base_task import BaseTask, TaskContext
@dataclass(frozen=True)
class EndpointSpec:
    """Declarative description of one ODS endpoint to fetch and archive.

    Consumed by OdsJsonArchiveTask: `window_style` selects which parameter
    set `_build_params` emits, and `data_path`/`list_key` tell the paginated
    client where the record list lives inside the response payload.
    """

    # API path, e.g. "/Table/GetSiteTables".
    endpoint: str
    # One of: site | start_end | range | pay | none (see _build_params).
    window_style: str  # site | start_end | range | pay | none
    # Key path into the response body leading to the data payload.
    data_path: tuple[str, ...] = ("data",)
    # Name of the list field inside the payload when records are nested;
    # None when the payload itself is the record list.
    list_key: str | None = None
class OdsJsonArchiveTask(BaseTask):
    """
    Fetch a fixed set of ODS-source endpoints online and archive each page as
    "simplified JSON":

        {"code": 0, "data": [...records...]}

    Notes:
    - The output format is compatible with the parsing logic in
      tasks/manual_ingest_task.py;
    - One file is written per page by default, to avoid oversized single files;
    - Settlement tickets (/Order/GetOrderSettleTicketNew) are written to one
      file per orderSettleId.
    """

    # Endpoints to archive. window_style picks the time-window parameter names
    # built by _build_params; list_key names the record list inside the
    # response payload when records are nested under a field.
    ENDPOINTS: tuple[EndpointSpec, ...] = (
        EndpointSpec("/MemberProfile/GetTenantMemberList", "site", list_key="tenantMemberInfos"),
        EndpointSpec("/MemberProfile/GetTenantMemberCardList", "site", list_key="tenantMemberCards"),
        EndpointSpec("/MemberProfile/GetMemberCardBalanceChange", "start_end"),
        EndpointSpec("/PersonnelManagement/SearchAssistantInfo", "site", list_key="assistantInfos"),
        EndpointSpec(
            "/AssistantPerformance/GetOrderAssistantDetails",
            "start_end",
            list_key="orderAssistantDetails",
        ),
        EndpointSpec(
            "/AssistantPerformance/GetAbolitionAssistant",
            "start_end",
            list_key="abolitionAssistants",
        ),
        EndpointSpec("/Table/GetSiteTables", "site", list_key="siteTables"),
        EndpointSpec(
            "/TenantGoodsCategory/QueryPrimarySecondaryCategory",
            "site",
            list_key="goodsCategoryList",
        ),
        EndpointSpec("/TenantGoods/QueryTenantGoods", "site", list_key="tenantGoodsList"),
        EndpointSpec("/TenantGoods/GetGoodsInventoryList", "site", list_key="orderGoodsList"),
        EndpointSpec("/TenantGoods/GetGoodsStockReport", "site"),
        EndpointSpec("/TenantGoods/GetGoodsSalesList", "start_end", list_key="orderGoodsLedgers"),
        EndpointSpec(
            "/PackageCoupon/QueryPackageCouponList",
            "site",
            list_key="packageCouponList",
        ),
        EndpointSpec("/Site/GetSiteTableUseDetails", "start_end", list_key="siteTableUseDetailsList"),
        EndpointSpec("/Site/GetSiteTableOrderDetails", "start_end", list_key="siteTableUseDetailsList"),
        EndpointSpec("/Site/GetTaiFeeAdjustList", "start_end", list_key="taiFeeAdjustInfos"),
        EndpointSpec(
            "/GoodsStockManage/QueryGoodsOutboundReceipt",
            "start_end",
            list_key="queryDeliveryRecordsList",
        ),
        EndpointSpec("/Promotion/GetOfflineCouponConsumePageList", "start_end"),
        EndpointSpec("/Order/GetRefundPayLogList", "start_end"),
        EndpointSpec("/Site/GetAllOrderSettleList", "range", list_key="settleList"),
        EndpointSpec("/Site/GetRechargeSettleList", "range", list_key="settleList"),
        EndpointSpec("/PayLog/GetPayLogListPage", "pay"),
    )

    # Ticket-detail endpoint; fetched separately, one request per orderSettleId
    # collected from /PayLog/GetPayLogListPage records (see extract()).
    TICKET_ENDPOINT = "/Order/GetOrderSettleTicketNew"

    def get_task_code(self) -> str:
        """Return this task's unique code (used in logs and scheduling)."""
        return "ODS_JSON_ARCHIVE"

    def extract(self, context: TaskContext) -> dict:
        """Fetch every configured endpoint for the context window and dump JSON files.

        Flow:
          1. Resolve a live APIClient (unwraps a dump-capturing wrapper via
             its `.base` attribute, if present) and the output directory.
          2. For each EndpointSpec, paginate the endpoint and write one
             ``{stem}__pNNNN.json`` file per page. Failures are caught per
             endpoint, logged, and recorded — the loop continues.
          3. While paging /PayLog/GetPayLogListPage, collect candidate
             orderSettleIds for ticket detail fetches.
          4. Fetch the settlement ticket for each collected id into
             ``{ticket_stem}__{orderSettleId}.json`` (best-effort per id).
          5. Write a ``manifest.json`` summary next to the page files.

        Args:
            context: Task context providing store_id and the
                [window_start, window_end] fetch window.

        Returns:
            dict with ``fetched`` (total record count incl. tickets) and
            ``ticket_ids`` (number of distinct ticket candidates).

        Raises:
            TypeError: when no real APIClient is available (offline mode).
        """
        # self.api may be a wrapper exposing the real client as `.base`;
        # fall back to self.api itself when no wrapper is in play.
        base_client = getattr(self.api, "base", None) or self.api
        if not isinstance(base_client, APIClient):
            raise TypeError("ODS_JSON_ARCHIVE 需要 APIClient在线抓取")
        # Output dir: wrapper-provided override first, then config. The config
        # is tried with both a dotted flat key and nested mapping access —
        # presumably self.config supports one or the other; TODO confirm.
        output_dir = getattr(self.api, "output_dir", None)
        if output_dir:
            out = Path(output_dir)
        else:
            out = Path(self.config.get("pipeline.fetch_root") or self.config["pipeline"]["fetch_root"])
        out.mkdir(parents=True, exist_ok=True)
        write_pretty = bool(self.config.get("io.write_pretty_json", False))
        page_size = int(self.config.get("api.page_size", 200) or 200)
        store_id = int(context.store_id)
        total_records = 0
        ticket_ids: set[int] = set()          # orderSettleIds seen in pay logs
        per_endpoint: list[dict] = []         # per-endpoint stats for the manifest
        self.logger.info(
            "ODS_JSON_ARCHIVE: 开始抓取,窗口[%s ~ %s] 输出目录=%s",
            context.window_start,
            context.window_end,
            out,
        )
        for spec in self.ENDPOINTS:
            self.logger.info("ODS_JSON_ARCHIVE: 抓取 endpoint=%s", spec.endpoint)
            built_params = self._build_params(
                spec.window_style, store_id, context.window_start, context.window_end
            )
            # /TenantGoods/GetGoodsInventoryList requires siteId as an array
            # (a scalar triggers a server-side error that yields the malformed
            # status line "HTTP/1.1 1400").
            if spec.endpoint == "/TenantGoods/GetGoodsInventoryList":
                built_params["siteId"] = [store_id]
            # _merge_common_params is defined outside this view (BaseTask) —
            # presumably merges shared auth/tenant params; TODO confirm.
            params = self._merge_common_params(built_params)
            base_filename = endpoint_to_filename(spec.endpoint)
            stem = Path(base_filename).stem
            suffix = Path(base_filename).suffix or ".json"
            endpoint_records = 0
            endpoint_pages = 0
            endpoint_error: str | None = None
            try:
                for page_no, records, _, _ in base_client.iter_paginated(
                    endpoint=spec.endpoint,
                    params=params,
                    page_size=page_size,
                    data_path=spec.data_path,
                    list_key=spec.list_key,
                ):
                    endpoint_pages += 1
                    total_records += len(records)
                    endpoint_records += len(records)
                    # Harvest ticket candidates from pay-log records; the id
                    # field name varies, so try several spellings.
                    if spec.endpoint == "/PayLog/GetPayLogListPage":
                        for rec in records or []:
                            relate_id = TypeParser.parse_int(
                                (rec or {}).get("relateId")
                                or (rec or {}).get("orderSettleId")
                                or (rec or {}).get("order_settle_id")
                            )
                            if relate_id:
                                ticket_ids.add(relate_id)
                    # One file per page, zero-padded so files sort naturally.
                    out_path = out / f"{stem}__p{int(page_no):04d}{suffix}"
                    dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
            except Exception as exc:  # noqa: BLE001
                # Deliberate broad catch: one failing endpoint must not abort
                # the whole archive run; the error is surfaced in the manifest.
                endpoint_error = f"{type(exc).__name__}: {exc}"
                self.logger.error("ODS_JSON_ARCHIVE: 接口抓取失败 endpoint=%s err=%s", spec.endpoint, endpoint_error)
            per_endpoint.append(
                {
                    "endpoint": spec.endpoint,
                    "file_stem": stem,
                    "pages": endpoint_pages,
                    "records": endpoint_records,
                    "error": endpoint_error,
                }
            )
            if endpoint_error:
                self.logger.warning(
                    "ODS_JSON_ARCHIVE: endpoint=%s 完成失败pages=%s records=%s err=%s",
                    spec.endpoint,
                    endpoint_pages,
                    endpoint_records,
                    endpoint_error,
                )
            else:
                self.logger.info(
                    "ODS_JSON_ARCHIVE: endpoint=%s 完成 pages=%s records=%s",
                    spec.endpoint,
                    endpoint_pages,
                    endpoint_records,
                )
        # Ticket details: one request/file per orderSettleId, in sorted order
        # so reruns produce a deterministic fetch sequence.
        ticket_ids_sorted = sorted(ticket_ids)
        self.logger.info("ODS_JSON_ARCHIVE: 小票候选数=%s", len(ticket_ids_sorted))
        ticket_file_stem = Path(endpoint_to_filename(self.TICKET_ENDPOINT)).stem
        ticket_file_suffix = Path(endpoint_to_filename(self.TICKET_ENDPOINT)).suffix or ".json"
        ticket_records = 0
        for order_settle_id in ticket_ids_sorted:
            params = self._merge_common_params({"orderSettleId": int(order_settle_id)})
            try:
                records, _ = base_client.get_paginated(
                    endpoint=self.TICKET_ENDPOINT,
                    params=params,
                    page_size=None,
                    data_path=("data",),
                    list_key=None,
                )
                # Skip empty results: no file is written for ticket-less ids.
                if not records:
                    continue
                ticket_records += len(records)
                out_path = out / f"{ticket_file_stem}__{int(order_settle_id)}{ticket_file_suffix}"
                dump_json(out_path, {"code": 0, "data": records}, pretty=write_pretty)
            except Exception as exc:  # noqa: BLE001
                # Best-effort per ticket: log and move on to the next id.
                self.logger.error(
                    "ODS_JSON_ARCHIVE: 小票抓取失败 orderSettleId=%s err=%s",
                    order_settle_id,
                    exc,
                )
                continue
        total_records += ticket_records
        # Run summary written alongside the page files; always pretty-printed
        # (human-facing), unlike the data files which follow write_pretty.
        manifest = {
            "task": self.get_task_code(),
            "store_id": store_id,
            "window_start": context.window_start.isoformat(),
            "window_end": context.window_end.isoformat(),
            "page_size": page_size,
            "total_records": total_records,
            "ticket_ids": len(ticket_ids_sorted),
            "ticket_records": ticket_records,
            "endpoints": per_endpoint,
        }
        manifest_path = out / "manifest.json"
        dump_json(manifest_path, manifest, pretty=True)
        # Best-effort: expose run stats on a wrapper that tracks dumps.
        if hasattr(self.api, "last_dump"):
            try:
                self.api.last_dump = {"file": str(manifest_path), "records": total_records, "pages": None}
            except Exception:
                pass
        self.logger.info("ODS_JSON_ARCHIVE: 抓取完成,总记录数=%s(含小票=%s", total_records, ticket_records)
        return {"fetched": total_records, "ticket_ids": len(ticket_ids_sorted)}

    def _build_params(self, window_style: str, store_id: int, window_start, window_end) -> dict:
        """Build the endpoint-specific site/time-window request parameters.

        Args:
            window_style: One of "none", "site", "range", "pay"; any other
                value falls through to the default startTime/endTime pair.
            store_id: Site id sent as ``siteId`` (except for "none").
            window_start: Window start; formatted via TypeParser in self.tz.
            window_end: Window end; formatted via TypeParser in self.tz.

        Returns:
            dict of request parameters with the naming each API family expects.
        """
        if window_style == "none":
            return {}
        if window_style == "site":
            return {"siteId": store_id}
        if window_style == "range":
            return {
                "siteId": store_id,
                "rangeStartTime": TypeParser.format_timestamp(window_start, self.tz),
                "rangeEndTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        if window_style == "pay":
            # NOTE(review): the pay-log API uses PascalCase parameter names,
            # unlike the other window styles — looks intentional; confirm
            # against the API contract before "fixing" the casing.
            return {
                "siteId": store_id,
                "StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
                "EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        # default: startTime/endTime
        return {
            "siteId": store_id,
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }