# -*- coding: utf-8 -*- """ 对比“近期记录”与“历史记录(Former)”接口: - 是否能正确响应(HTTP + API code==0) - 返回字段(基于 sample records 的 JSON path)是否一致 默认时间窗口(与 ETL 窗口语义一致,end 为次日 00:00:00): - 近期:2025-12-01 ~ 2025-12-15(end=2025-12-16 00:00:00) - 历史:2025-08-01 ~ 2025-08-15(end=2025-08-16 00:00:00) """ from __future__ import annotations import argparse import json import sys from dataclasses import asdict, dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Any, Iterable from dateutil import parser as dtparser from zoneinfo import ZoneInfo PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from api.client import APIClient from api.endpoint_routing import derive_former_endpoint as derive_former_endpoint_shared from config.settings import AppConfig from models.parsers import TypeParser from tasks.ods_json_archive_task import EndpointSpec, OdsJsonArchiveTask CHINESE_NAMES: dict[str, str] = { "/MemberProfile/GetMemberCardBalanceChange": "会员余额变动", "/AssistantPerformance/GetOrderAssistantDetails": "助教服务记录", "/AssistantPerformance/GetAbolitionAssistant": "助教撤销/作废记录", "/TenantGoods/GetGoodsSalesList": "商品销售记录", "/Site/GetSiteTableUseDetails": "团购核销记录", "/Site/GetSiteTableOrderDetails": "台费订单明细", "/Site/GetTaiFeeAdjustList": "台费调整/优惠记录", "/GoodsStockManage/QueryGoodsOutboundReceipt": "出库单/出库记录", "/Promotion/GetOfflineCouponConsumePageList": "平台券核销记录", "/Order/GetRefundPayLogList": "退款记录", "/Site/GetAllOrderSettleList": "结账记录", "/Site/GetRechargeSettleList": "充值结算记录", "/PayLog/GetPayLogListPage": "支付记录", } @dataclass class EndpointCheckResult: name_zh: str recent_endpoint: str recent_ok: bool former_endpoint: str former_ok: bool has_schema_diff: str # "是" | "否" | "未知" diff_detail: str recent_records: int | None = None former_records: int | None = None recent_error: str | None = None former_error: str | None = None extracted_list_key: str | None = None def _reconfigure_stdout_utf8(): try: sys.stdout.reconfigure(encoding="utf-8") except Exception: pass def derive_former_endpoint(endpoint: str) -> str | None: # backward compatible wrapper: keep local name but delegate to shared router return derive_former_endpoint_shared(endpoint) def _parse_day_start(d: str, tz: ZoneInfo) -> datetime: dt = dtparser.parse(d) if dt.tzinfo is None: dt = dt.replace(tzinfo=tz) else: dt = dt.astimezone(tz) return dt.replace(hour=0, minute=0, second=0, microsecond=0) def _window_from_dates(start_date: str, end_date_inclusive: str, tz: ZoneInfo) -> tuple[datetime, datetime]: start = _parse_day_start(start_date, tz) end_inclusive = _parse_day_start(end_date_inclusive, tz) end_exclusive = end_inclusive + timedelta(days=1) return start, end_exclusive def _build_window_params( window_style: str, store_id: int, window_start: datetime, window_end: datetime, tz: ZoneInfo, ) -> dict: if window_style == "none": return {} if window_style == "site": return {"siteId": store_id} if window_style == "range": return { "siteId": store_id, "rangeStartTime": TypeParser.format_timestamp(window_start, tz), "rangeEndTime": TypeParser.format_timestamp(window_end, tz), } if window_style == "pay": return { "siteId": store_id, "StartPayTime": TypeParser.format_timestamp(window_start, tz), "EndPayTime": TypeParser.format_timestamp(window_end, tz), } return { "siteId": store_id, "startTime": TypeParser.format_timestamp(window_start, tz), "endTime": TypeParser.format_timestamp(window_end, tz), } def _extract_records(payload: dict, spec: EndpointSpec) -> tuple[list, str | None]: # 优先使用 spec.list_key;若拿不到数据,再尝试自动推断(None)。 records_primary = APIClient._extract_list(payload, spec.data_path, spec.list_key) if records_primary: return records_primary, spec.list_key records_fallback = APIClient._extract_list(payload, spec.data_path, None) if records_fallback: return records_fallback, None return [], spec.list_key def _walk_paths(obj: Any, prefix: str, out: set[str], max_depth: int, depth: int, sample_list_elems: int): if depth > max_depth: return if isinstance(obj, dict): for k, v in obj.items(): if not isinstance(k, str): k = str(k) p = f"{prefix}.{k}" if prefix else k out.add(p) _walk_paths(v, p, out, max_depth, depth + 1, sample_list_elems) elif isinstance(obj, list): p = f"{prefix}[]" if prefix else "[]" out.add(p) for v in obj[:sample_list_elems]: _walk_paths(v, p, out, max_depth, depth + 1, sample_list_elems) def _schema_from_records(records: list, max_records: int, max_depth: int) -> set[str]: paths: set[str] = set() for rec in (records or [])[:max_records]: _walk_paths(rec, "", paths, max_depth=max_depth, depth=0, sample_list_elems=5) return paths def _schema_from_data(payload: dict, data_path: tuple[str, ...], max_depth: int) -> set[str]: cur: Any = payload for k in data_path: if isinstance(cur, dict): cur = cur.get(k) else: cur = None if cur is None: break paths: set[str] = set() _walk_paths(cur, "", paths, max_depth=max_depth, depth=0, sample_list_elems=5) return paths def _cell(text: str) -> str: # markdown table cell escape s = (text or "").replace("|", "\\|").replace("\n", "
") return s def _format_diff(recent_paths: set[str], former_paths: set[str], limit: int = 60) -> tuple[str, str]: if recent_paths == former_paths: return "否", "" only_recent = sorted(recent_paths - former_paths) only_former = sorted(former_paths - recent_paths) parts: list[str] = [] if only_recent: truncated = only_recent[:limit] suffix = "" if len(only_recent) <= limit else f" ...(+{len(only_recent) - limit})" parts.append(f"仅近期({len(only_recent)}): " + ", ".join(truncated) + suffix) if only_former: truncated = only_former[:limit] suffix = "" if len(only_former) <= limit else f" ...(+{len(only_former) - limit})" parts.append(f"仅历史({len(only_former)}): " + ", ".join(truncated) + suffix) return "是", "\n".join(parts).strip() def _post_first_page( client: APIClient, endpoint: str, params: dict, page_size: int, spec: EndpointSpec, ) -> tuple[dict, list]: # 只拉取第 1 页,用于“能否响应”与字段对比 payload: dict | None = None records: list = [] for _, page_records, _, raw in client.iter_paginated( endpoint=endpoint, params=params, page_size=page_size, data_path=spec.data_path, list_key=spec.list_key, page_end=1, ): payload = raw records = page_records or [] break return (payload or {}), (records or []) def _load_specs_for_range_only() -> list[EndpointSpec]: # 以 ODS_JSON_ARCHIVE 的 ENDPOINTS 为准,筛选出“可定义时间范围”的接口 specs: list[EndpointSpec] = [] for spec in OdsJsonArchiveTask.ENDPOINTS: if spec.window_style in ("start_end", "range", "pay"): specs.append(spec) return specs def main() -> int: _reconfigure_stdout_utf8() ap = argparse.ArgumentParser() ap.add_argument("--recent-start", default="2025-12-01") ap.add_argument("--recent-end", default="2025-12-15") ap.add_argument("--former-start", default="2025-08-01") ap.add_argument("--former-end", default="2025-08-15") ap.add_argument("--page-size", type=int, default=50) ap.add_argument("--max-records", type=int, default=50) ap.add_argument("--max-depth", type=int, default=5) ap.add_argument( "--out", default=str(Path(__file__).with_name("recent_vs_former_report.md")), help="输出 markdown 路径", ) ap.add_argument( "--out-json", default=str(Path(__file__).with_name("recent_vs_former_report.json")), help="输出 json 路径(含错误信息与统计)", ) args = ap.parse_args() cfg = AppConfig.load().config tz = ZoneInfo(cfg["app"]["timezone"]) store_id = int(cfg["app"]["store_id"]) recent_start, recent_end = _window_from_dates(args.recent_start, args.recent_end, tz) former_start, former_end = _window_from_dates(args.former_start, args.former_end, tz) if not cfg["api"].get("token"): raise SystemExit("缺少 api.token(请在 .env 配置 API_TOKEN 或 FICOO_TOKEN)") client = APIClient( base_url=cfg["api"]["base_url"], token=cfg["api"]["token"], timeout=int(cfg["api"].get("timeout_sec") or 20), retry_max=int(cfg["api"].get("retries", {}).get("max_attempts") or 3), headers_extra=cfg["api"].get("headers_extra") or {}, ) common_params = cfg["api"].get("params", {}) or {} if not isinstance(common_params, dict): common_params = {} results: list[EndpointCheckResult] = [] specs = _load_specs_for_range_only() for spec in specs: name_zh = CHINESE_NAMES.get(spec.endpoint) or Path(spec.endpoint).name former_endpoint = derive_former_endpoint(spec.endpoint) # recent recent_params = dict(common_params) recent_params.update(_build_window_params(spec.window_style, store_id, recent_start, recent_end, tz)) recent_ok = False recent_records: list = [] recent_payload: dict = {} recent_err: str | None = None try: recent_payload, recent_records = _post_first_page( client=client, endpoint=spec.endpoint, params=recent_params, page_size=args.page_size, spec=spec, ) recent_ok = True except Exception as e: recent_err = f"{type(e).__name__}: {e}" # former former_params = dict(common_params) former_params.update(_build_window_params(spec.window_style, store_id, former_start, former_end, tz)) former_ok = False former_records: list = [] former_payload: dict = {} former_err: str | None = None if not former_endpoint: former_err = "未提供历史记录接口 path" else: try: former_payload, former_records = _post_first_page( client=client, endpoint=former_endpoint, params=former_params, page_size=args.page_size, spec=spec, ) former_ok = True except Exception as e: former_err = f"{type(e).__name__}: {e}" extracted_key: str | None = spec.list_key if recent_ok and former_ok: # 用“更能提取出 records 的方式”来做字段对比 recent_extracted, recent_key_used = _extract_records(recent_payload, spec) former_extracted, former_key_used = _extract_records(former_payload, spec) extracted_key = recent_key_used or former_key_used or spec.list_key if recent_extracted and former_extracted: recent_schema = _schema_from_records(recent_extracted, args.max_records, args.max_depth) former_schema = _schema_from_records(former_extracted, args.max_records, args.max_depth) has_diff, detail = _format_diff(recent_schema, former_schema) elif (not recent_extracted) and (not former_extracted): has_diff, detail = "未知", "两侧 records 均为空,无法判断字段差异" else: has_diff, detail = ( "未知", f"一侧 records 为空(近期={len(recent_extracted)} 历史={len(former_extracted)}),无法判断字段差异", ) else: if former_endpoint is None: has_diff, detail = "未知", "无历史记录接口,跳过字段对比" else: has_diff, detail = "未知", "请求失败,无法对比字段" results.append( EndpointCheckResult( name_zh=name_zh, recent_endpoint=spec.endpoint, recent_ok=recent_ok, former_endpoint=former_endpoint or "无", former_ok=former_ok, has_schema_diff=has_diff, diff_detail=detail, recent_records=len(recent_records) if recent_ok else None, former_records=len(former_records) if former_ok else None, recent_error=recent_err, former_error=former_err, extracted_list_key=extracted_key, ) ) # markdown report out_md = Path(args.out) out_json = Path(args.out_json) out_md.parent.mkdir(parents=True, exist_ok=True) header = [ "# 近期记录 vs 历史记录(Former) 接口对比报告", "", f"- 近期窗口: `{recent_start.isoformat()}` ~ `{recent_end.isoformat()}`(end 为次日 00:00:00)", f"- 历史窗口: `{former_start.isoformat()}` ~ `{former_end.isoformat()}`(end 为次日 00:00:00)", f"- store_id: `{store_id}`", f"- base_url: `{cfg['api']['base_url']}`", "", "表头:接口名称(中文);近期记录接口 path;近期记录是否返回;历史记录接口 path;历史记录是否返回;是否存在返回字段差异;差异字段详情。", "", "| 接口名称(中文) | 近期记录接口 path | 近期记录是否返回 | 历史记录接口 path | 历史记录是否返回 | 是否存在返回字段差异 | 差异字段详情 |", "| --- | --- | --- | --- | --- | --- | --- |", ] rows: list[str] = [] for r in results: rows.append( "| " + " | ".join( [ _cell(r.name_zh), _cell(r.recent_endpoint), "是" if r.recent_ok else "否", _cell(r.former_endpoint), "是" if r.former_ok else "否", _cell(r.has_schema_diff), _cell(r.diff_detail or ""), ] ) + " |" ) out_md.write_text("\n".join(header + rows) + "\n", encoding="utf-8") out_json.write_text( json.dumps( { "recent_window": {"start": recent_start.isoformat(), "end": recent_end.isoformat()}, "former_window": {"start": former_start.isoformat(), "end": former_end.isoformat()}, "store_id": store_id, "base_url": cfg["api"]["base_url"], "results": [asdict(r) for r in results], }, ensure_ascii=False, indent=2, ) + "\n", encoding="utf-8", ) print(f"OK: wrote {out_md}") print(f"OK: wrote {out_json}") return 0 if __name__ == "__main__": raise SystemExit(main())