Files

18 KiB
Raw Permalink Blame History

设计文档业务日分割点机制Business Day Cutoff

概述

本设计将全系统的统计时间口径从自然日切换为以可配置小时值(默认 08:00为分割点的营业日。影响范围覆盖六个层面

  1. 配置层.env 新增 BUSINESS_DAY_START_HOURETL AppConfig 和后端 config.py 同步加载
  2. 共享工具层packages/shareddatetime_utils.py 扩展 business_day_rangebusiness_week_rangebusiness_month_range 三个范围函数
  3. ETL DWS 层BaseDwsTask.iter_dwd_rowsDATE() 替换为 biz_date_sql_expr18 个具体 DWS 任务的 SQL 全面改造
  4. 后端 API 层:新增 /api/config/business-day 端点,时间范围查询统一使用 business_*_range 函数
  5. 数据库层:新增 PostgreSQL biz_date() 函数,物化视图迁移
  6. 前端展示层:管理后台日期选择器标注营业日口径,小程序透传后端数据

设计决策

  1. 单一配置源BUSINESS_DAY_START_HOUR 仅在根 .env 定义一次ETL 通过 AppConfig、后端通过 config.py、数据库通过迁移脚本参数化读取,避免多处硬编码导致不一致。
  2. 共享包作为唯一逻辑实现:所有营业日归属计算集中在 packages/shared/datetime_utils.pyETL 和后端均从此导入,禁止各子系统重复实现。
  3. SQL 表达式生成器模式biz_date_sql_expr(col, hour) 生成 DATE(col - INTERVAL 'N hours') 字符串DWS 任务在 SQL 拼接时调用,避免在 Python 侧逐行转换。
  4. BaseDwsTask 基类统一改造iter_dwd_rows 的日期过滤从 DATE(col) 改为 biz_date_sql_expr(col),所有子类自动继承,减少逐任务修改量。
  5. 物化视图通过迁移脚本重建:物化视图的时间过滤条件无法动态参数化,需通过 SQL 迁移脚本 DROP + CREATE 重建。
  6. 历史数据重算采用 CLI 批量模式:提供独立重算脚本,复用正式 ETL 任务逻辑(相同 Business_Day_Cutoff 配置),按日期窗口分批执行。

架构

graph TD
    subgraph 配置层
        ENV[".env<br/>BUSINESS_DAY_START_HOUR=8"]
        ENV --> AC[AppConfig<br/>app.business_day_start_hour]
        ENV --> BC[Backend config.py<br/>BUSINESS_DAY_START_HOUR]
    end

    subgraph 共享工具层
        DU["datetime_utils.py<br/>business_date / business_*_range<br/>biz_date_sql_expr"]
    end

    subgraph ETL DWS 层
        BDT["BaseDwsTask<br/>iter_dwd_rowsbiz_date_sql_expr<br/>get_time_window_range"]
        BDT --> FT["FinanceBaseTask / FinanceDailyTask<br/>FinanceRechargeTask / FinanceDiscountTask<br/>FinanceIncomeTask"]
        BDT --> AT["AssistantDailyTask / AssistantMonthlyTask<br/>AssistantFinanceTask / AssistantCustomerTask<br/>AssistantOrderContributionTask"]
        BDT --> MT["MemberVisitTask / MemberConsumptionTask"]
        BDT --> GT["GoodsStockDailyTask / WeeklyTask / MonthlyTask"]
        BDT --> IT["SpendingPowerIndexTask / MemberIndexBase"]
        BDT --> MV["MvRefreshTask"]
    end

    subgraph 后端 API 层
        API["/api/config/business-day<br/>GET → business_day_start_hour"]
        TR["时间范围计算<br/>business_day_range / week_range / month_range"]
    end

    subgraph 数据库层
        PGF["biz_date(timestamptz, int)<br/>PostgreSQL 函数"]
        MVR["物化视图重建<br/>迁移脚本"]
    end

    subgraph 前端
        AW["Admin_Web<br/>日期选择器标注"]
        MP["Miniprogram<br/>透传后端数据"]
    end

    AC --> BDT
    AC --> DU
    BC --> API
    BC --> TR
    DU --> BDT
    DU --> TR
    API --> AW
    API --> MP
    PGF --> MVR

组件与接口

1. 共享时间工具(packages/shared/src/neozqyy_shared/datetime_utils.py

现有函数(已实现):

  • business_date(dt, day_start_hour) -> date
  • business_month(dt, day_start_hour) -> date
  • business_week_monday(dt, day_start_hour) -> date
  • biz_date_sql_expr(col, day_start_hour) -> str

新增函数:

def business_day_range(biz_date: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
    """返回给定营业日的精确时间戳范围 [start, end)。
    
    start = biz_date 当天 day_start_hour:00
    end   = biz_date 次日 day_start_hour:00
    """

def business_week_range(week_monday: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
    """返回给定营业周(周一)的精确时间戳范围 [start, end)。
    
    start = week_monday 当天 day_start_hour:00
    end   = week_monday + 7天 day_start_hour:00
    """

def business_month_range(month_first: date, day_start_hour: int = 8) -> tuple[datetime, datetime]:
    """返回给定营业月(首日)的精确时间戳范围 [start, end)。
    
    start = month_first 当天 day_start_hour:00
    end   = 次月1日 day_start_hour:00
    """

所有 *_range 函数返回的时间戳带 Asia/Shanghai 时区信息(使用 SHANGHAI_TZ)。

2. ETL 配置层(apps/etl/connectors/feiqiu/config/

已完成(代码库中已存在):

  • defaults.py"app": {"business_day_start_hour": 8}
  • env_parser.py"BUSINESS_DAY_START_HOUR": ("app.business_day_start_hour",)

需新增settings.py_validate 方法增加范围校验:

# 在 _validate 中新增
hour = cfg["app"].get("business_day_start_hour", 8)
if not isinstance(hour, int) or not (0 <= hour <= 23):
    raise SystemExit("app.business_day_start_hour 必须为 023 的整数")

3. 后端配置层(apps/backend/app/config.py

新增模块级常量:

BUSINESS_DAY_START_HOUR: int = int(get("BUSINESS_DAY_START_HOUR", "8"))

4. 后端配置查询 APIapps/backend/app/routers/business_day.py

router = APIRouter(prefix="/api/config", tags=["业务配置"])

@router.get("/business-day")
async def get_business_day_config():
    """返回当前营业日分割点配置。"""
    return {"business_day_start_hour": config.BUSINESS_DAY_START_HOUR}

无需认证(公开配置),前端启动时调用一次缓存。

5. BaseDwsTask 基类改造

iter_dwd_rows 改造

def iter_dwd_rows(self, table_name, columns, start_date, end_date,
                  date_col="created_at", ...):
    cutoff = self.config.get("app.business_day_start_hour", 8)
    biz_expr = biz_date_sql_expr(date_col, cutoff)
    where_parts = [f"{biz_expr} >= %s", f"{biz_expr} <= %s"]
    # ... 其余逻辑不变

get_time_window_range 改造

当前方法返回 TimeRange(start=date, end=date),改造后语义不变(仍返回 date 范围),但内部使用 business_date 计算 base_date 的营业日归属:

def get_time_window_range(self, window, base_date=None):
    if base_date is None:
        from neozqyy_shared.datetime_utils import now_shanghai, business_date
        cutoff = self.config.get("app.business_day_start_hour", 8)
        base_date = business_date(now_shanghai(), cutoff)
    # ... 其余逻辑使用 base_date已是营业日

6. 各 DWS 任务 SQL 改造模式

所有任务的 SQL 改造遵循统一模式:

-- 改造前
DATE(pay_time) AS stat_date
WHERE DATE(pay_time) >= %s AND DATE(pay_time) <= %s
GROUP BY DATE(pay_time)

-- 改造后cutoff_hour=8 时)
DATE(pay_time - INTERVAL '8 hours') AS stat_date
WHERE DATE(pay_time - INTERVAL '8 hours') >= %s AND DATE(pay_time - INTERVAL '8 hours') <= %s
GROUP BY DATE(pay_time - INTERVAL '8 hours')

任务从 self.config.get("app.business_day_start_hour", 8) 读取 cutoff 值,调用 biz_date_sql_expr(col, cutoff) 生成表达式。

受影响任务清单18 个):

任务 主要时间列 聚合粒度
FinanceBaseTask pay_time
FinanceDailyTask pay_time
FinanceRechargeTask pay_time
FinanceDiscountTask pay_time
FinanceIncomeTask pay_time
AssistantDailyTask start_use_time
AssistantOrderContributionTask pay_time, start_use_time
AssistantCustomerTask start_use_time
AssistantMonthlyTask (基于日度数据)
AssistantFinanceTask start_use_time
MemberVisitTask pay_time, start_use_time, ledger_end_time
MemberConsumptionTask pay_time, create_time
GoodsStockDailyTask fetched_at
GoodsStockWeeklyTask fetched_at
GoodsStockMonthlyTask fetched_at
SpendingPowerIndexTask pay_time
MemberIndexBase pay_time
MvRefreshTask (物化视图刷新) -

7. 数据库层

新增 PostgreSQL 函数(迁移脚本 db/etl_feiqiu/migrations/2026-03-XX__add_biz_date_function.sql

CREATE OR REPLACE FUNCTION dws.biz_date(ts timestamptz, cutoff_hour int DEFAULT 8)
RETURNS date AS $$
    SELECT (ts - make_interval(hours => cutoff_hour))::date;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;

物化视图重建(迁移脚本 db/etl_feiqiu/migrations/2026-03-XX__rebuild_mv_with_biz_date.sql

物化视图 mv_dws_finance_daily_summary_l1..l4mv_dws_assistant_daily_detail_l1..l4 的时间过滤条件从 CURRENT_DATE 改为 dws.biz_date(NOW()) 或等效表达式。

8. 前端适配

Admin_Web

  • 日期选择器组件旁增加 Tooltip 或文字标注:营业日:{HH}:00 起
  • 通过 /api/config/business-day 获取 business_day_start_hour,启动时请求一次存入全局状态
  • 降级策略API 不可用时使用默认值 8console.warn 输出警告

Miniprogram

  • 不直接使用 cutoff 值,所有统计数据由后端 API 按营业日口径返回
  • 无需前端改造,仅确认后端 API 返回的数据已是营业日口径

9. 历史数据重算

提供 scripts/ops/rebuild_dws_biz_date.py 脚本:

# 伪代码
for task_cls in ALL_DWS_TASKS:
    for date_window in split_by_month(history_start, history_end):
        task = task_cls(config)
        task.run(window_start=date_window.start, window_end=date_window.end)
  • 复用正式 ETL 任务逻辑,确保与正式运行使用相同的 Business_Day_Cutoff
  • 按月分窗口执行,避免单次事务过大
  • 执行前后记录行数对比到日志
  • 支持 --dry-run 模式预览影响范围

10. 运维脚本排查

脚本 涉及的 DATE() 调用 处理方式
scripts/ops/export_bug_report.py DATE(trash_time), DATE(create_time), DATE(start_use_time) 替换为 biz_date_sql_expr 生成的表达式
scripts/ops/etl_consistency_check.py 日期比较逻辑 评估后按需替换
apps/etl/.../debug_blackbox.py ::date 类型转换 替换为 biz_date() 函数调用
apps/etl/.../run_update.py .date()datetime.combine 替换为 business_date() + business_day_range()

数据模型

配置数据

BUSINESS_DAY_START_HOUR: int  (023, 默认 8)

存储位置:

  • .envBUSINESS_DAY_START_HOUR=8
  • ETLAppConfig.config["app"]["business_day_start_hour"]
  • 后端:config.BUSINESS_DAY_START_HOUR

时间工具函数签名

# 输入/输出类型
business_date(dt: datetime, day_start_hour: int = 8) -> date
business_month(dt: datetime, day_start_hour: int = 8) -> date
business_week_monday(dt: datetime, day_start_hour: int = 8) -> date
business_day_range(biz_date: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
business_week_range(week_monday: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
business_month_range(month_first: date, day_start_hour: int = 8) -> tuple[datetime, datetime]
biz_date_sql_expr(col: str, day_start_hour: int = 8) -> str

数据库函数

dws.biz_date(ts timestamptz, cutoff_hour int DEFAULT 8) RETURNS date
-- 等价于 Python 的 business_date用于 SQL 查询和物化视图

API 响应模型

// GET /api/config/business-day
{
  "business_day_start_hour": 8
}

DWS 表影响

所有 DWS 表的 stat_date 字段语义从"自然日"变为"营业日"。表结构不变,仅数据内容因重算而变化。

正确性属性

属性Property是在系统所有合法执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。

Property 1: 营业日归属往返一致性Round-Trip

对任意 datetime dt 和任意合法的 day_start_hour h023business_day_range(business_date(dt, h), h) 返回的范围 [start, end) 应满足 start <= dt < end

Validates: Requirements 2.9, 11.1

Property 2: 营业月与营业日一致性

对任意 datetime dt 和任意合法的 day_start_hour h023business_month(dt, h) 应等于 business_date(dt, h).replace(day=1)

Validates: Requirements 2.10, 11.2

Property 3: 营业周与营业日一致性

对任意 datetime dt 和任意合法的 day_start_hour h023business_week_monday(dt, h) 应等于 business_date(dt, h) - timedelta(days=business_date(dt, h).weekday()),且结果的 weekday() 始终为 0周一

Validates: Requirements 2.11, 11.3

Property 4: 营业日归属单调性

对任意 两个 datetime dt1 < dt2 和任意合法的 day_start_hour h023dt1dt2 都在同一个 business_day_range(d, h) 范围内,则 business_date(dt1, h) == business_date(dt2, h)。等价表述:business_date(dt, h) 关于 dt 是单调非递减的。

Validates: Requirements 11.9

Property 5: 时间范围长度不变量

对任意 date d 和任意合法的 day_start_hour h023

  • business_day_range(d, h) 返回的 (start, end) 满足 end - start == timedelta(hours=24)
  • business_week_range(monday, h) 返回的 (start, end) 满足 end - start == timedelta(days=7)

Validates: Requirements 11.6, 11.7

Property 6: SQL 表达式生成幂等性

对任意 列名 col 和任意合法的 day_start_hour h023biz_date_sql_expr(col, h) 多次调用应返回完全相同的字符串。

Validates: Requirements 11.4

Property 7: 非法配置值拒绝

对任意 不在 023 范围内的整数值 v,当 BUSINESS_DAY_START_HOUR 设为 v 时,AppConfig.load() 应抛出 SystemExit

Validates: Requirements 1.3

Property 8: 合法配置值正确加载

对任意 023 范围内的整数值 v,当 BUSINESS_DAY_START_HOUR 环境变量设为 v 时,AppConfig.load()cfg.get("app.business_day_start_hour") 应返回 v

Validates: Requirements 3.4

错误处理

场景 处理方式
BUSINESS_DAY_START_HOUR 值超出 023 AppConfig._validate 抛出 SystemExit,明确提示合法范围
BUSINESS_DAY_START_HOUR 环境变量缺失 使用默认值 8不报错
BUSINESS_DAY_START_HOUR 值为非整数字符串 env_parser._coerce_env 保持字符串,_validate 阶段类型检查失败抛出 SystemExit
后端 /api/config/business-day 不可用 Admin_Web 使用默认值 8console.warn 输出警告
历史数据重算脚本执行失败 按月窗口回滚当前批次,记录错误日志,继续下一窗口或中止(由 --fail-fast 参数控制)
物化视图迁移脚本执行失败 标准 PostgreSQL 事务回滚,迁移脚本幂等设计(CREATE OR REPLACE
business_day_range 等函数收到非法 day_start_hour 函数内部不做校验(调用方负责),依赖 AppConfig 加载阶段的前置校验

测试策略

属性测试Property-Based Testing

使用 hypothesis 库,测试文件位于 tests/test_property_business_day_cutoff.py

每个属性测试最少运行 100 次迭代,使用 @settings(max_examples=200) 配置。

生成策略:

  • day_start_hourst.integers(min_value=0, max_value=23)
  • dtst.datetimes(min_value=datetime(2020, 1, 1), max_value=datetime(2030, 12, 31))(避免极端日期)
  • biz_datest.dates(min_value=date(2020, 1, 1), max_value=date(2030, 12, 31))

每个测试函数以注释标注对应的设计属性:

# Feature: business-day-cutoff, Property 1: 营业日归属往返一致性
@given(dt=st.datetimes(...), h=st.integers(0, 23))
@settings(max_examples=200)
def test_business_date_round_trip(dt, h):
    ...

# Feature: business-day-cutoff, Property 2: 营业月与营业日一致性
# Feature: business-day-cutoff, Property 3: 营业周与营业日一致性
# Feature: business-day-cutoff, Property 4: 营业日归属单调性
# Feature: business-day-cutoff, Property 5: 时间范围长度不变量
# Feature: business-day-cutoff, Property 6: SQL 表达式生成幂等性
# Feature: business-day-cutoff, Property 7: 非法配置值拒绝
# Feature: business-day-cutoff, Property 8: 合法配置值正确加载

单元测试

单元测试覆盖属性测试不适合的场景:

  • 边界示例day_start_hour=807:59:59 归属前一天08:00:00 归属当天
  • 默认值行为BUSINESS_DAY_START_HOUR 缺失时 AppConfig 返回 8
  • API 端点/api/config/business-day 返回正确 JSON 格式
  • SQL 表达式格式biz_date_sql_expr("pay_time", 8) 返回 DATE(pay_time - INTERVAL '8 hours')
  • 月末边界1月31日 07:00 归属1月30日营业日business_month 返回1月1日

测试配置

  • 属性测试库:hypothesis(已在项目 pyproject.toml 中声明)
  • 每个属性测试对应设计文档中的一个 Property由单个 @given 装饰的测试函数实现
  • 运行命令:cd C:\NeoZQYY && pytest tests/test_property_business_day_cutoff.py -v