# -*- coding: utf-8 -*- """ 回填脚本:dws_finance_area_daily + dws_finance_board_cache 对已有日期范围批量调用 FinanceAreaDailyTask.transform 逻辑, 回填完成后触发 DWS_FINANCE_BOARD_CACHE 重算所有已完成周期缓存。 用法: cd C:\\NeoZQYY uv run python scripts/ops/backfill_finance_area_daily.py \\ --site-id 1 --start-date 2025-07-16 --end-date 2026-03-28 Requirements: 2.7, 3.4, 5.1 """ from __future__ import annotations import argparse import logging import sys from datetime import date, datetime, timedelta from pathlib import Path # ── 环境加载 ────────────────────────────────────────────────────────────────── sys.path.insert(0, str(Path(__file__).resolve().parent)) from _env_paths import ensure_repo_root ensure_repo_root() import os from dotenv import load_dotenv _ROOT = Path(__file__).resolve().parents[2] load_dotenv(_ROOT / ".env", override=False) PG_DSN = os.environ.get("PG_DSN") if not PG_DSN: print("错误:PG_DSN 环境变量未定义,请在根 .env 中配置", file=sys.stderr) sys.exit(1) import psycopg2 import psycopg2.extras from neozqyy_shared.area_mapping import ALL_AREA_CODES from neozqyy_shared.datetime_utils import biz_date_sql_expr logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("backfill_finance_area_daily") # ── 导入 ETL 纯函数 ────────────────────────────────────────────────────────── # transform_area_daily 是纯函数,不依赖数据库连接对象 # 使用 importlib 加载,绕过 ETL 全量导入链(避免 utils.windowing 等依赖) import importlib.util as _ilu import types as _types _ETL_ROOT = os.path.join(_ROOT, "apps", "etl", "connectors", "feiqiu") # stub 模块(仅提供类型占位,纯函数不需要真实基类) _tasks_pkg = _types.ModuleType("tasks") _tasks_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks")] _dws_pkg = _types.ModuleType("tasks.dws") _dws_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks", "dws")] sys.modules.setdefault("tasks", _tasks_pkg) sys.modules.setdefault("tasks.dws", _dws_pkg) _base_stub = _types.ModuleType("tasks.dws.base_dws_task") _base_stub.TaskContext = type("TaskContext", (), {}) _base_stub.BaseDwsTask = type("BaseDwsTask", (), {}) sys.modules["tasks.dws.base_dws_task"] = _base_stub _fin_stub = _types.ModuleType("tasks.dws.finance_base_task") _fin_stub.FinanceBaseTask = type("FinanceBaseTask", (), {}) sys.modules["tasks.dws.finance_base_task"] = _fin_stub # 加载 finance_area_daily 纯函数 _fad_path = os.path.join(_ETL_ROOT, "tasks", "dws", "finance_area_daily.py") _fad_spec = _ilu.spec_from_file_location("tasks.dws.finance_area_daily", _fad_path) _fad_mod = _ilu.module_from_spec(_fad_spec) _fad_mod.__package__ = "tasks.dws" sys.modules["tasks.dws.finance_area_daily"] = _fad_mod _fad_spec.loader.exec_module(_fad_mod) transform_area_daily = _fad_mod.transform_area_daily # 加载 finance_board_cache 纯函数 _fbc_path = os.path.join(_ETL_ROOT, "tasks", "dws", "finance_board_cache.py") _fbc_spec = _ilu.spec_from_file_location("tasks.dws.finance_board_cache", _fbc_path) _fbc_mod = _ilu.module_from_spec(_fbc_spec) _fbc_mod.__package__ = "tasks.dws" sys.modules["tasks.dws.finance_board_cache"] = _fbc_mod _fbc_spec.loader.exec_module(_fbc_mod) COMPLETED_PERIODS = _fbc_mod.COMPLETED_PERIODS compute_fingerprint = _fbc_mod.compute_fingerprint _calc_period_date_range = _fbc_mod._calc_period_date_range # ── 数据提取 ────────────────────────────────────────────────────────────────── def extract_settlement_with_area( conn, site_id: int, stat_date: date, cutoff_hour: int = 8 ) -> list[dict]: """从 dwd_settlement_head + dim_table 提取单日结算单(含区域名称)。""" biz_expr = biz_date_sql_expr("sh.pay_time", cutoff_hour) sql = f""" SELECT {biz_expr} AS stat_date, dt.site_table_area_name AS area_name, sh.settle_type, sh.table_charge_money AS table_fee_amount, sh.goods_money AS goods_amount, sh.assistant_pd_money AS assistant_pd_amount, sh.assistant_cx_money AS assistant_cx_amount, (sh.table_charge_money + sh.goods_money + sh.assistant_pd_money + sh.assistant_cx_money) AS gross_amount, sh.coupon_amount, sh.pl_coupon_sale_amount, sh.adjust_amount, sh.member_discount_amount, sh.rounding_amount, sh.gift_card_amount FROM dwd.dwd_settlement_head sh LEFT JOIN dwd.dim_table dt ON dt.table_id = sh.table_id AND dt.site_id = sh.site_id AND dt.scd2_is_current = 1 WHERE sh.site_id = %s AND {biz_expr} = %s AND sh.settle_type IN (1, 3) """ with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, (site_id, stat_date)) return [dict(row) for row in cur.fetchall()] def extract_global_summary(conn, site_id: int, stat_date: date) -> list[dict]: """从 dws_finance_daily_summary 提取单日全局现金流/充值/卡消费。""" sql = """ SELECT stat_date, cash_pay_amount, COALESCE(cash_paper_amount, 0) AS cash_paper_amount, COALESCE(scan_pay_amount, 0) AS scan_pay_amount, groupbuy_pay_amount, recharge_cash_inflow, cash_inflow_total, cash_outflow_total, cash_balance_change, card_consume_total, recharge_card_consume, gift_card_consume, recharge_cash, first_recharge_amount AS first_recharge_cash, renewal_amount AS renewal_cash FROM dws.dws_finance_daily_summary WHERE site_id = %s AND stat_date = %s """ with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, (site_id, stat_date)) return [dict(row) for row in cur.fetchall()] # ── 回填单日 ────────────────────────────────────────────────────────────────── def backfill_one_day(conn, site_id: int, tenant_id: int, stat_date: date) -> int: """回填单日数据,返回插入行数。""" settlement_rows = extract_settlement_with_area(conn, site_id, stat_date) global_summary = extract_global_summary(conn, site_id, stat_date) rows = transform_area_daily( settlement_rows=settlement_rows, global_summary=global_summary, site_id=site_id, tenant_id=tenant_id, logger=logger, ) if not rows: return 0 # delete-before-insert(单事务) with conn.cursor() as cur: cur.execute( "DELETE FROM dws.dws_finance_area_daily WHERE site_id = %s AND stat_date = %s", (site_id, stat_date), ) columns = list(rows[0].keys()) cols_str = ", ".join(columns) placeholders = ", ".join(["%s"] * len(columns)) insert_sql = f"INSERT INTO dws.dws_finance_area_daily ({cols_str}) VALUES ({placeholders})" for row in rows: values = [row.get(col) for col in columns] cur.execute(insert_sql, values) conn.commit() return len(rows) # ── 缓存重算 ────────────────────────────────────────────────────────────────── def recompute_cache(conn, site_id: int) -> int: """重算所有已完成周期缓存,返回 upsert 数量。""" from decimal import Decimal _ZERO = Decimal("0") upserted = 0 for time_range in COMPLETED_PERIODS: start_date, end_date = _calc_period_date_range(time_range) for area_code in ALL_AREA_CODES: # 读取日粒度行计算指纹 with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """SELECT stat_date, gross_amount, discount_total, confirmed_income, cash_inflow_total, cash_outflow_total, cash_balance_change FROM dws.dws_finance_area_daily WHERE site_id = %s AND stat_date >= %s AND stat_date <= %s AND area_code = %s""", (site_id, start_date, end_date, area_code), ) daily_rows = [dict(r) for r in cur.fetchall()] new_fp = compute_fingerprint(daily_rows) # 检查已有指纹 with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """SELECT data_fingerprint FROM dws.dws_finance_board_cache WHERE site_id = %s AND time_range = %s AND area_code = %s""", (site_id, time_range, area_code), ) existing = cur.fetchone() existing_fp = dict(existing).get("data_fingerprint") if existing else None if new_fp == existing_fp: continue # SUM 计算 overview with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """SELECT COALESCE(SUM(gross_amount), 0) AS occurrence, COALESCE(SUM(discount_total), 0) AS discount, COALESCE(SUM(confirmed_income), 0) AS confirmed_revenue, COALESCE(SUM(cash_inflow_total), 0) AS cash_in, COALESCE(SUM(cash_outflow_total), 0) AS cash_out, COALESCE(SUM(cash_balance_change), 0) AS cash_balance FROM dws.dws_finance_area_daily WHERE site_id = %s AND stat_date >= %s AND stat_date <= %s AND area_code = %s""", (site_id, start_date, end_date, area_code), ) row = dict(cur.fetchone()) occ = Decimal(str(row["occurrence"])) disc = Decimal(str(row["discount"])) cr = Decimal(str(row["confirmed_revenue"])) ci = Decimal(str(row["cash_in"])) co = Decimal(str(row["cash_out"])) cb = Decimal(str(row["cash_balance"])) dr = (disc / occ) if occ != 0 else _ZERO br = (cb / ci) if ci != 0 else _ZERO # upsert with conn.cursor() as cur: cur.execute( """INSERT INTO dws.dws_finance_board_cache ( site_id, time_range, area_code, start_date, end_date, occurrence, discount, discount_rate, confirmed_revenue, cash_in, cash_out, cash_balance, balance_rate, data_fingerprint, computed_at, updated_at ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW(),NOW()) ON CONFLICT (site_id, time_range, area_code) DO UPDATE SET start_date=EXCLUDED.start_date, end_date=EXCLUDED.end_date, occurrence=EXCLUDED.occurrence, discount=EXCLUDED.discount, discount_rate=EXCLUDED.discount_rate, confirmed_revenue=EXCLUDED.confirmed_revenue, cash_in=EXCLUDED.cash_in, cash_out=EXCLUDED.cash_out, cash_balance=EXCLUDED.cash_balance, balance_rate=EXCLUDED.balance_rate, data_fingerprint=EXCLUDED.data_fingerprint, computed_at=NOW(), updated_at=NOW()""", (site_id, time_range, area_code, start_date, end_date, occ, disc, dr, cr, ci, co, cb, br, new_fp), ) upserted += 1 conn.commit() return upserted # ── 主流程 ──────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="回填 dws_finance_area_daily + dws_finance_board_cache" ) parser.add_argument("--site-id", type=int, required=True, help="门店 ID") parser.add_argument("--start-date", type=str, required=True, help="起始日期 YYYY-MM-DD") parser.add_argument("--end-date", type=str, required=True, help="结束日期 YYYY-MM-DD") parser.add_argument("--tenant-id", type=int, default=None, help="租户 ID(默认=site_id)") args = parser.parse_args() site_id = args.site_id tenant_id = args.tenant_id or site_id start = datetime.strptime(args.start_date, "%Y-%m-%d").date() end = datetime.strptime(args.end_date, "%Y-%m-%d").date() logger.info("=== 回填开始 ===") logger.info("站点: %s, 日期范围: %s ~ %s", site_id, start, end) conn = psycopg2.connect(PG_DSN) conn.autocommit = False try: # 阶段 1:逐日回填 dws_finance_area_daily total_rows = 0 current = start day_count = (end - start).days + 1 while current <= end: idx = (current - start).days + 1 rows = backfill_one_day(conn, site_id, tenant_id, current) total_rows += rows if idx % 10 == 0 or current == end: logger.info("[%d/%d] %s → %d 行", idx, day_count, current, rows) current += timedelta(days=1) logger.info("阶段 1 完成:共插入 %d 行", total_rows) # 阶段 2:重算缓存 logger.info("阶段 2:重算 DWS_FINANCE_BOARD_CACHE ...") upserted = recompute_cache(conn, site_id) logger.info("阶段 2 完成:upsert %d 组合", upserted) logger.info("=== 回填完成 ===") except Exception: conn.rollback() logger.exception("回填失败,已回滚") sys.exit(1) finally: conn.close() if __name__ == "__main__": main()