# -*- coding: utf-8 -*-
"""One-off research script: fetch every group-buy package detail and store it in ods.group_buy_package_details.

Usage (cwd = C:\\NeoZQYY/):
    python apps/etl/connectors/feiqiu/scripts/research_coupon_details.py

Flow:
    1. Read every coupon_id (the ``id`` column) from ods.group_buy_packages
    2. Call the QueryPackageCouponInfo detail endpoint serially (RateLimiter 5-20s)
    3. Extract structured fields + compute content_hash + keep the raw payload
    4. UPSERT into ods.group_buy_package_details

Requirements covered: Appendix B, research items 3 and 4
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# ── Environment bootstrap ──────────────────────────────────────────────
# The repository-root .env must be loaded before any variable is read.
from dotenv import load_dotenv

_SCRIPT_DIR = Path(__file__).resolve().parent  # scripts/
_FEIQIU_DIR = _SCRIPT_DIR.parent  # apps/etl/connectors/feiqiu/
# parents[3] climbs connectors/ → etl/ → apps/ → repository root.
_REPO_ROOT = _FEIQIU_DIR.parents[3]

load_dotenv(_REPO_ROOT / ".env")

# Abort immediately when a required variable is unset or empty.
_REQUIRED_ENV = ("FETCH_ROOT", "EXPORT_ROOT", "PG_DSN", "TEST_DB_DSN")
_missing = [name for name in _REQUIRED_ENV if not os.environ.get(name)]
if _missing:
    sys.exit("ERROR: 缺少必需环境变量: " + ", ".join(_missing))

TEST_DB_DSN = os.environ["TEST_DB_DSN"]

# Put the feiqiu directory on sys.path so the local packages imported below
# resolve even when the script is launched from the repository root.
if str(_FEIQIU_DIR) not in sys.path:
    sys.path.insert(0, str(_FEIQIU_DIR))
|
||
|
||
# ── 依赖导入 ──────────────────────────────────────────────────────────
|
||
from psycopg2.extras import Json # noqa: E402
|
||
|
||
from config.settings import AppConfig # noqa: E402
|
||
from api.client import APIClient # noqa: E402
|
||
from api.rate_limiter import RateLimiter # noqa: E402
|
||
from database.connection import DatabaseConnection # noqa: E402
|
||
|
||
# 复用 ods_tasks.py 中的字段提取逻辑
|
||
from tasks.ods.ods_tasks import _group_package_detail_process_fn # noqa: E402
|
||
|
||
|
||
def _fetch_and_store(api: APIClient, db: DatabaseConnection, cid) -> str:
    """Fetch one coupon's detail and upsert it.

    Returns one of ``"success"``, ``"fail"`` or ``"skip"`` so the caller can
    keep its counters; a failure for one coupon never aborts the whole run.
    """
    try:
        resp = api.get(
            "/PackageCoupon/QueryPackageCouponInfo",
            {"couponId": cid},
        )
    except Exception as e:  # script-level boundary: report and move on
        print(f"❌ API 错误: {e}")
        return "fail"

    # Field extraction is shared with the regular ODS task.
    records = _group_package_detail_process_fn(resp)
    if not records:
        print("⚠️ 响应无有效数据,跳过")
        return "skip"

    record = records[0]
    try:
        _upsert_detail(db, record)
        db.commit()
    except Exception as e:  # script-level boundary: undo this row only
        db.rollback()
        print(f"❌ 写入失败: {e}")
        return "fail"

    # Formatting happens after the commit and outside the try block, so a
    # missing hash can neither trigger a rollback nor double-count the row.
    hash_prefix = str(record.get("content_hash") or "")[:8]
    print(f"✅ 写入成功 (hash={hash_prefix}...)")
    return "success"


def main():
    """Fetch the detail of every group-buy package and upsert it into the test DB.

    Steps: load the app config, connect to the database named by
    ``TEST_DB_DSN``, read all distinct ids from ``ods.group_buy_packages``,
    then call the detail endpoint serially for each id with a rate limiter
    between requests (5-20s according to the module docstring), and finally
    print a summary.

    The database connection is always closed, including when an unexpected
    error or an interrupt ends the run early. An interrupted rate-limiter
    wait stops the loop regardless of how the current coupon turned out.
    """
    # ── 1. Load configuration ──
    config = AppConfig.load()
    print(f"✅ 配置加载完成 (store_id={config.get('app.store_id')})")

    # ── 2. Connect to the test database ──
    db = DatabaseConnection(
        dsn=TEST_DB_DSN,
        session=config["db"].get("session", {}),
        connect_timeout=config["db"].get("connect_timeout_sec"),
    )
    try:
        # Only show the part after the credentials; a DSN without "@"
        # (e.g. key=value form) would otherwise be printed with its password.
        if "@" in TEST_DB_DSN:
            dsn_target = TEST_DB_DSN.rsplit("@", 1)[-1]
        else:
            dsn_target = "<redacted>"
        print(f"✅ 已连接测试库: {dsn_target}")

        # ── 3. Collect every coupon_id ──
        rows = db.query("SELECT DISTINCT id FROM ods.group_buy_packages ORDER BY id")
        coupon_ids = [r["id"] for r in rows]
        total = len(coupon_ids)
        print(f"📋 共 {total} 个 coupon_id 待拉取")

        if not coupon_ids:
            print("⚠️ 没有找到任何 coupon_id,退出")
            return

        # ── 4. API client + rate limiter ──
        api = APIClient(
            base_url=config["api"]["base_url"],
            token=config["api"]["token"],
            timeout=config.get("api.timeout_sec", 20),
        )
        limiter = RateLimiter(min_interval=5.0, max_interval=20.0)

        # ── 5/6. Fetch serially and upsert ──
        counts = {"success": 0, "fail": 0, "skip": 0}
        for idx, cid in enumerate(coupon_ids, 1):
            print(f"\n[{idx}/{total}] coupon_id={cid} ...", end=" ", flush=True)
            counts[_fetch_and_store(api, db, cid)] += 1

            # Throttle between requests (not after the last one); a falsy
            # return from wait() means the wait was interrupted.
            if idx < total and not limiter.wait():
                print("⚠️ 等待被中断")
                break

        # ── 7. Summary ──
        print("\n" + "=" * 50)
        print(
            f"📊 拉取完成: 成功={counts['success']}, 失败={counts['fail']}, "
            f"跳过={counts['skip']}, 总计={total}"
        )
        print("=" * 50)
    finally:
        db.close()
|
||
|
||
|
||
def _upsert_detail(db: DatabaseConnection, record: dict) -> None:
    """Upsert a single detail record into ods.group_buy_package_details.

    The row is keyed by ``coupon_id``. On conflict every other listed column
    is overwritten with the incoming value and ``fetched_at`` is set to
    ``now()``. The statement is executed through ``db.execute``; committing
    is left to the caller.

    Args:
        db: connection wrapper exposing ``execute(sql, params)``.
        record: mapping of column name to value; columns absent from the
            mapping are written as NULL.

    Raises:
        ValueError: if ``record`` has no ``coupon_id``. A NULL key can never
            match ``ON CONFLICT (coupon_id)``, so such a row could not be
            deduplicated on a later run.
    """
    if record.get("coupon_id") is None:
        raise ValueError("record 缺少 coupon_id,无法写入")

    # Column order defines both the INSERT list and the parameter order.
    columns = (
        "coupon_id", "package_name", "duration", "start_time", "end_time",
        "add_start_clock", "add_end_clock", "is_enabled", "is_delete",
        "site_id", "tenant_id", "create_time", "creator_name",
        "table_area_ids", "table_area_names", "assistant_services",
        "groupon_site_infos", "package_services", "coupon_details_list",
        "content_hash", "payload",
    )
    # These columns are wrapped in the psycopg2 Json adapter before binding.
    jsonb_columns = frozenset({
        "table_area_ids", "table_area_names", "assistant_services",
        "groupon_site_infos", "package_services", "coupon_details_list",
        "payload",
    })

    values = []
    for col in columns:
        val = record.get(col)
        # None stays None so it is stored as SQL NULL rather than JSON null.
        if val is not None and col in jsonb_columns:
            val = Json(val)
        values.append(val)

    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Every column except the conflict key is refreshed on update.
    update_set = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in columns if col != "coupon_id"
    )

    sql = (
        f"INSERT INTO ods.group_buy_package_details ({col_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT (coupon_id) DO UPDATE SET {update_set}, "
        f"fetched_at = now()"
    )

    db.execute(sql, values)
|
||
|
||
|
||
# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|