# -*- coding: utf-8 -*-
"""One-off export: usable table-fee card ("台费卡") details -> CSV.

Filter: member_card_type_name='台费卡', status=1, is_delete=0, balance>0
Output directory: export/SYSTEM/REPORTS/biz_export/
"""
from __future__ import annotations

import csv
import os
from datetime import datetime
from pathlib import Path

from _env_paths import get_output_path
from dotenv import load_dotenv

# Load environment variables from the .env file located at parents[2] of this
# file (presumably the repository root -- confirm). override=False keeps any
# value that is already set in the process environment.
_ROOT = Path(__file__).resolve().parents[2]
load_dotenv(_ROOT / ".env", override=False)

# Output directory: reuse the parent (REPORTS) directory of SYSTEM_ANALYZE_ROOT
# and create a "biz_export" sub-directory in it.
# The raw string is validated BEFORE it is wrapped in Path: Path("") becomes
# Path(".") and Path objects are always truthy, so testing the Path itself
# would never detect a missing variable.
_REPORTS_ROOT_RAW = os.environ.get("SYSTEM_ANALYZE_ROOT", "")
if not _REPORTS_ROOT_RAW:
    raise RuntimeError("SYSTEM_ANALYZE_ROOT 未定义,请检查 .env")
_REPORTS_ROOT = Path(_REPORTS_ROOT_RAW)
OUT_DIR = _REPORTS_ROOT.parent / "biz_export"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Database connection -- uses the ETL test database (the table-fee card data
# lives in the ODS/DWD layers of the ETL database).
DSN = os.environ.get("PG_DSN")
if not DSN:
    raise RuntimeError("PG_DSN 未定义,请检查 .env")

# Query outline:
#   latest_card         - newest fetched snapshot (by fetched_at) per card id,
#                         restricted to cards of type '台费卡'
#   last_balance_change - most recent balance change per card
#   member_last_consume - latest settlement pay_time per member
# The final SELECT keeps only cards with status=1, is_delete=0 and a positive
# balance, ordered by balance descending. Column aliases match HEADERS.
SQL = """
WITH latest_card AS (
    SELECT DISTINCT ON (id)
        id,
        member_card_type_name,
        balance,
        create_time,
        member_name,
        member_mobile,
        last_consume_time,
        status,
        is_delete,
        tenant_member_id
    FROM ods.member_stored_value_cards
    WHERE member_card_type_name = '台费卡'
    ORDER BY id, fetched_at DESC
),
last_balance_change AS (
    SELECT DISTINCT ON (tenant_member_card_id)
        tenant_member_card_id,
        change_amount,
        change_time
    FROM dwd.dwd_member_balance_change
    ORDER BY tenant_member_card_id, change_time DESC
),
member_last_consume AS (
    SELECT member_id, MAX(pay_time) AS last_settle_time
    FROM dwd.dwd_settlement_head
    WHERE member_id IS NOT NULL
    GROUP BY member_id
)
SELECT
    c.id AS "卡号ID",
    c.member_card_type_name AS "卡类型",
    c.balance AS "余额",
    c.create_time AS "卡片生成时间",
    c.member_name AS "客户昵称",
    c.member_mobile AS "客户联系方式",
    c.last_consume_time AS "本卡最后使用时间",
    bc.change_amount AS "最后使用的金额",
    mc.last_settle_time AS "所属客户最后消费时间"
FROM latest_card c
LEFT JOIN last_balance_change bc ON bc.tenant_member_card_id = c.id
LEFT JOIN member_last_consume mc ON mc.member_id = c.tenant_member_id
WHERE c.status = 1
  AND c.is_delete = 0
  AND c.balance > 0
ORDER BY c.balance DESC
"""

# CSV header row; order and names mirror the column aliases of the final
# SELECT in SQL.
HEADERS = [
    "卡号ID",
    "卡类型",
    "余额",
    "卡片生成时间",
    "客户昵称",
    "客户联系方式",
    "本卡最后使用时间",
    "最后使用的金额",
    "所属客户最后消费时间",
]


def _fetch_rows() -> list[tuple]:
    """Run SQL against the database at DSN and return every result row."""
    # Imported lazily so that importing this module does not require psycopg2.
    import psycopg2

    conn = psycopg2.connect(DSN)
    try:
        # psycopg2's connection context manager only commits / rolls back the
        # transaction; it does NOT close the connection, hence the finally.
        with conn, conn.cursor() as cur:
            cur.execute(SQL)
            return cur.fetchall()
    finally:
        conn.close()


def _write_csv(rows: list[tuple], out_file: Path) -> None:
    """Write HEADERS followed by *rows* to *out_file*; NULLs become ""."""
    # utf-8-sig prepends a BOM (presumably so spreadsheet tools such as Excel
    # detect the encoding -- confirm); newline="" is what the csv module
    # requires for files it writes to.
    with open(out_file, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(HEADERS)
        writer.writerows(
            ["" if value is None else str(value) for value in row]
            for row in rows
        )


def main() -> None:
    """Export the query result to a timestamped CSV file under OUT_DIR.

    Prints the exported row count and the output path when done.

    Raises:
        psycopg2.Error: if connecting to the database or running the query
            fails.
        OSError: if the output file cannot be written.
    """
    rows = _fetch_rows()

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_file = OUT_DIR / f"台费卡明细_{ts}.csv"
    _write_csv(rows, out_file)

    print(f"导出完成:{len(rows)} 行 → {out_file}")


if __name__ == "__main__":
    main()