# -*- coding: utf-8 -*-
"""Wrap APIClient and persist paginated responses to disk for later local cleaning."""

from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Tuple

from api.client import APIClient
from utils.json_store import dump_json, endpoint_to_filename


class RecordingAPIClient:
    """
    Proxy for APIClient that also writes the responses to a JSON file whenever
    iter_paginated/get_paginated is called.

    The file name is derived from the endpoint and the file is written into
    the given output_dir.
    """

    def __init__(
        self,
        base_client: APIClient,
        output_dir: Path | str,
        task_code: str,
        run_id: int,
        write_pretty: bool = False,
    ) -> None:
        """Store the wrapped client and make sure the output directory exists.

        Args:
            base_client: Client whose ``iter_paginated`` does the real requests.
            output_dir: Directory that receives the dump files; created
                (including parents) if it does not exist yet.
            task_code: Copied verbatim into every dump payload.
            run_id: Copied verbatim into every dump payload.
            write_pretty: Forwarded to ``dump_json`` as its ``pretty`` argument.
        """
        self.base = base_client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.task_code = task_code
        self.run_id = run_id
        self.write_pretty = write_pretty
        # Summary of the most recent dump (file, endpoint, page and record
        # counts); stays None until the first dump has been written.
        self.last_dump: dict[str, Any] | None = None

    # ------------------------------------------------------------------ public API
    def iter_paginated(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int = 200,
        page_field: str = "page",
        size_field: str = "limit",
        data_path: tuple = ("data",),
        list_key: str | None = None,
    ) -> Iterable[Tuple[int, list, dict, dict]]:
        """Yield pages from the wrapped client and dump them once exhausted.

        All arguments are forwarded unchanged to ``self.base.iter_paginated``.
        Every ``(page_no, records, request_params, response)`` tuple it
        produces is re-yielded as-is, while the request/response pair is kept
        in memory for the dump.

        NOTE: the dump is written only after the underlying iterator has been
        fully consumed. If the caller stops iterating early, or the wrapped
        client raises, no file is written and ``last_dump`` is left untouched.
        """
        pages: list[dict[str, Any]] = []
        total_records = 0

        for page_no, records, request_params, response in self.base.iter_paginated(
            endpoint=endpoint,
            params=params,
            page_size=page_size,
            page_field=page_field,
            size_field=size_field,
            data_path=data_path,
            list_key=list_key,
        ):
            pages.append({"page": page_no, "request": request_params, "response": response})
            total_records += len(records)
            yield page_no, records, request_params, response

        self._dump(endpoint, params, page_size, pages, total_records)

    def get_paginated(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int = 200,
        page_field: str = "page",
        size_field: str = "limit",
        data_path: tuple = ("data",),
        list_key: str | None = None,
    ) -> tuple[list, list]:
        """Fetch every page eagerly and return ``(records, pages_meta)``.

        Drives ``self.iter_paginated`` to completion, so the JSON dump is
        written as a side effect before this method returns.

        Returns:
            A tuple of the flattened list of records from all pages and a list
            with one ``{"page", "request", "response"}`` dict per page.
        """
        records: list = []
        pages_meta: list = []

        for page_no, page_records, request_params, response in self.iter_paginated(
            endpoint=endpoint,
            params=params,
            page_size=page_size,
            page_field=page_field,
            size_field=size_field,
            data_path=data_path,
            list_key=list_key,
        ):
            records.extend(page_records)
            pages_meta.append({"page": page_no, "request": request_params, "response": response})

        return records, pages_meta

    # ------------------------------------------------------------------ internal
    def _dump(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int,
        pages: list[dict[str, Any]],
        total_records: int,
    ) -> None:
        """Write the collected pages to ``output_dir`` and update ``last_dump``.

        The target file name comes from ``endpoint_to_filename(endpoint)``.
        NOTE(review): the path depends only on the endpoint, so repeated calls
        for the same endpoint presumably target the same file - confirm
        against ``dump_json``.
        """
        path = self.output_dir / endpoint_to_filename(endpoint)

        # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
        # value; take an aware UTC timestamp instead and render the offset as
        # "Z" so the emitted string keeps the previous "<iso>Z" format.
        dumped_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        payload = {
            "task_code": self.task_code,
            "run_id": self.run_id,
            "endpoint": endpoint,
            "params": params or {},
            "page_size": page_size,
            "pages": pages,
            "total_records": total_records,
            "dumped_at": dumped_at,
        }
        dump_json(path, payload, pretty=self.write_pretty)

        self.last_dump = {
            "file": str(path),
            "endpoint": endpoint,
            "pages": len(pages),
            "records": total_records,
        }