# -*- coding: utf-8 -*- """ Feature: dataflow-field-completion, Property 5: ETL 参数解析与 CLI 命令构建正确性 **Validates: Requirements 14.1, 14.2** 对于任意合法的 ETL 执行参数组合(门店列表、数据源模式、校验模式、时间范围、 窗口切分、force-full 标志、任务选择),Backend 构建的 CLI 命令字符串应包含 所有指定参数,且参数值与输入一致。 测试策略: - 使用 hypothesis 生成随机 TaskConfigSchema 实例 - 随机 flow(从 VALID_FLOWS 中选择) - 随机 processing_mode(从 VALID_PROCESSING_MODES 中选择) - 随机任务代码列表(从 task_registry 中选择) - 随机时间窗口模式(lookback / custom) - 随机 window_split 和 window_split_days - 随机 force_full / dry_run / fetch_before_verify 布尔值 - 随机 store_id - 随机 ods_use_local_json 验证: 1. 构建的 CLI 命令包含 --flow 且值与 flow 一致 2. 任务代码通过 --tasks 正确传递 3. 时间范围参数格式正确且值一致 4. 布尔标志(--force-full / --dry-run / --fetch-before-verify)正确出现或缺失 5. --store-id 值与输入一致 6. --window-split / --window-split-days 正确传递 7. --data-source offline 在 ods_use_local_json=True 时出现 """ from __future__ import annotations import sys from pathlib import Path from hypothesis import given, settings, HealthCheck, assume import hypothesis.strategies as st # ── 将后端模块加入 sys.path ── _BACKEND_ROOT = Path(__file__).resolve().parent.parent / "apps" / "backend" if str(_BACKEND_ROOT) not in sys.path: sys.path.insert(0, str(_BACKEND_ROOT)) from app.services.cli_builder import CLIBuilder, VALID_FLOWS, VALID_PROCESSING_MODES from app.schemas.tasks import TaskConfigSchema from app.services.task_registry import ALL_TASKS # ══════════════════════════════════════════════════════════════════ # 常量与策略 # ══════════════════════════════════════════════════════════════════ # 所有合法任务代码 _ALL_TASK_CODES: list[str] = [t.code for t in ALL_TASKS] # 合法的 flow 值 _VALID_FLOWS_LIST = sorted(VALID_FLOWS) # 合法的 processing_mode 值 _VALID_MODES_LIST = sorted(VALID_PROCESSING_MODES) # 合法的 window_split 值(CLI 支持的切分模式) _VALID_WINDOW_SPLITS = ["none", "day", "week", "month"] # 日期格式策略:YYYY-MM-DD _date_str = st.dates( min_value=st.just(2024, 1, 1).__wrapped__ if False else __import__("datetime").date(2024, 1, 1), max_value=__import__("datetime").date(2026, 12, 31), ).map(lambda d: d.isoformat()) @st.composite def _valid_config(draw) -> TaskConfigSchema: """生成一个合法的 TaskConfigSchema 实例""" # 随机选择 1-5 个任务代码 tasks = draw(st.lists( st.sampled_from(_ALL_TASK_CODES), min_size=1, max_size=5, unique=True, )) flow_id = draw(st.sampled_from(_VALID_FLOWS_LIST)) processing_mode = draw(st.sampled_from(_VALID_MODES_LIST)) # 时间窗口模式 window_mode = draw(st.sampled_from(["lookback", "custom"])) window_start = None window_end = None lookback_hours = 24 overlap_seconds = 600 if window_mode == "custom": # 生成合法的 start <= end 日期对 start = draw(_date_str) end = draw(_date_str) if start > end: start, end = end, start window_start = start window_end = end else: lookback_hours = draw(st.integers(min_value=1, max_value=720)) overlap_seconds = draw(st.integers(min_value=0, max_value=7200)) # 窗口切分 window_split = draw(st.sampled_from(_VALID_WINDOW_SPLITS)) window_split_days = None if window_split != "none": window_split_days = draw(st.integers(min_value=1, max_value=30)) # 布尔标志 force_full = draw(st.booleans()) dry_run = draw(st.booleans()) fetch_before_verify = draw(st.booleans()) ods_use_local_json = draw(st.booleans()) # store_id:可能为 None 或正整数 store_id = draw(st.one_of(st.none(), st.integers(min_value=1, max_value=999999))) return TaskConfigSchema( tasks=tasks, flow=flow_id, processing_mode=processing_mode, window_mode=window_mode, window_start=window_start, window_end=window_end, lookback_hours=lookback_hours, overlap_seconds=overlap_seconds, window_split=window_split, window_split_days=window_split_days, force_full=force_full, dry_run=dry_run, fetch_before_verify=fetch_before_verify, ods_use_local_json=ods_use_local_json, store_id=store_id, ) # 全局 CLIBuilder 实例 _builder = CLIBuilder() _ETL_PATH = "apps/etl/connectors/feiqiu" def _build(config: TaskConfigSchema) -> list[str]: """便捷包装:构建命令列表""" return _builder.build_command(config, _ETL_PATH) def _get_arg_value(cmd: list[str], flag: str) -> str | None: """从命令列表中提取指定 flag 后面的值""" try: idx = cmd.index(flag) if idx + 1 < len(cmd): return cmd[idx + 1] except ValueError: pass return None def _has_flag(cmd: list[str], flag: str) -> bool: """检查命令列表中是否包含指定 flag""" return flag in cmd # ══════════════════════════════════════════════════════════════════ # Property 5a: --flow 参数与 flow 一致 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_flow_param_matches_flow(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** 构建的 CLI 命令必须包含 --flow 参数,且值与 config.flow 一致。 """ cmd = _build(config) flow_value = _get_arg_value(cmd, "--flow") assert flow_value is not None, "CLI 命令缺少 --flow 参数" assert flow_value == config.flow, ( f"--flow 值 {flow_value!r} != config.flow {config.flow!r}" ) # ══════════════════════════════════════════════════════════════════ # Property 5b: --tasks 参数包含所有任务代码 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_tasks_param_contains_all_codes(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** 构建的 CLI 命令中 --tasks 参数应包含所有指定的任务代码(逗号分隔)。 """ cmd = _build(config) tasks_value = _get_arg_value(cmd, "--tasks") if config.tasks: assert tasks_value is not None, "CLI 命令缺少 --tasks 参数" parsed_tasks = set(tasks_value.split(",")) expected_tasks = set(config.tasks) assert parsed_tasks == expected_tasks, ( f"--tasks 解析结果 {parsed_tasks} != 期望 {expected_tasks}" ) # tasks 为空列表时,CLIBuilder 不添加 --tasks(符合预期) # ══════════════════════════════════════════════════════════════════ # Property 5c: 时间窗口参数正确传递 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_time_window_params_correct(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** - lookback 模式:命令包含 --lookback-hours 和 --overlap-seconds - custom 模式:命令包含 --window-start 和 --window-end - 两种模式互斥 """ cmd = _build(config) if config.window_mode == "lookback": # lookback 模式:应有 --lookback-hours lh = _get_arg_value(cmd, "--lookback-hours") assert lh is not None, "lookback 模式缺少 --lookback-hours" assert lh == str(config.lookback_hours), ( f"--lookback-hours {lh!r} != {config.lookback_hours}" ) os_val = _get_arg_value(cmd, "--overlap-seconds") assert os_val is not None, "lookback 模式缺少 --overlap-seconds" assert os_val == str(config.overlap_seconds), ( f"--overlap-seconds {os_val!r} != {config.overlap_seconds}" ) # 不应有 custom 参数 assert not _has_flag(cmd, "--window-start"), ( "lookback 模式不应包含 --window-start" ) assert not _has_flag(cmd, "--window-end"), ( "lookback 模式不应包含 --window-end" ) else: # custom 模式 if config.window_start: ws = _get_arg_value(cmd, "--window-start") assert ws == config.window_start, ( f"--window-start {ws!r} != {config.window_start!r}" ) if config.window_end: we = _get_arg_value(cmd, "--window-end") assert we == config.window_end, ( f"--window-end {we!r} != {config.window_end!r}" ) # 不应有 lookback 参数 assert not _has_flag(cmd, "--lookback-hours"), ( "custom 模式不应包含 --lookback-hours" ) # ══════════════════════════════════════════════════════════════════ # Property 5d: 布尔标志正确出现或缺失 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_boolean_flags_correct(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** - force_full=True → 命令包含 --force-full - dry_run=True → 命令包含 --dry-run - fetch_before_verify=True 且 processing_mode="verify_only" → 命令包含 --fetch-before-verify - ods_use_local_json=True → 命令包含 --data-source offline """ cmd = _build(config) # force_full if config.force_full: assert _has_flag(cmd, "--force-full"), "force_full=True 但命令缺少 --force-full" else: assert not _has_flag(cmd, "--force-full"), "force_full=False 但命令包含 --force-full" # dry_run if config.dry_run: assert _has_flag(cmd, "--dry-run"), "dry_run=True 但命令缺少 --dry-run" else: assert not _has_flag(cmd, "--dry-run"), "dry_run=False 但命令包含 --dry-run" # fetch_before_verify(仅 verify_only 模式生效) if config.fetch_before_verify and config.processing_mode == "verify_only": assert _has_flag(cmd, "--fetch-before-verify"), ( "fetch_before_verify=True + verify_only 但命令缺少 --fetch-before-verify" ) else: assert not _has_flag(cmd, "--fetch-before-verify"), ( "非 verify_only 模式或 fetch_before_verify=False 但命令包含 --fetch-before-verify" ) # ods_use_local_json if config.ods_use_local_json: ds = _get_arg_value(cmd, "--data-source") assert ds == "offline", ( f"ods_use_local_json=True 但 --data-source={ds!r}(期望 'offline')" ) else: # 不应有 --data-source offline(除非 extra_args 中有 data_source) if "data_source" not in config.extra_args: ds = _get_arg_value(cmd, "--data-source") assert ds is None, ( f"ods_use_local_json=False 但命令包含 --data-source {ds!r}" ) # ══════════════════════════════════════════════════════════════════ # Property 5e: --store-id 正确传递 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_store_id_param_correct(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** store_id 不为 None 时,命令应包含 --store-id 且值一致; store_id 为 None 时,命令不应包含 --store-id。 """ cmd = _build(config) sid = _get_arg_value(cmd, "--store-id") if config.store_id is not None: assert sid is not None, "store_id 不为 None 但命令缺少 --store-id" assert sid == str(config.store_id), ( f"--store-id {sid!r} != {config.store_id}" ) else: assert sid is None, f"store_id=None 但命令包含 --store-id {sid!r}" # ══════════════════════════════════════════════════════════════════ # Property 5f: --window-split / --window-split-days 正确传递 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_window_split_params_correct(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** window_split 不为 "none" 时,命令应包含 --window-split 和 --window-split-days; window_split 为 "none" 时,命令不应包含这些参数。 """ cmd = _build(config) if config.window_split and config.window_split != "none": ws = _get_arg_value(cmd, "--window-split") assert ws == config.window_split, ( f"--window-split {ws!r} != {config.window_split!r}" ) if config.window_split_days is not None: wsd = _get_arg_value(cmd, "--window-split-days") assert wsd == str(config.window_split_days), ( f"--window-split-days {wsd!r} != {config.window_split_days}" ) else: assert not _has_flag(cmd, "--window-split"), ( "window_split='none' 但命令包含 --window-split" ) # ══════════════════════════════════════════════════════════════════ # Property 5g: --processing-mode 正确传递 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_processing_mode_param_correct(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** processing_mode 不为空时,命令应包含 --processing-mode 且值一致。 """ cmd = _build(config) if config.processing_mode: pm = _get_arg_value(cmd, "--processing-mode") assert pm is not None, "processing_mode 不为空但命令缺少 --processing-mode" assert pm == config.processing_mode, ( f"--processing-mode {pm!r} != {config.processing_mode!r}" ) # ══════════════════════════════════════════════════════════════════ # Property 5h: 命令字符串与命令列表一致 # ══════════════════════════════════════════════════════════════════ @given(config=_valid_config()) @settings( max_examples=100, deadline=None, suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow], ) def test_command_string_consistent_with_list(config: TaskConfigSchema): """ **Validates: Requirements 14.1, 14.2** build_command_string() 的输出应与 build_command() 的列表拼接结果一致 (对含空格的参数自动加引号)。 """ cmd_list = _builder.build_command(config, _ETL_PATH) cmd_str = _builder.build_command_string(config, _ETL_PATH) # 逐个参数验证:每个参数都应出现在字符串中 for arg in cmd_list: if " " in arg or '"' in arg: # 含空格的参数应被引号包裹 assert f'"{arg}"' in cmd_str, ( f"含空格参数 {arg!r} 未在命令字符串中被正确引用" ) else: assert arg in cmd_str, ( f"参数 {arg!r} 未出现在命令字符串中" )