Files
Neo-ZQYY/scripts/ops/_fetch_settlement_data_manual.py

218 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Manually fetch Feiqiu API settlement data (2026-02-01 to now).

Background: an SPI warning showed settlement data reaching only
2026-02-14, i.e. roughly a two-week lag.  This script calls the API
directly to fetch the latest settlement records and confirm whether
the lag originates at the API data source.

Usage:
    cd C:/NeoZQYY
    python scripts/ops/_fetch_settlement_data_manual.py
"""
import json
import os
import sys
from datetime import datetime, date
from pathlib import Path

# Make the project root importable before any project-local imports.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Load environment variables from the project-root .env file.
from dotenv import load_dotenv
load_dotenv(project_root / ".env")

# Fail fast if a required environment variable is missing.
required_vars = ["SYSTEM_LOG_ROOT"]
for var in required_vars:
    if not os.environ.get(var):
        raise RuntimeError(f"环境变量 {var} 未设置,请检查 .env 文件")

# Put the ETL connector package on the path, then import its API client.
sys.path.insert(0, str(project_root / "apps/etl/connectors/feiqiu"))
from api.client import APIClient
def main():
    """Call the Feiqiu API for settlement data and write results + a report.

    Reads API_BASE / API_TOKEN from the environment (falling back to the
    ETL connector's own .env), fetches all settlement records from
    2026-02-01 onward, prints a per-date summary, and saves both the raw
    JSON payload and a markdown analysis report under SYSTEM_LOG_ROOT.

    Raises:
        RuntimeError: if API_BASE / API_TOKEN cannot be resolved.
        Exception: re-raises any API/IO failure after logging it to a
            timestamped error file under SYSTEM_LOG_ROOT.
    """
    # Read API configuration from the environment.
    api_base = os.environ.get("API_BASE")
    api_token = os.environ.get("API_TOKEN")
    if not api_base or not api_token:
        # Fall back to the ETL connector's own .env file.
        etl_env_path = project_root / "apps/etl/connectors/feiqiu/.env"
        if etl_env_path.exists():
            load_dotenv(etl_env_path)
            api_base = os.environ.get("API_BASE")
            api_token = os.environ.get("API_TOKEN")
    if not api_base or not api_token:
        raise RuntimeError("API_BASE 或 API_TOKEN 未配置,请检查 .env 文件")

    print(f"API Base URL: {api_base}")
    # api_token is guaranteed non-empty here, so print a truncated preview
    # (the original also had a dead "未设置" branch — unreachable after the
    # RuntimeError check above).
    print(f"API Token: {api_token[:20]}...")

    # Create the API client.
    client = APIClient(
        base_url=api_base,
        token=api_token,
        timeout=30,
        retry_max=3
    )

    # Query window: 2026-02-01 00:00:00 up to now.
    start_time = "2026-02-01 00:00:00"
    end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    params = {
        "siteId": 0,  # 0 = all sites
        "rangeStartTime": start_time,
        "rangeEndTime": end_time
    }
    print(f"\n查询参数:")
    print(f" 时间范围: {start_time} ~ {end_time}")
    print(f" 门店ID: {params['siteId']} (0=所有门店)")

    endpoint = "/Site/GetAllOrderSettleList"
    print(f"\n调用 API: {endpoint}")
    try:
        # Fetch all pages of settlement records.
        records, pages_meta = client.get_paginated(
            endpoint=endpoint,
            params=params,
            page_size=200,
            data_path=("data",),
            list_key="settleList"
        )
        print(f"\n✅ API 调用成功")
        print(f"总页数: {len(pages_meta)}")
        print(f"总记录数: {len(records)}")

        # Initialized upfront so the later guards can never hit an
        # unbound name when records is empty.
        pay_times = []
        date_counts = {}
        if records:
            # Collect non-empty settlement timestamps for analysis.
            pay_times = [r.get("payTime") for r in records if r.get("payTime")]
            if pay_times:
                pay_times.sort()
                print(f"\n📊 数据时间分布:")
                print(f" 最早结账时间: {pay_times[0]}")
                print(f" 最晚结账时间: {pay_times[-1]}")
                print(f" 有效结账记录: {len(pay_times)}/{len(records)}")
                # Count records per calendar day.
                for pay_time in pay_times:
                    try:
                        # "2026-02-14 10:30:00" -> "2026-02-14"
                        date_str = pay_time.split()[0]
                        date_counts[date_str] = date_counts.get(date_str, 0) + 1
                    except (AttributeError, IndexError):
                        # Skip non-string or whitespace-only timestamps
                        # (was a bare `except:` — too broad).
                        continue
                print(f"\n📅 按日期统计 (前10天):")
                for date_str in sorted(date_counts.keys())[:10]:
                    print(f" {date_str}: {date_counts[date_str]} 条记录")
                if len(date_counts) > 10:
                    print(f" ... (共 {len(date_counts)} 天有数据)")

        # Persist the raw API payload for later inspection.
        output_dir = Path(os.environ["SYSTEM_LOG_ROOT"])
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = output_dir / f"settlement_manual_fetch_{timestamp}.json"
        result = {
            "query_time": datetime.now().isoformat(),
            "params": params,
            "endpoint": endpoint,
            "total_pages": len(pages_meta),
            "total_records": len(records),
            "records": records,
            "pages_meta": pages_meta
        }
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f"\n💾 结果已保存到: {output_file}")

        # Write a short markdown analysis report next to the raw dump.
        report_file = output_dir / f"settlement_analysis_{timestamp}.md"
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(f"# 飞球 API 结账数据手动获取报告\n\n")
            f.write(f"**查询时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"**查询范围**: {start_time} ~ {end_time}\n\n")
            f.write(f"**API 端点**: {endpoint}\n\n")
            f.write(f"**结果统计**:\n")
            f.write(f"- 总页数: {len(pages_meta)}\n")
            f.write(f"- 总记录数: {len(records)}\n")
            if records and pay_times:
                f.write(f"- 最早结账时间: {pay_times[0]}\n")
                f.write(f"- 最晚结账时间: {pay_times[-1]}\n")
                f.write(f"- 有效结账记录: {len(pay_times)}/{len(records)}\n\n")
                f.write(f"**按日期统计**:\n")
                for date_str in sorted(date_counts.keys()):
                    f.write(f"- {date_str}: {date_counts[date_str]} 条记录\n")
            f.write(f"\n**数据文件**: {output_file.name}\n")
        print(f"📋 分析报告已保存到: {report_file}")

        # Headline finding: how far behind "today" is the newest record?
        if records and pay_times:
            latest_date = pay_times[-1].split()[0]
            today = date.today().strftime("%Y-%m-%d")
            print(f"\n🔍 关键发现:")
            print(f" API 最新数据日期: {latest_date}")
            print(f" 今天日期: {today}")
            # ISO "YYYY-MM-DD" strings compare correctly lexicographically.
            if latest_date < today:
                # Uses the module-level `date` import instead of the
                # original's redundant local `datetime` re-import.
                days_behind = (date.today() - date.fromisoformat(latest_date)).days
                print(f" ⚠️ 数据延迟: {days_behind}")
            else:
                print(f" ✅ 数据是最新的")
    except Exception as e:
        print(f"\n❌ API 调用失败: {e}")
        # Record the failure details so the run is still diagnosable.
        output_dir = Path(os.environ["SYSTEM_LOG_ROOT"])
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        error_file = output_dir / f"settlement_fetch_error_{timestamp}.txt"
        with open(error_file, "w", encoding="utf-8") as f:
            f.write(f"飞球 API 结账数据获取失败\n")
            f.write(f"时间: {datetime.now().isoformat()}\n")
            f.write(f"端点: {endpoint}\n")
            f.write(f"参数: {json.dumps(params, ensure_ascii=False, indent=2)}\n")
            f.write(f"错误: {str(e)}\n")
        print(f"错误信息已保存到: {error_file}")
        raise
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()