Files
feiqiu-ETL/etl_billiards/api/recording_client.py
2025-11-30 07:18:55 +08:00

119 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""包装 APIClient将分页响应落盘便于后续本地清洗。"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Tuple
from api.client import APIClient
from utils.json_store import dump_json, endpoint_to_filename
class RecordingAPIClient:
    """Proxy around ``APIClient`` that also records paginated responses to disk.

    Calls to ``iter_paginated`` / ``get_paginated`` are forwarded to the
    wrapped client. Once a pagination run has been consumed to the end, every
    page's request parameters and raw response are written as one JSON file
    into ``output_dir``. The file name is derived from the endpoint via
    ``endpoint_to_filename``.
    """

    def __init__(
        self,
        base_client: APIClient,
        output_dir: Path | str,
        task_code: str,
        run_id: int,
        write_pretty: bool = False,
    ):
        """Store the wrapped client and make sure the output directory exists.

        Args:
            base_client: Client whose ``iter_paginated`` performs the requests.
            output_dir: Directory receiving the JSON dumps; created (including
                parents) if it does not exist yet.
            task_code: Identifier copied into every dump payload.
            run_id: Run identifier copied into every dump payload.
            write_pretty: Forwarded to ``dump_json`` as its ``pretty`` flag.
        """
        self.base = base_client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.task_code = task_code
        self.run_id = run_id
        self.write_pretty = write_pretty
        # Summary of the most recent dump (file, endpoint, pages, records);
        # stays None until the first pagination run has completed.
        self.last_dump: dict[str, Any] | None = None

    # ------------------------------------------------------------------ public API
    def iter_paginated(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int = 200,
        page_field: str = "page",
        size_field: str = "limit",
        data_path: tuple = ("data",),
        list_key: str | None = None,
    ) -> Iterable[Tuple[int, list, dict, dict]]:
        """Yield pages from the wrapped client and record them when finished.

        All arguments are forwarded unchanged to ``self.base.iter_paginated``.

        Yields:
            ``(page_no, records, request_params, response)`` exactly as
            produced by the wrapped client.

        Note:
            The dump is written only after the wrapped iterator is exhausted.
            If the consumer stops iterating early, or the wrapped client
            raises, nothing is written for this run.
        """
        pages: list[dict[str, Any]] = []
        total_records = 0
        for page_no, records, request_params, response in self.base.iter_paginated(
            endpoint=endpoint,
            params=params,
            page_size=page_size,
            page_field=page_field,
            size_field=size_field,
            data_path=data_path,
            list_key=list_key,
        ):
            pages.append({"page": page_no, "request": request_params, "response": response})
            total_records += len(records)
            yield page_no, records, request_params, response
        self._dump(endpoint, params, page_size, pages, total_records)

    def get_paginated(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int = 200,
        page_field: str = "page",
        size_field: str = "limit",
        data_path: tuple = ("data",),
        list_key: str | None = None,
    ) -> tuple[list, list]:
        """Fetch every page eagerly and return the flattened result.

        Iterates ``self.iter_paginated`` to the end, so the JSON dump is
        always written as a side effect when no error occurs.

        Returns:
            ``(records, pages_meta)`` where ``records`` is the concatenation
            of all page records and ``pages_meta`` holds one
            ``{"page", "request", "response"}`` dict per page.
        """
        records: list = []
        pages_meta: list = []
        for page_no, page_records, request_params, response in self.iter_paginated(
            endpoint=endpoint,
            params=params,
            page_size=page_size,
            page_field=page_field,
            size_field=size_field,
            data_path=data_path,
            list_key=list_key,
        ):
            records.extend(page_records)
            pages_meta.append({"page": page_no, "request": request_params, "response": response})
        return records, pages_meta

    # ------------------------------------------------------------------ internal
    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``Z``."""
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an
        # aware "now" and dropping the tzinfo yields the same text as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return now_utc.isoformat() + "Z"

    def _dump(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int,
        pages: list[dict[str, Any]],
        total_records: int,
    ):
        """Write one pagination run to ``output_dir`` and update ``last_dump``.

        Args:
            endpoint: Endpoint that was queried; also determines the file name.
            params: Caller-supplied request parameters (``None`` is stored as
                an empty dict).
            page_size: Page size used for the run.
            pages: One ``{"page", "request", "response"}`` dict per page.
            total_records: Number of records summed over all pages.
        """
        filename = endpoint_to_filename(endpoint)
        path = self.output_dir / filename
        payload = {
            "task_code": self.task_code,
            "run_id": self.run_id,
            "endpoint": endpoint,
            "params": params or {},
            "page_size": page_size,
            "pages": pages,
            "total_records": total_records,
            "dumped_at": self._utc_timestamp(),
        }
        dump_json(path, payload, pretty=self.write_pretty)
        self.last_dump = {
            "file": str(path),
            "endpoint": endpoint,
            "pages": len(pages),
            "records": total_records,
        }