119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""包装 APIClient,将分页响应落盘便于后续本地清洗。"""
|
||
from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Tuple

from api.client import APIClient
from utils.json_store import dump_json, endpoint_to_filename
|
||
|
||
|
||
class RecordingAPIClient:
|
||
"""
|
||
代理 APIClient,在调用 iter_paginated/get_paginated 时同时把响应写入 JSON 文件。
|
||
文件名根据 endpoint 生成,写入到指定 output_dir。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
base_client: APIClient,
|
||
output_dir: Path | str,
|
||
task_code: str,
|
||
run_id: int,
|
||
write_pretty: bool = False,
|
||
):
|
||
self.base = base_client
|
||
self.output_dir = Path(output_dir)
|
||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||
self.task_code = task_code
|
||
self.run_id = run_id
|
||
self.write_pretty = write_pretty
|
||
self.last_dump: dict[str, Any] | None = None
|
||
|
||
# ------------------------------------------------------------------ public API
|
||
def iter_paginated(
|
||
self,
|
||
endpoint: str,
|
||
params: dict | None,
|
||
page_size: int = 200,
|
||
page_field: str = "page",
|
||
size_field: str = "limit",
|
||
data_path: tuple = ("data",),
|
||
list_key: str | None = None,
|
||
) -> Iterable[Tuple[int, list, dict, dict]]:
|
||
pages: list[dict[str, Any]] = []
|
||
total_records = 0
|
||
|
||
for page_no, records, request_params, response in self.base.iter_paginated(
|
||
endpoint=endpoint,
|
||
params=params,
|
||
page_size=page_size,
|
||
page_field=page_field,
|
||
size_field=size_field,
|
||
data_path=data_path,
|
||
list_key=list_key,
|
||
):
|
||
pages.append({"page": page_no, "request": request_params, "response": response})
|
||
total_records += len(records)
|
||
yield page_no, records, request_params, response
|
||
|
||
self._dump(endpoint, params, page_size, pages, total_records)
|
||
|
||
def get_paginated(
|
||
self,
|
||
endpoint: str,
|
||
params: dict,
|
||
page_size: int = 200,
|
||
page_field: str = "page",
|
||
size_field: str = "limit",
|
||
data_path: tuple = ("data",),
|
||
list_key: str | None = None,
|
||
) -> tuple[list, list]:
|
||
records: list = []
|
||
pages_meta: list = []
|
||
|
||
for page_no, page_records, request_params, response in self.iter_paginated(
|
||
endpoint=endpoint,
|
||
params=params,
|
||
page_size=page_size,
|
||
page_field=page_field,
|
||
size_field=size_field,
|
||
data_path=data_path,
|
||
list_key=list_key,
|
||
):
|
||
records.extend(page_records)
|
||
pages_meta.append({"page": page_no, "request": request_params, "response": response})
|
||
|
||
return records, pages_meta
|
||
|
||
# ------------------------------------------------------------------ internal
|
||
def _dump(
|
||
self,
|
||
endpoint: str,
|
||
params: dict | None,
|
||
page_size: int,
|
||
pages: list[dict[str, Any]],
|
||
total_records: int,
|
||
):
|
||
filename = endpoint_to_filename(endpoint)
|
||
path = self.output_dir / filename
|
||
payload = {
|
||
"task_code": self.task_code,
|
||
"run_id": self.run_id,
|
||
"endpoint": endpoint,
|
||
"params": params or {},
|
||
"page_size": page_size,
|
||
"pages": pages,
|
||
"total_records": total_records,
|
||
"dumped_at": datetime.utcnow().isoformat() + "Z",
|
||
}
|
||
dump_json(path, payload, pretty=self.write_pretty)
|
||
self.last_dump = {
|
||
"file": str(path),
|
||
"endpoint": endpoint,
|
||
"pages": len(pages),
|
||
"records": total_records,
|
||
}
|