主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
- 新增 GET /xcx/coaches/{id}/banner 轻量接口
- performance/records 加 coach_id 参数 + view_board_coach 权限分流
- coach/customer/performance/board/task 服务层重构
- fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
- task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
- recall_detector settle_type=3 双重限制 + 门店级 resolved
主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
- perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
- isScattered 散客标记端到端
- foodDetail/phoneFull/creator* 字段透传
主线 3: P19 指数回测框架 Phase 1+2
- 3 个指数表 stat_date 日快照模式
- 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
- task_engine 升级 HTTP 实时 + 推演回测双模式
主线 4: Core 维度层启用
- 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
- 修复 app 视图空查询问题
主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口
主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
- schema 基线与 DDL 快照同步
主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)
附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具
合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
161 lines · 5.5 KiB · Python
# -*- coding: utf-8 -*-
"""
Backfill script: dws_coach_area_hours

For months that already have data, aggregate assistant (coach) per-area service
hours from the DWD service records in bulk and write them into dws_coach_area_hours.

Usage:
    cd C:\\Project\\NeoZQYY
    uv run python scripts/ops/backfill_coach_area_hours.py \\
        --site-id 2790685415443269 --start-month 2025-07-01 --end-month 2026-03-01
"""

from __future__ import annotations

import argparse
import logging
import os
import sys
from datetime import date, timedelta
from decimal import Decimal  # NOTE(review): Decimal and timedelta are not referenced elsewhere in this script — confirm before removing

# ── Environment bootstrap ──
# Repository root: three dirname() levels above this file (scripts/ops/<file> → repo root).
_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, _ROOT)

from dotenv import load_dotenv

# Load <repo root>/.env into the environment; presumably this is where PG_DSN
# (read by main()) comes from — confirm.
load_dotenv(os.path.join(_ROOT, ".env"))

import psycopg2
import psycopg2.extras

from neozqyy_shared.area_mapping import resolve_area_code  # NOTE(review): not referenced in this script; may only act as an import-time check — confirm

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("backfill_coach_area_hours")

# ── Import the pure ETL transform function (stub mode) ──
# The ETL task module is loaded straight from its source file. Lightweight
# placeholder package/module objects are registered in sys.modules first, so the
# task module's imports of tasks.dws.base_dws_task resolve to empty stand-ins.
# The presumable intent is to use only its pure transform function without the
# real ETL runtime — confirm. Statement order matters: every stub must be in
# sys.modules before exec_module() runs.
import importlib.util as _ilu
import types as _types

_ETL_ROOT = os.path.join(_ROOT, "apps", "etl", "connectors", "feiqiu")

# Synthetic package modules whose __path__ points at the real ETL source directories.
_tasks_pkg = _types.ModuleType("tasks")
_tasks_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks")]
_dws_pkg = _types.ModuleType("tasks.dws")
_dws_pkg.__path__ = [os.path.join(_ETL_ROOT, "tasks", "dws")]
# setdefault: an already-imported "tasks" / "tasks.dws" module is kept as-is.
sys.modules.setdefault("tasks", _tasks_pkg)
sys.modules.setdefault("tasks.dws", _dws_pkg)
# Empty placeholder classes stand in for the real base module; this assignment
# unconditionally overrides any real tasks.dws.base_dws_task already loaded.
_base_stub = _types.ModuleType("tasks.dws.base_dws_task")
_base_stub.TaskContext = type("TaskContext", (), {})
_base_stub.BaseDwsTask = type("BaseDwsTask", (), {})
sys.modules["tasks.dws.base_dws_task"] = _base_stub

_mod_path = os.path.join(_ETL_ROOT, "tasks", "dws", "coach_area_hours_task.py")
_spec = _ilu.spec_from_file_location("tasks.dws.coach_area_hours_task", _mod_path)
_mod = _ilu.module_from_spec(_spec)
# Set the package so relative imports inside the task module resolve against tasks.dws.
_mod.__package__ = "tasks.dws"
# Register before executing, following the importlib "import a source file directly" recipe.
sys.modules["tasks.dws.coach_area_hours_task"] = _mod
_spec.loader.exec_module(_mod)
transform_coach_area_hours = _mod.transform_coach_area_hours


# ── 数据提取 ──
|
||
|
||
def extract_service_records(conn, site_id: int, stat_month: date) -> list[dict]:
|
||
"""从 DWD 提取指定月份的服务记录"""
|
||
if stat_month.month == 12:
|
||
next_month = stat_month.replace(year=stat_month.year + 1, month=1)
|
||
else:
|
||
next_month = stat_month.replace(month=stat_month.month + 1)
|
||
|
||
sql = """
|
||
SELECT
|
||
s.site_assistant_id AS assistant_id,
|
||
dt.site_table_area_name AS area_name,
|
||
s.skill_name,
|
||
s.income_seconds,
|
||
COALESCE(ex.is_trash, 0) AS is_trash
|
||
FROM dwd.dwd_assistant_service_log s
|
||
LEFT JOIN dwd.dwd_assistant_service_log_ex ex
|
||
ON s.assistant_service_id = ex.assistant_service_id
|
||
LEFT JOIN dwd.dim_table dt
|
||
ON s.site_table_id = dt.table_id
|
||
AND dt.scd2_is_current = 1
|
||
WHERE s.site_id = %s
|
||
AND s.is_delete = 0
|
||
AND DATE(s.start_use_time) >= %s
|
||
AND DATE(s.start_use_time) < %s
|
||
"""
|
||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||
cur.execute(sql, (site_id, stat_month, next_month))
|
||
return cur.fetchall()
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="回填 dws_coach_area_hours")
|
||
parser.add_argument("--site-id", type=int, required=True)
|
||
parser.add_argument("--start-month", type=str, required=True, help="起始月份 YYYY-MM-01")
|
||
parser.add_argument("--end-month", type=str, required=True, help="结束月份 YYYY-MM-01")
|
||
args = parser.parse_args()
|
||
|
||
dsn = os.environ.get("PG_DSN")
|
||
if not dsn:
|
||
raise RuntimeError("PG_DSN 未设置")
|
||
|
||
os.environ.setdefault("PGCLIENTENCODING", "UTF8")
|
||
conn = psycopg2.connect(dsn)
|
||
conn.autocommit = False
|
||
|
||
site_id = args.site_id
|
||
start = date.fromisoformat(args.start_month)
|
||
end = date.fromisoformat(args.end_month)
|
||
|
||
total_inserted = 0
|
||
month = start
|
||
month_idx = 0
|
||
while month <= end:
|
||
month_idx += 1
|
||
records = extract_service_records(conn, site_id, month)
|
||
|
||
rows = transform_coach_area_hours(
|
||
records=records,
|
||
stat_month=month,
|
||
site_id=site_id,
|
||
tenant_id=site_id,
|
||
)
|
||
|
||
# delete-before-insert
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"DELETE FROM dws.dws_coach_area_hours WHERE site_id = %s AND stat_month = %s",
|
||
(site_id, month),
|
||
)
|
||
if rows:
|
||
cols = [
|
||
"site_id", "tenant_id", "stat_month", "assistant_id", "area_code",
|
||
"base_hours", "bonus_hours", "room_hours",
|
||
"effective_hours", "trashed_hours",
|
||
"base_service_count", "bonus_service_count", "room_service_count",
|
||
]
|
||
placeholders = ", ".join(["%s"] * len(cols))
|
||
insert_sql = f"INSERT INTO dws.dws_coach_area_hours ({', '.join(cols)}) VALUES ({placeholders})"
|
||
for row in rows:
|
||
cur.execute(insert_sql, tuple(row[c] for c in cols))
|
||
|
||
conn.commit()
|
||
total_inserted += len(rows)
|
||
logger.info("[%d] %s → %d 行(%d 条服务记录)", month_idx, month, len(rows), len(records))
|
||
|
||
# 下一个月
|
||
if month.month == 12:
|
||
month = month.replace(year=month.year + 1, month=1)
|
||
else:
|
||
month = month.replace(month=month.month + 1)
|
||
|
||
logger.info("=== 回填完成:共 %d 行 ===", total_inserted)
|
||
conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry: run the CLI. main() returns None, so a normal return exits with status 0.
    raise SystemExit(main())