Files
Neo-ZQYY/scripts/ops/backfill_finance_area_daily.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

357 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
回填脚本dws_finance_area_daily + dws_finance_board_cache
对已有日期范围批量调用 FinanceAreaDailyTask.transform 逻辑,
回填完成后触发 DWS_FINANCE_BOARD_CACHE 重算所有已完成周期缓存。
用法:
cd C:\\Project\\NeoZQYY
uv run python scripts/ops/backfill_finance_area_daily.py \\
--site-id 1 --start-date 2025-07-16 --end-date 2026-03-28
Requirements: 2.7, 3.4, 5.1
"""
from __future__ import annotations
import argparse
import logging
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
# ── Environment loading ───────────────────────────────────────────────────────
# Put this script's own directory first on sys.path so the sibling module
# `_env_paths` is importable regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from _env_paths import ensure_repo_root
# Project helper; presumably makes the repository root importable — confirm in _env_paths.
ensure_repo_root()
import os
from dotenv import load_dotenv
# Two levels above this file's directory; with the script living in
# scripts/ops/ (see the usage example in the module docstring) that is the repo root.
_ROOT = Path(__file__).resolve().parents[2]
# override=False: values already present in the process environment are kept
# and the root .env only supplies what is missing.
load_dotenv(_ROOT / ".env", override=False)
PG_DSN = os.environ.get("PG_DSN")
# Fail fast at import time: nothing below is useful without a database DSN.
if not PG_DSN:
    print("错误PG_DSN 环境变量未定义,请在根 .env 中配置", file=sys.stderr)
    sys.exit(1)
# These imports deliberately come after the PG_DSN check, so a missing DSN is
# reported before any database driver or project package is loaded.
import psycopg2
import psycopg2.extras
from neozqyy_shared.area_mapping import ALL_AREA_CODES
from neozqyy_shared.datetime_utils import biz_date_sql_expr
# Root logging configuration for the whole script: INFO level, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Module-wide logger; also handed to transform_area_daily by backfill_one_day.
logger = logging.getLogger("backfill_finance_area_daily")
# ── Import the ETL pure functions ─────────────────────────────────────────────
# Per the original author's note, transform_area_daily is a pure function that
# does not depend on a database connection object.
# The ETL modules are loaded through importlib to bypass the full ETL import
# chain (avoiding dependencies such as utils.windowing).
import importlib.util as _ilu
import types as _types
_ETL_ROOT = os.path.join(_ROOT, "apps", "etl", "connectors", "feiqiu")
# Stub modules (type placeholders only; the pure functions do not need the real
# base classes).
# Synthetic `tasks` and `tasks.dws` packages whose __path__ points at the real
# ETL directories.
_tasks_pkg = _types.ModuleType("tasks")
_tasks_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks")]
_dws_pkg = _types.ModuleType("tasks.dws")
_dws_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks", "dws")]
# setdefault: keep any `tasks` / `tasks.dws` module that is already registered.
sys.modules.setdefault("tasks", _tasks_pkg)
sys.modules.setdefault("tasks.dws", _dws_pkg)
# The base-class modules are replaced unconditionally with stubs exposing empty
# classes under the names the ETL files presumably import — confirm against
# finance_area_daily.py / finance_board_cache.py.
_base_stub = _types.ModuleType("tasks.dws.base_dws_task")
_base_stub.TaskContext = type("TaskContext", (), {})
_base_stub.BaseDwsTask = type("BaseDwsTask", (), {})
sys.modules["tasks.dws.base_dws_task"] = _base_stub
_fin_stub = _types.ModuleType("tasks.dws.finance_base_task")
_fin_stub.FinanceBaseTask = type("FinanceBaseTask", (), {})
sys.modules["tasks.dws.finance_base_task"] = _fin_stub
# Load the finance_area_daily pure function.
_fad_path = os.path.join(_ETL_ROOT, "tasks", "dws", "finance_area_daily.py")
_fad_spec = _ilu.spec_from_file_location("tasks.dws.finance_area_daily", _fad_path)
_fad_mod = _ilu.module_from_spec(_fad_spec)
# Pin the package and register the module before executing it, so imports made
# while the file runs resolve against the stub packages above.
_fad_mod.__package__ = "tasks.dws"
sys.modules["tasks.dws.finance_area_daily"] = _fad_mod
_fad_spec.loader.exec_module(_fad_mod)
transform_area_daily = _fad_mod.transform_area_daily
# Load the finance_board_cache pure functions (same recipe as above).
_fbc_path = os.path.join(_ETL_ROOT, "tasks", "dws", "finance_board_cache.py")
_fbc_spec = _ilu.spec_from_file_location("tasks.dws.finance_board_cache", _fbc_path)
_fbc_mod = _ilu.module_from_spec(_fbc_spec)
_fbc_mod.__package__ = "tasks.dws"
sys.modules["tasks.dws.finance_board_cache"] = _fbc_mod
_fbc_spec.loader.exec_module(_fbc_mod)
# Names re-exported for recompute_cache below.
COMPLETED_PERIODS = _fbc_mod.COMPLETED_PERIODS
compute_fingerprint = _fbc_mod.compute_fingerprint
_calc_period_date_range = _fbc_mod._calc_period_date_range
# ── 数据提取 ──────────────────────────────────────────────────────────────────
def extract_settlement_with_area(
    conn, site_id: int, stat_date: date, cutoff_hour: int = 8
) -> list[dict]:
    """Return one day's settlement rows for a site, each with its table-area name.

    Reads ``dwd.dwd_settlement_head`` (settle_type 1 or 3 only) and LEFT JOINs
    the current ``dwd.dim_table`` row to pick up ``site_table_area_name``; a
    settlement with no matching table row therefore comes back with a NULL
    ``area_name``.  The day a settlement belongs to is the SQL expression
    produced by ``biz_date_sql_expr("sh.pay_time", cutoff_hour)``; the same
    expression serves as the ``stat_date`` output column and as the day filter.

    Args:
        conn: Open psycopg2 connection.
        site_id: Site whose settlements are read.
        stat_date: Day to extract, compared against the business-date expression.
        cutoff_hour: Forwarded unchanged to ``biz_date_sql_expr``.

    Returns:
        One plain ``dict`` per settlement row, keyed by the column aliases of
        the SELECT list.
    """
    day_expr = biz_date_sql_expr("sh.pay_time", cutoff_hour)
    query = f"""
        SELECT
            {day_expr} AS stat_date,
            dt.site_table_area_name AS area_name,
            sh.settle_type,
            sh.table_charge_money AS table_fee_amount,
            sh.goods_money AS goods_amount,
            sh.assistant_pd_money AS assistant_pd_amount,
            sh.assistant_cx_money AS assistant_cx_amount,
            (sh.table_charge_money + sh.goods_money
             + sh.assistant_pd_money + sh.assistant_cx_money)
                AS gross_amount,
            sh.coupon_amount,
            sh.pl_coupon_sale_amount,
            sh.adjust_amount,
            sh.member_discount_amount,
            sh.rounding_amount,
            sh.gift_card_amount
        FROM dwd.dwd_settlement_head sh
        LEFT JOIN dwd.dim_table dt
            ON dt.table_id = sh.table_id
            AND dt.site_id = sh.site_id
            AND dt.scd2_is_current = 1
        WHERE sh.site_id = %s
            AND {day_expr} = %s
            AND sh.settle_type IN (1, 3)
    """
    params = (site_id, stat_date)
    # RealDictCursor yields mapping rows; they are copied into plain dicts so
    # callers never hold driver-specific row objects.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, params)
        records = cursor.fetchall()
    return list(map(dict, records))
def extract_global_summary(conn, site_id: int, stat_date: date) -> list[dict]:
    """Return the site-wide cash-flow / recharge / card-consumption row(s) for one day.

    Selects from ``dws.dws_finance_daily_summary`` filtered by ``site_id`` and
    ``stat_date``.  ``cash_paper_amount`` and ``scan_pay_amount`` are
    NULL-coalesced to 0; ``first_recharge_amount`` and ``renewal_amount`` are
    exposed under the aliases ``first_recharge_cash`` and ``renewal_cash``.

    Args:
        conn: Open psycopg2 connection.
        site_id: Site to read.
        stat_date: Day to read.

    Returns:
        One plain ``dict`` per matching summary row (empty list when the day
        has no summary).
    """
    query = """
        SELECT
            stat_date,
            cash_pay_amount,
            COALESCE(cash_paper_amount, 0) AS cash_paper_amount,
            COALESCE(scan_pay_amount, 0) AS scan_pay_amount,
            groupbuy_pay_amount,
            recharge_cash_inflow,
            cash_inflow_total,
            cash_outflow_total,
            cash_balance_change,
            card_consume_total,
            recharge_card_consume,
            gift_card_consume,
            recharge_cash,
            first_recharge_amount AS first_recharge_cash,
            renewal_amount AS renewal_cash
        FROM dws.dws_finance_daily_summary
        WHERE site_id = %s
            AND stat_date = %s
    """
    summary: list[dict] = []
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, (site_id, stat_date))
        # Copy each mapping row into a plain dict before the cursor closes.
        for record in cursor.fetchall():
            summary.append(dict(record))
    return summary
# ── 回填单日 ──────────────────────────────────────────────────────────────────
def backfill_one_day(conn, site_id: int, tenant_id: int, stat_date: date) -> int:
    """Rebuild the ``dws.dws_finance_area_daily`` rows of one site for one day.

    Extracts the day's settlements and global summary, runs them through
    ``transform_area_daily`` and replaces the day's rows with the result
    (delete-before-insert, committed as a single transaction).

    Args:
        conn: Open psycopg2 connection with autocommit disabled.
        site_id: Site being backfilled.
        tenant_id: Tenant id forwarded to ``transform_area_daily``.
        stat_date: Day being backfilled.

    Returns:
        Number of rows inserted; 0 when the transform produced nothing.

    Raises:
        Exception: Any database error from the delete/insert is re-raised
            after the transaction has been rolled back.
    """
    settlement_rows = extract_settlement_with_area(conn, site_id, stat_date)
    global_summary = extract_global_summary(conn, site_id, stat_date)
    rows = transform_area_daily(
        settlement_rows=settlement_rows,
        global_summary=global_summary,
        site_id=site_id,
        tenant_id=tenant_id,
        logger=logger,
    )
    if not rows:
        # NOTE(review): rows already stored for this day are left untouched when
        # the transform yields nothing, so stale data from an earlier run
        # survives. Confirm this is intended before relying on the backfill to
        # clear such days.
        return 0

    # Column names come from the transform's own output dicts (trusted code,
    # not user input), so interpolating them into the statement is safe.
    columns = list(rows[0])
    insert_sql = (
        f"INSERT INTO dws.dws_finance_area_daily ({', '.join(columns)}) VALUES %s"
    )
    values = [tuple(row.get(col) for col in columns) for row in rows]

    try:
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM dws.dws_finance_area_daily WHERE site_id = %s AND stat_date = %s",
                (site_id, stat_date),
            )
            # One multi-row INSERT instead of one round-trip per row.
            psycopg2.extras.execute_values(cur, insert_sql, values)
        conn.commit()
    except Exception:
        # Never leave a half-applied delete/insert (or an aborted transaction)
        # behind on the shared connection.
        conn.rollback()
        raise
    return len(rows)
# ── 缓存重算 ──────────────────────────────────────────────────────────────────
def recompute_cache(conn, site_id: int) -> int:
    """Recompute ``dws.dws_finance_board_cache`` for every completed period and area.

    For each ``time_range`` in ``COMPLETED_PERIODS`` and each area code in
    ``ALL_AREA_CODES`` the function:

    1. reads the daily rows of the period and fingerprints them with
       ``compute_fingerprint``;
    2. skips the combination when the stored ``data_fingerprint`` is equal;
    3. otherwise sums the daily rows and upserts the cache row, keyed on
       ``(site_id, time_range, area_code)``.

    All upserts are committed together at the end.

    Args:
        conn: Open psycopg2 connection with autocommit disabled.
        site_id: Site whose cache is rebuilt.

    Returns:
        Number of (time_range, area_code) combinations that were upserted.
    """
    from decimal import Decimal

    zero = Decimal("0")

    # ORDER BY makes the row order handed to compute_fingerprint deterministic;
    # without it PostgreSQL may return the rows in any order, which could change
    # the fingerprint if compute_fingerprint is order-sensitive.
    daily_sql = """
        SELECT stat_date, gross_amount, discount_total,
               confirmed_income, cash_inflow_total,
               cash_outflow_total, cash_balance_change
        FROM dws.dws_finance_area_daily
        WHERE site_id = %s AND stat_date >= %s AND stat_date <= %s
          AND area_code = %s
        ORDER BY stat_date
    """
    fingerprint_sql = """
        SELECT data_fingerprint FROM dws.dws_finance_board_cache
        WHERE site_id = %s AND time_range = %s AND area_code = %s
    """
    totals_sql = """
        SELECT
            COALESCE(SUM(gross_amount), 0) AS occurrence,
            COALESCE(SUM(discount_total), 0) AS discount,
            COALESCE(SUM(confirmed_income), 0) AS confirmed_revenue,
            COALESCE(SUM(cash_inflow_total), 0) AS cash_in,
            COALESCE(SUM(cash_outflow_total), 0) AS cash_out,
            COALESCE(SUM(cash_balance_change), 0) AS cash_balance
        FROM dws.dws_finance_area_daily
        WHERE site_id = %s AND stat_date >= %s AND stat_date <= %s
          AND area_code = %s
    """
    upsert_sql = """
        INSERT INTO dws.dws_finance_board_cache (
            site_id, time_range, area_code, start_date, end_date,
            occurrence, discount, discount_rate, confirmed_revenue,
            cash_in, cash_out, cash_balance, balance_rate,
            data_fingerprint, computed_at, updated_at
        ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW(),NOW())
        ON CONFLICT (site_id, time_range, area_code) DO UPDATE SET
            start_date=EXCLUDED.start_date, end_date=EXCLUDED.end_date,
            occurrence=EXCLUDED.occurrence, discount=EXCLUDED.discount,
            discount_rate=EXCLUDED.discount_rate,
            confirmed_revenue=EXCLUDED.confirmed_revenue,
            cash_in=EXCLUDED.cash_in, cash_out=EXCLUDED.cash_out,
            cash_balance=EXCLUDED.cash_balance, balance_rate=EXCLUDED.balance_rate,
            data_fingerprint=EXCLUDED.data_fingerprint,
            computed_at=NOW(), updated_at=NOW()
    """

    upserted = 0
    for time_range in COMPLETED_PERIODS:
        start_date, end_date = _calc_period_date_range(time_range)
        for area_code in ALL_AREA_CODES:
            window = (site_id, start_date, end_date, area_code)
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                # Fingerprint the daily-grain rows of this period/area.
                cur.execute(daily_sql, window)
                daily_rows = [dict(r) for r in cur.fetchall()]
                new_fp = compute_fingerprint(daily_rows)

                # Unchanged fingerprint: the cached aggregate is still valid.
                cur.execute(fingerprint_sql, (site_id, time_range, area_code))
                existing = cur.fetchone()
                existing_fp = existing["data_fingerprint"] if existing else None
                if new_fp == existing_fp:
                    continue

                # Aggregate the overview figures in SQL.
                cur.execute(totals_sql, window)
                totals = cur.fetchone()
                occ = Decimal(str(totals["occurrence"]))
                disc = Decimal(str(totals["discount"]))
                cr = Decimal(str(totals["confirmed_revenue"]))
                ci = Decimal(str(totals["cash_in"]))
                co = Decimal(str(totals["cash_out"]))
                cb = Decimal(str(totals["cash_balance"]))
                # Rates fall back to 0 when their denominator is 0.
                discount_rate = (disc / occ) if occ != 0 else zero
                balance_rate = (cb / ci) if ci != 0 else zero

                cur.execute(
                    upsert_sql,
                    (site_id, time_range, area_code, start_date, end_date,
                     occ, disc, discount_rate, cr, ci, co, cb, balance_rate,
                     new_fp),
                )
            upserted += 1
    conn.commit()
    return upserted
# ── 主流程 ────────────────────────────────────────────────────────────────────
def _parse_cli_date(text: str) -> date:
    """Parse a YYYY-MM-DD command-line value, reporting bad input through argparse."""
    try:
        return datetime.strptime(text, "%Y-%m-%d").date()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {text!r}, expected YYYY-MM-DD"
        ) from None


def main():
    """Command-line entry point: backfill the daily table, then rebuild the cache.

    Phase 1 calls ``backfill_one_day`` for every day in the inclusive range
    ``--start-date`` .. ``--end-date``; phase 2 calls ``recompute_cache``.
    Invalid arguments (malformed dates, start after end) end the process with
    argparse's usage error (exit status 2) before any database connection is
    opened.  Any exception during the run is logged, rolled back and turned
    into exit status 1; the connection is always closed.
    """
    parser = argparse.ArgumentParser(
        description="回填 dws_finance_area_daily + dws_finance_board_cache"
    )
    parser.add_argument("--site-id", type=int, required=True, help="门店 ID")
    parser.add_argument(
        "--start-date", type=_parse_cli_date, required=True, help="起始日期 YYYY-MM-DD"
    )
    parser.add_argument(
        "--end-date", type=_parse_cli_date, required=True, help="结束日期 YYYY-MM-DD"
    )
    parser.add_argument("--tenant-id", type=int, default=None, help="租户 ID默认=site_id")
    args = parser.parse_args()

    start, end = args.start_date, args.end_date
    if start > end:
        # An inverted range would silently skip phase 1 yet still rewrite the cache.
        parser.error(f"--start-date {start} is after --end-date {end}")

    site_id = args.site_id
    # `is None` rather than `or`, so an explicit --tenant-id 0 is respected.
    tenant_id = site_id if args.tenant_id is None else args.tenant_id

    logger.info("=== 回填开始 ===")
    logger.info("站点: %s, 日期范围: %s ~ %s", site_id, start, end)
    conn = psycopg2.connect(PG_DSN)
    conn.autocommit = False
    try:
        # Phase 1: backfill dws_finance_area_daily day by day.
        total_rows = 0
        day_count = (end - start).days + 1
        for idx in range(1, day_count + 1):
            current = start + timedelta(days=idx - 1)
            rows = backfill_one_day(conn, site_id, tenant_id, current)
            total_rows += rows
            # Progress line every 10 days and on the final day; the separator
            # keeps the date and the row count from running together.
            if idx % 10 == 0 or current == end:
                logger.info("[%d/%d] %s: %d", idx, day_count, current, rows)
        logger.info("阶段 1 完成:共插入 %d", total_rows)
        # Phase 2: recompute the board cache.
        logger.info("阶段 2重算 DWS_FINANCE_BOARD_CACHE ...")
        upserted = recompute_cache(conn, site_id)
        logger.info("阶段 2 完成upsert %d 组合", upserted)
        logger.info("=== 回填完成 ===")
    except Exception:
        conn.rollback()
        logger.exception("回填失败,已回滚")
        sys.exit(1)
    finally:
        conn.close()


if __name__ == "__main__":
    main()