Files

423 lines
18 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档业务日分割点机制Business Day Cutoff
## 概述
本设计将全系统的统计时间口径从自然日切换为以可配置小时值(默认 08:00为分割点的营业日。影响范围覆盖六个层面
1. **配置层**`.env` 新增 `BUSINESS_DAY_START_HOUR`ETL `AppConfig` 和后端 `config.py` 同步加载
2. **共享工具层**`packages/shared``datetime_utils.py` 扩展 `business_day_range``business_week_range``business_month_range` 三个范围函数
3. **ETL DWS 层**`BaseDwsTask.iter_dwd_rows``DATE()` 替换为 `biz_date_sql_expr`18 个具体 DWS 任务的 SQL 全面改造
4. **后端 API 层**:新增 `/api/config/business-day` 端点,时间范围查询统一使用 `business_*_range` 函数
5. **数据库层**:新增 PostgreSQL `biz_date()` 函数,物化视图迁移
6. **前端展示层**:管理后台日期选择器标注营业日口径,小程序透传后端数据
### 设计决策
1. **单一配置源**`BUSINESS_DAY_START_HOUR` 仅在根 `.env` 定义一次ETL 通过 `AppConfig`、后端通过 `config.py`、数据库通过迁移脚本参数化读取,避免多处硬编码导致不一致。
2. **共享包作为唯一逻辑实现**:所有营业日归属计算集中在 `packages/shared/datetime_utils.py`ETL 和后端均从此导入,禁止各子系统重复实现。
3. **SQL 表达式生成器模式**`biz_date_sql_expr(col, hour)` 生成 `DATE(col - INTERVAL 'N hours')` 字符串DWS 任务在 SQL 拼接时调用,避免在 Python 侧逐行转换。
4. **BaseDwsTask 基类统一改造**`iter_dwd_rows` 的日期过滤从 `DATE(col)` 改为 `biz_date_sql_expr(col)`,所有子类自动继承,减少逐任务修改量。
5. **物化视图通过迁移脚本重建**:物化视图的时间过滤条件无法动态参数化,需通过 SQL 迁移脚本 DROP + CREATE 重建。
6. **历史数据重算采用 CLI 批量模式**:提供独立重算脚本,复用正式 ETL 任务逻辑(相同 `Business_Day_Cutoff` 配置),按日期窗口分批执行。
## 架构
```mermaid
graph TD
subgraph 配置层
ENV[".env<br/>BUSINESS_DAY_START_HOUR=8"]
ENV --> AC[AppConfig<br/>app.business_day_start_hour]
ENV --> BC[Backend config.py<br/>BUSINESS_DAY_START_HOUR]
end
subgraph 共享工具层
DU["datetime_utils.py<br/>business_date / business_*_range<br/>biz_date_sql_expr"]
end
subgraph ETL DWS 层
BDT["BaseDwsTask<br/>iter_dwd_rowsbiz_date_sql_expr<br/>get_time_window_range"]
BDT --> FT["FinanceBaseTask / FinanceDailyTask<br/>FinanceRechargeTask / FinanceDiscountTask<br/>FinanceIncomeTask"]
BDT --> AT["AssistantDailyTask / AssistantMonthlyTask<br/>AssistantFinanceTask / AssistantCustomerTask<br/>AssistantOrderContributionTask"]
BDT --> MT["MemberVisitTask / MemberConsumptionTask"]
BDT --> GT["GoodsStockDailyTask / WeeklyTask / MonthlyTask"]
BDT --> IT["SpendingPowerIndexTask / MemberIndexBase"]
BDT --> MV["MvRefreshTask"]
end
subgraph 后端 API 层
API["/api/config/business-day<br/>GET → business_day_start_hour"]
TR["时间范围计算<br/>business_day_range / week_range / month_range"]
end
subgraph 数据库层
PGF["biz_date(timestamptz, int)<br/>PostgreSQL 函数"]
MVR["物化视图重建<br/>迁移脚本"]
end
subgraph 前端
AW["Admin_Web<br/>日期选择器标注"]
MP["Miniprogram<br/>透传后端数据"]
end
AC --> BDT
AC --> DU
BC --> API
BC --> TR
DU --> BDT
DU --> TR
API --> AW
API --> MP
PGF --> MVR
```
## 组件与接口
### 1. 共享时间工具(`packages/shared/src/neozqyy_shared/datetime_utils.py`
现有函数(已实现):
- `business_date(dt, day_start_hour) -> date`
- `business_month(dt, day_start_hour) -> date`
- `business_week_monday(dt, day_start_hour) -> date`
- `biz_date_sql_expr(col, day_start_hour) -> str`
新增函数:
```python
def business_day_range(biz_date: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
"""返回给定营业日的精确时间戳范围 [start, end)。
start = biz_date 当天 day_start_hour:00
end = biz_date 次日 day_start_hour:00
"""
def business_week_range(week_monday: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
"""返回给定营业周(周一)的精确时间戳范围 [start, end)。
start = week_monday 当天 day_start_hour:00
end = week_monday + 7天 day_start_hour:00
"""
def business_month_range(month_first: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
"""返回给定营业月(首日)的精确时间戳范围 [start, end)。
start = month_first 当天 day_start_hour:00
end = 次月1日 day_start_hour:00
"""
```
所有 `*_range` 函数返回的时间戳带 `Asia/Shanghai` 时区信息(使用 `SHANGHAI_TZ`)。
### 2. ETL 配置层(`apps/etl/connectors/feiqiu/config/`
**已完成**(代码库中已存在):
- `defaults.py``"app": {"business_day_start_hour": 8}`
- `env_parser.py``"BUSINESS_DAY_START_HOUR": ("app.business_day_start_hour",)`
**需新增**`settings.py``_validate` 方法增加范围校验:
```python
# 在 _validate 中新增
hour = cfg["app"].get("business_day_start_hour", 8)
if not isinstance(hour, int) or not (0 <= hour <= 23):
raise SystemExit("app.business_day_start_hour 必须为 023 的整数")
```
### 3. 后端配置层(`apps/backend/app/config.py`
新增模块级常量:
```python
BUSINESS_DAY_START_HOUR: int = int(get("BUSINESS_DAY_START_HOUR", "8"))
```
### 4. 后端配置查询 API`apps/backend/app/routers/business_day.py`
```python
router = APIRouter(prefix="/api/config", tags=["业务配置"])
@router.get("/business-day")
async def get_business_day_config():
"""返回当前营业日分割点配置。"""
return {"business_day_start_hour": config.BUSINESS_DAY_START_HOUR}
```
无需认证(公开配置),前端启动时调用一次缓存。
### 5. BaseDwsTask 基类改造
**`iter_dwd_rows` 改造**
```python
def iter_dwd_rows(self, table_name, columns, start_date, end_date,
date_col="created_at", ...):
cutoff = self.config.get("app.business_day_start_hour", 8)
biz_expr = biz_date_sql_expr(date_col, cutoff)
where_parts = [f"{biz_expr} >= %s", f"{biz_expr} <= %s"]
# ... 其余逻辑不变
```
**`get_time_window_range` 改造**
当前方法返回 `TimeRange(start=date, end=date)`,改造后语义不变(仍返回 `date` 范围),但内部使用 `business_date` 计算 `base_date` 的营业日归属:
```python
def get_time_window_range(self, window, base_date=None):
if base_date is None:
from neozqyy_shared.datetime_utils import now_shanghai, business_date
cutoff = self.config.get("app.business_day_start_hour", 8)
base_date = business_date(now_shanghai(), cutoff)
# ... 其余逻辑使用 base_date已是营业日
```
### 6. 各 DWS 任务 SQL 改造模式
所有任务的 SQL 改造遵循统一模式:
```sql
-- 改造前
DATE(pay_time) AS stat_date
WHERE DATE(pay_time) >= %s AND DATE(pay_time) <= %s
GROUP BY DATE(pay_time)
-- 改造后cutoff_hour=8 时)
DATE(pay_time - INTERVAL '8 hours') AS stat_date
WHERE DATE(pay_time - INTERVAL '8 hours') >= %s AND DATE(pay_time - INTERVAL '8 hours') <= %s
GROUP BY DATE(pay_time - INTERVAL '8 hours')
```
任务从 `self.config.get("app.business_day_start_hour", 8)` 读取 cutoff 值,调用 `biz_date_sql_expr(col, cutoff)` 生成表达式。
**受影响任务清单**18 个):
| 任务 | 主要时间列 | 聚合粒度 |
|------|-----------|---------|
| FinanceBaseTask | pay_time | 日 |
| FinanceDailyTask | pay_time | 日 |
| FinanceRechargeTask | pay_time | 日 |
| FinanceDiscountTask | pay_time | 日 |
| FinanceIncomeTask | pay_time | 日 |
| AssistantDailyTask | start_use_time | 日 |
| AssistantOrderContributionTask | pay_time, start_use_time | 日 |
| AssistantCustomerTask | start_use_time | 日 |
| AssistantMonthlyTask | (基于日度数据) | 月 |
| AssistantFinanceTask | start_use_time | 日 |
| MemberVisitTask | pay_time, start_use_time, ledger_end_time | 日 |
| MemberConsumptionTask | pay_time, create_time | 日 |
| GoodsStockDailyTask | fetched_at | 日 |
| GoodsStockWeeklyTask | fetched_at | 周 |
| GoodsStockMonthlyTask | fetched_at | 月 |
| SpendingPowerIndexTask | pay_time | 日 |
| MemberIndexBase | pay_time | 日 |
| MvRefreshTask | (物化视图刷新) | - |
### 7. 数据库层
**新增 PostgreSQL 函数**(迁移脚本 `db/etl_feiqiu/migrations/2026-03-XX__add_biz_date_function.sql`
```sql
CREATE OR REPLACE FUNCTION dws.biz_date(ts timestamptz, cutoff_hour int DEFAULT 8)
RETURNS date AS $$
SELECT (ts - make_interval(hours => cutoff_hour))::date;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;
```
**物化视图重建**(迁移脚本 `db/etl_feiqiu/migrations/2026-03-XX__rebuild_mv_with_biz_date.sql`
物化视图 `mv_dws_finance_daily_summary_l1..l4``mv_dws_assistant_daily_detail_l1..l4` 的时间过滤条件从 `CURRENT_DATE` 改为 `dws.biz_date(NOW())` 或等效表达式。
### 8. 前端适配
**Admin_Web**
- 日期选择器组件旁增加 Tooltip 或文字标注:`营业日:{HH}:00 起`
- 通过 `/api/config/business-day` 获取 `business_day_start_hour`,启动时请求一次存入全局状态
- 降级策略API 不可用时使用默认值 8`console.warn` 输出警告
**Miniprogram**
- 不直接使用 cutoff 值,所有统计数据由后端 API 按营业日口径返回
- 无需前端改造,仅确认后端 API 返回的数据已是营业日口径
### 9. 历史数据重算
提供 `scripts/ops/rebuild_dws_biz_date.py` 脚本:
```python
# 伪代码
for task_cls in ALL_DWS_TASKS:
for date_window in split_by_month(history_start, history_end):
task = task_cls(config)
task.run(window_start=date_window.start, window_end=date_window.end)
```
- 复用正式 ETL 任务逻辑,确保与正式运行使用相同的 `Business_Day_Cutoff`
- 按月分窗口执行,避免单次事务过大
- 执行前后记录行数对比到日志
- 支持 `--dry-run` 模式预览影响范围
### 10. 运维脚本排查
| 脚本 | 涉及的 DATE() 调用 | 处理方式 |
|------|-------------------|---------|
| `scripts/ops/export_bug_report.py` | `DATE(trash_time)`, `DATE(create_time)`, `DATE(start_use_time)` | 替换为 `biz_date_sql_expr` 生成的表达式 |
| `scripts/ops/etl_consistency_check.py` | 日期比较逻辑 | 评估后按需替换 |
| `apps/etl/.../debug_blackbox.py` | `::date` 类型转换 | 替换为 `biz_date()` 函数调用 |
| `apps/etl/.../run_update.py` | `.date()``datetime.combine` | 替换为 `business_date()` + `business_day_range()` |
## 数据模型
### 配置数据
```
BUSINESS_DAY_START_HOUR: int (023, 默认 8)
```
存储位置:
-`.env``BUSINESS_DAY_START_HOUR=8`
- ETL`AppConfig.config["app"]["business_day_start_hour"]`
- 后端:`config.BUSINESS_DAY_START_HOUR`
### 时间工具函数签名
```python
# 输入/输出类型
business_date(dt: datetime, day_start_hour: int = 8) -> date
business_month(dt: datetime, day_start_hour: int = 8) -> date
business_week_monday(dt: datetime, day_start_hour: int = 8) -> date
business_day_range(biz_date: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
business_week_range(week_monday: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
business_month_range(month_first: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
biz_date_sql_expr(col: str, day_start_hour: int = 8) -> str
```
### 数据库函数
```sql
dws.biz_date(ts timestamptz, cutoff_hour int DEFAULT 8) RETURNS date
-- 等价于 Python 的 business_date用于 SQL 查询和物化视图
```
### API 响应模型
```json
// GET /api/config/business-day
{
"business_day_start_hour": 8
}
```
### DWS 表影响
所有 DWS 表的 `stat_date` 字段语义从"自然日"变为"营业日"。表结构不变,仅数据内容因重算而变化。
## 正确性属性
*属性Property是在系统所有合法执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。*
### Property 1: 营业日归属往返一致性Round-Trip
*对任意* datetime `dt` 和任意合法的 `day_start_hour` h023`business_day_range(business_date(dt, h), h)` 返回的范围 `[start, end)` 应满足 `start <= dt < end`
**Validates: Requirements 2.9, 11.1**
### Property 2: 营业月与营业日一致性
*对任意* datetime `dt` 和任意合法的 `day_start_hour` h023`business_month(dt, h)` 应等于 `business_date(dt, h).replace(day=1)`
**Validates: Requirements 2.10, 11.2**
### Property 3: 营业周与营业日一致性
*对任意* datetime `dt` 和任意合法的 `day_start_hour` h023`business_week_monday(dt, h)` 应等于 `business_date(dt, h) - timedelta(days=business_date(dt, h).weekday())`,且结果的 `weekday()` 始终为 0周一
**Validates: Requirements 2.11, 11.3**
### Property 4: 营业日归属单调性
*对任意* 两个 datetime `dt1 < dt2` 和任意合法的 `day_start_hour` h023`dt1``dt2` 都在同一个 `business_day_range(d, h)` 范围内,则 `business_date(dt1, h) == business_date(dt2, h)`。等价表述:`business_date(dt, h)` 关于 `dt` 是单调非递减的。
**Validates: Requirements 11.9**
### Property 5: 时间范围长度不变量
*对任意* date `d` 和任意合法的 `day_start_hour` h023
- `business_day_range(d, h)` 返回的 `(start, end)` 满足 `end - start == timedelta(hours=24)`
- `business_week_range(monday, h)` 返回的 `(start, end)` 满足 `end - start == timedelta(days=7)`
**Validates: Requirements 11.6, 11.7**
### Property 6: SQL 表达式生成幂等性
*对任意* 列名 `col` 和任意合法的 `day_start_hour` h023`biz_date_sql_expr(col, h)` 多次调用应返回完全相同的字符串。
**Validates: Requirements 11.4**
### Property 7: 非法配置值拒绝
*对任意* 不在 023 范围内的整数值 `v`,当 `BUSINESS_DAY_START_HOUR` 设为 `v` 时,`AppConfig.load()` 应抛出 `SystemExit`
**Validates: Requirements 1.3**
### Property 8: 合法配置值正确加载
*对任意* 023 范围内的整数值 `v`,当 `BUSINESS_DAY_START_HOUR` 环境变量设为 `v` 时,`AppConfig.load()``cfg.get("app.business_day_start_hour")` 应返回 `v`
**Validates: Requirements 3.4**
## 错误处理
| 场景 | 处理方式 |
|------|---------|
| `BUSINESS_DAY_START_HOUR` 值超出 023 | `AppConfig._validate` 抛出 `SystemExit`,明确提示合法范围 |
| `BUSINESS_DAY_START_HOUR` 环境变量缺失 | 使用默认值 8不报错 |
| `BUSINESS_DAY_START_HOUR` 值为非整数字符串 | `env_parser._coerce_env` 保持字符串,`_validate` 阶段类型检查失败抛出 `SystemExit` |
| 后端 `/api/config/business-day` 不可用 | Admin_Web 使用默认值 8`console.warn` 输出警告 |
| 历史数据重算脚本执行失败 | 按月窗口回滚当前批次,记录错误日志,继续下一窗口或中止(由 `--fail-fast` 参数控制) |
| 物化视图迁移脚本执行失败 | 标准 PostgreSQL 事务回滚,迁移脚本幂等设计(`CREATE OR REPLACE` |
| `business_day_range` 等函数收到非法 `day_start_hour` | 函数内部不做校验(调用方负责),依赖 AppConfig 加载阶段的前置校验 |
## 测试策略
### 属性测试Property-Based Testing
使用 `hypothesis` 库,测试文件位于 `tests/test_property_business_day_cutoff.py`
每个属性测试最少运行 100 次迭代,使用 `@settings(max_examples=200)` 配置。
生成策略:
- `day_start_hour``st.integers(min_value=0, max_value=23)`
- `dt``st.datetimes(min_value=datetime(2020, 1, 1), max_value=datetime(2030, 12, 31))`(避免极端日期)
- `biz_date``st.dates(min_value=date(2020, 1, 1), max_value=date(2030, 12, 31))`
每个测试函数以注释标注对应的设计属性:
```python
# Feature: business-day-cutoff, Property 1: 营业日归属往返一致性
@given(dt=st.datetimes(...), h=st.integers(0, 23))
@settings(max_examples=200)
def test_business_date_round_trip(dt, h):
...
# Feature: business-day-cutoff, Property 2: 营业月与营业日一致性
# Feature: business-day-cutoff, Property 3: 营业周与营业日一致性
# Feature: business-day-cutoff, Property 4: 营业日归属单调性
# Feature: business-day-cutoff, Property 5: 时间范围长度不变量
# Feature: business-day-cutoff, Property 6: SQL 表达式生成幂等性
# Feature: business-day-cutoff, Property 7: 非法配置值拒绝
# Feature: business-day-cutoff, Property 8: 合法配置值正确加载
```
### 单元测试
单元测试覆盖属性测试不适合的场景:
- **边界示例**`day_start_hour=8`07:59:59 归属前一天08:00:00 归属当天
- **默认值行为**`BUSINESS_DAY_START_HOUR` 缺失时 AppConfig 返回 8
- **API 端点**`/api/config/business-day` 返回正确 JSON 格式
- **SQL 表达式格式**`biz_date_sql_expr("pay_time", 8)` 返回 `DATE(pay_time - INTERVAL '8 hours')`
- **月末边界**1月31日 07:00 归属1月30日营业日`business_month` 返回1月1日
### 测试配置
- 属性测试库:`hypothesis`(已在项目 `pyproject.toml` 中声明)
- 每个属性测试对应设计文档中的一个 Property由单个 `@given` 装饰的测试函数实现
- 运行命令:`cd C:\NeoZQYY && pytest tests/test_property_business_day_cutoff.py -v`