#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 分析手动获取的结账数据,检查时间分布 用法: cd C:/NeoZQYY python scripts/ops/_analyze_settlement_data.py """ import json import os import sys from datetime import datetime, date from pathlib import Path from collections import defaultdict # 添加项目根目录到 Python 路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) # 加载环境变量 from dotenv import load_dotenv load_dotenv(project_root / ".env") def main(): """分析结账数据的时间分布""" # 查找最新的结账数据文件 log_dir = Path(os.environ["SYSTEM_LOG_ROOT"]) settlement_files = list(log_dir.glob("settlement_manual_fetch_*.json")) if not settlement_files: print("❌ 未找到结账数据文件") return # 使用最新的文件 latest_file = max(settlement_files, key=lambda f: f.stat().st_mtime) print(f"📂 分析文件: {latest_file.name}") # 读取数据 with open(latest_file, "r", encoding="utf-8") as f: data = json.load(f) records = data.get("records", []) print(f"📊 总记录数: {len(records)}") if not records: print("❌ 没有记录数据") return # 分析时间分布 pay_times = [] date_counts = defaultdict(int) for record in records: # 结账数据在 settleList 字段中 settle_data = record.get("settleList", {}) pay_time = settle_data.get("payTime") if pay_time: pay_times.append(pay_time) # 提取日期部分 try: date_str = pay_time.split()[0] # "2026-02-14 10:30:00" -> "2026-02-14" date_counts[date_str] += 1 except: continue if not pay_times: print("❌ 没有有效的 payTime 数据") return pay_times.sort() print(f"\n🕐 时间分布:") print(f" 最早结账时间: {pay_times[0]}") print(f" 最晚结账时间: {pay_times[-1]}") print(f" 有效结账记录: {len(pay_times)}/{len(records)}") # 按日期统计 print(f"\n📅 按日期统计:") sorted_dates = sorted(date_counts.keys()) for date_str in sorted_dates: count = date_counts[date_str] print(f" {date_str}: {count:4d} 条记录") print(f"\n总计: {len(sorted_dates)} 天有数据") # 检查数据延迟 latest_date = pay_times[-1].split()[0] today = date.today().strftime("%Y-%m-%d") print(f"\n🔍 数据延迟检查:") print(f" API 最新数据日期: {latest_date}") print(f" 今天日期: {today}") if latest_date < today: from datetime import datetime as dt latest_dt = dt.strptime(latest_date, "%Y-%m-%d") today_dt = dt.strptime(today, "%Y-%m-%d") days_behind = (today_dt - latest_dt).days print(f" ⚠️ 数据延迟: {days_behind} 天") else: print(f" ✅ 数据是最新的") # 检查最近几天的数据 print(f"\n📈 最近 7 天数据:") recent_dates = sorted_dates[-7:] if len(sorted_dates) >= 7 else sorted_dates for date_str in recent_dates: count = date_counts[date_str] print(f" {date_str}: {count:4d} 条记录") # 生成更新的报告 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_file = log_dir / f"settlement_detailed_analysis_{timestamp}.md" with open(report_file, "w", encoding="utf-8") as f: f.write(f"# 飞球 API 结账数据详细分析报告\n\n") f.write(f"**分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"**数据源文件**: {latest_file.name}\n\n") f.write(f"**数据统计**:\n") f.write(f"- 总记录数: {len(records)}\n") f.write(f"- 有效结账记录: {len(pay_times)}\n") f.write(f"- 最早结账时间: {pay_times[0]}\n") f.write(f"- 最晚结账时间: {pay_times[-1]}\n") f.write(f"- 数据覆盖天数: {len(sorted_dates)} 天\n\n") f.write(f"**数据延迟检查**:\n") f.write(f"- API 最新数据日期: {latest_date}\n") f.write(f"- 今天日期: {today}\n") if latest_date < today: f.write(f"- ⚠️ 数据延迟: {days_behind} 天\n\n") else: f.write(f"- ✅ 数据是最新的\n\n") f.write(f"**按日期统计** (共 {len(sorted_dates)} 天):\n") for date_str in sorted_dates: count = date_counts[date_str] f.write(f"- {date_str}: {count:4d} 条记录\n") f.write(f"\n**最近 7 天数据**:\n") for date_str in recent_dates: count = date_counts[date_str] f.write(f"- {date_str}: {count:4d} 条记录\n") print(f"\n📋 详细分析报告已保存到: {report_file}") if __name__ == "__main__": main()