# Source file: feiqiu-ETL/tmp/compare_api_ods_fields.py
# Viewer metadata: 2026-02-04 21:39:01 +08:00 · 511 lines · 18 KiB · Python
# Viewer warning: this file contains Unicode characters that might be confused with other characters.
# -*- coding: utf-8 -*-
"""
对比 API 返回字段和 ODS 表字段,找出 ODS 中缺少的 API 字段
"""
import os
import sys
import json
import requests
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import psycopg2
from psycopg2.extras import RealDictCursor
# Configuration.
# Every connection setting can be overridden through an environment variable
# so the script can be pointed at another database / tenant without editing
# the source. The literals below are only fallbacks that preserve the
# previous behaviour when no variable is set.
# NOTE(review): the fallback DSN and API token are credentials committed to
# source control -- rotate them and remove the defaults once the environment
# variables are provisioned.
DSN = os.environ.get(
    'ODS_COMPARE_DSN',
    'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test',
)
API_BASE = os.environ.get(
    'ODS_COMPARE_API_BASE',
    'https://pc.ficoo.vip/apiprod/admin/v1/',
)
API_TOKEN = os.environ.get(
    'ODS_COMPARE_API_TOKEN',
    'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjbGllbnQtdHlwZSI6IjQiLCJ1c2VyLXR5cGUiOiIxIiwiaHR0cDovL3NjaGVtYXMubWljcm9zb2Z0LmNvbS93cy8yMDA4LzA2L2lkZW50aXR5L2NsYWltcy9yb2xlIjoiMTIiLCJyb2xlLWlkIjoiMTIiLCJ0ZW5hbnQtaWQiOiIyNzkwNjgzMTYwNzA5OTU3Iiwibmlja25hbWUiOiLnp5_miLfnrqHnkIblkZjvvJrmganmgakxIiwic2l0ZS1pZCI6IjAiLCJtb2JpbGUiOiIxMzgxMDUwMjMwNCIsInNpZCI6IjI5NTA0ODk2NTgzOTU4NDUiLCJzdGFmZi1pZCI6IjMwMDk5MTg2OTE1NTkwNDUiLCJvcmctaWQiOiIwIiwicm9sZS10eXBlIjoiMyIsInJlZnJlc2hUb2tlbiI6IktlbTVsdHRqZ2tSUExOcVA2ajhNakdQYnFrNW5mRzBQNzRvMHE0b295VVE9IiwicmVmcmVzaEV4cGlyeVRpbWUiOiIyMDI2LzIvOCDkuIvljYg2OjU3OjA1IiwibmVlZENoZWNrVG9rZW4iOiJmYWxzZSIsImV4cCI6MTc3MDU0ODIyNSwiaXNzIjoidGVzdCIsImF1ZCI6IlVzZXIifQ.wJlm7pTqUzp769nUGdxx0e1bVMy4x9Prp9U_UMWQvlk',
)
# Store id is kept as a string here; it is converted with int() when a
# request body is built.
STORE_ID = os.environ.get('ODS_COMPARE_STORE_ID', '2790685415443269')
# Time zone used for the query window timestamps.
TZ = ZoneInfo('Asia/Taipei')
# ODS task configuration.
# Each entry pairs one ODS table with the API endpoint that feeds it.
_START_END = ('startTime', 'endTime')
_RANGE_START_END = ('rangeStartTime', 'rangeEndTime')


def _ods_spec(code, table_name, endpoint, list_key, *,
              requires_window=False, time_fields=None, **extra):
    """Build one ODS task spec dict.

    Every spec reads its records from under the ``data`` key of the response
    and includes the site id in the request; only the remaining fields vary.
    ``extra`` carries optional per-task flags (e.g. ``site_id_array``) and is
    appended after the standard keys.
    """
    spec = {
        'code': code,
        'table_name': table_name,
        'endpoint': endpoint,
        'data_path': ['data'],
        'list_key': list_key,
        'requires_window': requires_window,
        'time_fields': time_fields,
        'include_site_id': True,
    }
    spec.update(extra)
    return spec


ODS_SPECS = [
    _ods_spec('ODS_ASSISTANT_ACCOUNT',
              'billiards_ods.assistant_accounts_master',
              '/PersonnelManagement/SearchAssistantInfo',
              'assistantInfos',
              requires_window=True, time_fields=_START_END),
    _ods_spec('ODS_SETTLEMENT_RECORDS',
              'billiards_ods.settlement_records',
              '/Site/GetAllOrderSettleList',
              'settleList',
              requires_window=True, time_fields=_RANGE_START_END),
    _ods_spec('ODS_TABLE_USE',
              'billiards_ods.table_fee_transactions',
              '/Site/GetSiteTableOrderDetails',
              'siteTableUseDetailsList',
              time_fields=_START_END),
    _ods_spec('ODS_ASSISTANT_LEDGER',
              'billiards_ods.assistant_service_records',
              '/AssistantPerformance/GetOrderAssistantDetails',
              'orderAssistantDetails',
              requires_window=True, time_fields=_START_END),
    _ods_spec('ODS_ASSISTANT_ABOLISH',
              'billiards_ods.assistant_cancellation_records',
              '/AssistantPerformance/GetAbolitionAssistant',
              'abolitionAssistants',
              requires_window=True, time_fields=_START_END),
    _ods_spec('ODS_STORE_GOODS_SALES',
              'billiards_ods.store_goods_sales_records',
              '/TenantGoods/GetGoodsSalesList',
              'orderGoodsLedgers',
              time_fields=_START_END),
    _ods_spec('ODS_PAYMENT',
              'billiards_ods.payment_transactions',
              '/PayLog/GetPayLogListPage',
              None,
              time_fields=('StartPayTime', 'EndPayTime')),
    _ods_spec('ODS_REFUND',
              'billiards_ods.refund_transactions',
              '/Order/GetRefundPayLogList',
              None,
              time_fields=_START_END),
    _ods_spec('ODS_PLATFORM_COUPON',
              'billiards_ods.platform_coupon_redemption_records',
              '/Promotion/GetOfflineCouponConsumePageList',
              None,
              time_fields=_START_END),
    _ods_spec('ODS_MEMBER',
              'billiards_ods.member_profiles',
              '/MemberProfile/GetTenantMemberList',
              'tenantMemberInfos',
              time_fields=_START_END),
    _ods_spec('ODS_MEMBER_CARD',
              'billiards_ods.member_stored_value_cards',
              '/MemberProfile/GetTenantMemberCardList',
              'tenantMemberCards',
              time_fields=_START_END),
    _ods_spec('ODS_MEMBER_BALANCE',
              'billiards_ods.member_balance_changes',
              '/MemberProfile/GetMemberCardBalanceChange',
              'tenantMemberCardLogs',
              time_fields=_START_END),
    _ods_spec('ODS_RECHARGE_SETTLE',
              'billiards_ods.recharge_settlements',
              '/Site/GetRechargeSettleList',
              'settleList',
              requires_window=True, time_fields=_RANGE_START_END),
    _ods_spec('ODS_GROUP_PACKAGE',
              'billiards_ods.group_buy_packages',
              '/PackageCoupon/QueryPackageCouponList',
              'packageCouponList'),
    _ods_spec('ODS_GROUP_BUY_REDEMPTION',
              'billiards_ods.group_buy_redemption_records',
              '/Site/GetSiteTableUseDetails',
              'siteTableUseDetailsList',
              time_fields=_START_END),
    _ods_spec('ODS_INVENTORY_STOCK',
              'billiards_ods.goods_stock_summary',
              '/TenantGoods/GetGoodsStockReport',
              None),
    _ods_spec('ODS_INVENTORY_CHANGE',
              'billiards_ods.goods_stock_movements',
              '/GoodsStockManage/QueryGoodsOutboundReceipt',
              'queryDeliveryRecordsList',
              requires_window=True, time_fields=_START_END),
    _ods_spec('ODS_TABLES',
              'billiards_ods.site_tables_master',
              '/Table/GetSiteTables',
              'siteTables'),
    _ods_spec('ODS_GOODS_CATEGORY',
              'billiards_ods.stock_goods_category_tree',
              '/TenantGoodsCategory/QueryPrimarySecondaryCategory',
              'goodsCategoryList'),
    _ods_spec('ODS_STORE_GOODS',
              'billiards_ods.store_goods_master',
              '/TenantGoods/GetGoodsInventoryList',
              'orderGoodsList',
              site_id_array=True),  # siteId must be sent as an array
    _ods_spec('ODS_TABLE_FEE_DISCOUNT',
              'billiards_ods.table_fee_discount_records',
              '/Site/GetTaiFeeAdjustList',
              'taiFeeAdjustInfos',
              time_fields=_START_END),
    _ods_spec('ODS_TENANT_GOODS',
              'billiards_ods.tenant_goods_master',
              '/TenantGoods/QueryTenantGoods',
              'tenantGoodsList'),
]
def get_ods_table_columns(conn, table_name: str) -> dict:
    """Return the column metadata of an ODS table.

    ``table_name`` may be schema-qualified (``schema.table``); without a dot
    the ``public`` schema is used. The result maps each lower-cased column
    name to its ``information_schema.columns`` row (column_name, data_type,
    udt_name). An empty dict means no columns were found for that table.
    """
    schema, dot, name = table_name.partition('.')
    if not dot:
        schema, name = 'public', table_name

    query = (
        "\n"
        "SELECT column_name, data_type, udt_name\n"
        "FROM information_schema.columns\n"
        "WHERE table_schema = %s AND table_name = %s\n"
        "ORDER BY ordinal_position\n"
    )
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query, (schema, name))
        fetched = cursor.fetchall()

    columns = {}
    for record in fetched:
        columns[record['column_name'].lower()] = record
    return columns
def flatten_json_keys(obj, prefix='', depth=0) -> set:
    """Collect the key names found in a JSON-like structure.

    Keys are returned without any path prefix, so nested keys share one
    namespace with top-level keys. Nested dicts/lists are followed up to
    three levels below the starting depth; lists do not add a level and only
    their first five elements are inspected. Anything that is neither a dict
    nor a list contributes no keys.
    """
    if depth > 3:
        return set()

    if isinstance(obj, dict):
        found = set(obj)
        if depth < 3:
            for name, value in obj.items():
                if isinstance(value, (dict, list)):
                    # The dotted path is only threaded through the recursion;
                    # it never ends up in the returned set.
                    child_prefix = f"{prefix}.{name}" if prefix else name
                    found |= flatten_json_keys(value, child_prefix, depth + 1)
        return found

    if isinstance(obj, list):
        found = set()
        for element in obj[:5]:
            found |= flatten_json_keys(element, prefix, depth)
        return found

    return set()
def call_api(endpoint: str, params: dict) -> dict:
    """POST ``params`` as a JSON body to ``endpoint`` under ``API_BASE``.

    Returns the decoded JSON response. Any exception raised while sending the
    request, checking the HTTP status or decoding the body is printed and an
    empty dict is returned instead, so callers can treat ``{}`` as "no data".
    """
    target = f"{API_BASE.rstrip('/')}/{endpoint.lstrip('/')}"
    request_headers = {
        'Authorization': f'Bearer {API_TOKEN}',
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    try:
        response = requests.post(
            target, json=params, headers=request_headers, timeout=30
        )
        response.raise_for_status()
        body = response.json()
    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        print(f" API 调用异常: {e}")
        return {}
    return body
def extract_list(payload: dict, data_path: list, list_key: str = None) -> list:
    """Extract the record list from an API response.

    Args:
        payload: Decoded response body.
        data_path: Keys to follow from ``payload`` down to the data node.
        list_key: Optional key under the data node that holds the records.

    Returns:
        Always a list. Resolution order once the data node is reached:
          1. the node itself, when it is already a list;
          2. ``node[list_key]`` when ``list_key`` is given and present
             (``[]`` if that value is null or otherwise not a list);
          3. the first of the common list keys that holds a list;
          4. the first list-valued entry of the node;
          5. ``[]`` when nothing matches or the path cannot be followed.
    """
    node = payload
    for key in data_path:
        if not isinstance(node, dict):
            return []
        node = node.get(key)

    if isinstance(node, list):
        return node
    if not isinstance(node, dict):
        return []

    if list_key and list_key in node:
        records = node[list_key]
        # The configured key is authoritative: when the API sends null (or
        # any non-list) there, report "no records" rather than handing a
        # non-list to callers or guessing at some other list in the node.
        return records if isinstance(records, list) else []

    # Try the common list keys.
    for candidate in ('list', 'rows', 'records', 'items', 'dataList'):
        value = node.get(candidate)
        if isinstance(value, list):
            return value

    # Fall back to the first list value found in the dict.
    for value in node.values():
        if isinstance(value, list):
            return value
    return []
def get_api_sample_data(spec: dict, window_start: datetime, window_end: datetime) -> list:
    """Fetch one page of sample records for an ODS task spec.

    The request asks for page 1 with a limit of 50. The site id is added when
    the spec sets ``include_site_id`` (wrapped in a list when
    ``site_id_array`` is set), and the window bounds are added under the
    spec's ``time_fields`` names when those are configured. Returns the
    extracted record list, or ``[]`` when the API call yields nothing.
    """
    request_body = {'page': 1, 'limit': 50}

    if spec.get('include_site_id'):
        site_id = int(STORE_ID)
        request_body['siteId'] = [site_id] if spec.get('site_id_array') else site_id

    window_keys = spec.get('time_fields')
    if window_keys:
        timestamp_format = '%Y-%m-%d %H:%M:%S'
        begin_key, finish_key = window_keys
        request_body[begin_key] = window_start.strftime(timestamp_format)
        request_body[finish_key] = window_end.strftime(timestamp_format)

    response = call_api(spec['endpoint'], request_body)
    if not response:
        return []
    return extract_list(response, spec['data_path'], spec.get('list_key'))
def compare_fields(api_fields: set, ods_columns: dict) -> dict:
    """Compare API field names with ODS column names.

    API field names are lower-cased before comparison; the keys of
    ``ods_columns`` are used as given. Returns a dict of sorted lists:
    ``api_fields``, ``ods_columns``, ``missing_in_ods`` (API fields with no
    matching column, ignoring system and site-profile names) and ``ods_only``
    (columns with no matching API field, ignoring system names).
    """
    # ODS bookkeeping columns that never come from the API.
    ignored_system = frozenset((
        'payload', 'source_file', 'source_endpoint', 'fetched_at',
        'content_hash', 'record_index', 'site_profile',
    ))
    # Nested siteProfile (store configuration) fields that are ignored.
    ignored_site_profile = frozenset((
        'address', 'full_address', 'latitude', 'longitude',
        'shop_name', 'shop_status', 'site_label', 'site_type',
        'tenant_site_region_id', 'attendance_distance', 'attendance_enabled',
        'auto_light', 'avatar', 'business_tel', 'customer_service_qrcode',
        'customer_service_wechat', 'fixed_pay_qrcode', 'light_status',
        'light_token', 'light_type', 'prod_env', 'wifi_name', 'wifi_password',
        'org_id', 'siteprofile', 'ewelink_client_id',
    ))

    api_names = {name.lower() for name in api_fields}
    column_names = set(ods_columns)

    absent_from_ods = api_names.difference(
        column_names, ignored_system, ignored_site_profile
    )
    # Columns without an API counterpart (possibly derived fields).
    only_in_ods = column_names.difference(api_names, ignored_system)

    return {
        'api_fields': sorted(api_names),
        'ods_columns': sorted(column_names),
        'missing_in_ods': sorted(absent_from_ods),
        'ods_only': sorted(only_in_ods),
    }
def _analyze_specs(conn, window_start: datetime, window_end: datetime) -> dict:
    """Compare every ``ODS_SPECS`` entry against its ODS table.

    Prints progress per task and returns the results keyed by task code.
    Tasks whose ODS table yields no columns are skipped and do not appear in
    the returned dict.
    """
    results = {}
    for spec in ODS_SPECS:
        print(f"\n处理: {spec['code']}")
        print(f" 表名: {spec['table_name']}")
        print(f" 端点: {spec['endpoint']}")

        # Column structure of the ODS table.
        ods_columns = get_ods_table_columns(conn, spec['table_name'])
        if not ods_columns:
            print(" [跳过] ODS 表不存在或无字段")
            continue
        print(f" ODS 字段数: {len(ods_columns)}")

        # Sample data from the API.
        records = get_api_sample_data(spec, window_start, window_end)
        print(f" API 返回记录数: {len(records)}")
        if not records:
            results[spec['code']] = {
                'table_name': spec['table_name'],
                'endpoint': spec['endpoint'],
                'api_records': 0,
                'ods_columns': list(ods_columns.keys()),
                'missing_in_ods': [],
                'note': 'API 无返回数据'
            }
            continue

        # Field names seen in the first 20 records.
        api_fields = set()
        for rec in records[:20]:
            if isinstance(rec, dict):
                api_fields.update(flatten_json_keys(rec))
        print(f" API 字段数: {len(api_fields)}")

        comparison = compare_fields(api_fields, ods_columns)
        missing = comparison['missing_in_ods']
        results[spec['code']] = {
            'table_name': spec['table_name'],
            'endpoint': spec['endpoint'],
            'api_records': len(records),
            'api_fields_count': len(comparison['api_fields']),
            'ods_columns_count': len(comparison['ods_columns']),
            'missing_in_ods': missing,
            'ods_only': comparison['ods_only'],
            'api_fields': comparison['api_fields'],
            'ods_columns': comparison['ods_columns'],
        }

        if missing:
            print(f" [!] ODS 缺少 {len(missing)} 个字段:")
            # Only the first ten are listed on the console.
            for field in missing[:10]:
                print(f" - {field}")
            if len(missing) > 10:
                print(f" ... 还有 {len(missing) - 10}")
        else:
            print(" [OK] ODS 已包含所有 API 字段")
    return results


def _print_summary(results: dict) -> None:
    """Print the Markdown-style summary of missing fields per ODS table."""
    print("\n")
    print("=" * 80)
    print("汇总报告 - 每个 ODS 表缺少的 API 字段")
    print("=" * 80)

    # Tables with missing fields, or with a note (no API data).
    for code, data in results.items():
        missing = data.get('missing_in_ods', [])
        if not (missing or data.get('note')):
            continue
        print(f"\n### {code}")
        print(f"表名: `{data['table_name']}`")
        print(f"端点: `{data['endpoint']}`")
        print(f"API 记录数: {data.get('api_records', 0)}")
        if missing:
            print(f"\n**ODS 缺少的字段 ({len(missing)}):**\n")
            print("| 字段名 | 说明 |")
            print("|--------|------|")
            for field in missing:
                print(f"| `{field}` | |")
        elif data.get('note'):
            print(f"\n备注: {data['note']}")

    # Tables with nothing missing.
    print("\n\n### 已完整的表(无缺失字段)")
    for code, data in results.items():
        missing = data.get('missing_in_ods', [])
        if not missing and not data.get('note'):
            print(f"- {code}: `{data['table_name']}` [OK]")


def main():
    """Compare API fields with ODS columns for every task and report gaps.

    Connects to the database, analyses each ``ODS_SPECS`` entry over the
    window from 2025-12-01 to now, prints a summary and writes the detailed
    results to ``api_ods_comparison.json`` next to this script.
    """
    print("=" * 80)
    print("API vs ODS 字段对比分析")
    print("=" * 80)

    conn = psycopg2.connect(DSN)
    # try/finally so the connection is released even when a query or an
    # unexpected response makes the analysis raise part-way through.
    try:
        print("数据库连接成功")
        print(f"API: {API_BASE}")
        print(f"门店 ID: {STORE_ID}")

        # Time window: 2025-12-01 until now.
        window_end = datetime.now(TZ)
        window_start = datetime(2025, 12, 1, 0, 0, 0, tzinfo=TZ)
        print(f"时间窗口: {window_start.strftime('%Y-%m-%d %H:%M:%S')} ~ {window_end.strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)

        results = _analyze_specs(conn, window_start, window_end)
    finally:
        conn.close()

    _print_summary(results)

    # Save the detailed results.
    output_file = os.path.join(os.path.dirname(__file__), 'api_ods_comparison.json')
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"\n\n详细结果已保存至: {output_file}")


if __name__ == '__main__':
    main()