# -*- coding: utf-8 -*- """ Feature: dataflow-field-completion, Property 1: FACT_MAPPINGS 字段映射正确性 **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 对于任意 ODS 表行和任意已配置的 FACT_MAPPINGS 条目 (dwd_col, ods_expr, cast_type), 当 DWD 加载任务执行后,DWD 目标行中 dwd_col 列的值应等于从 ODS 行中按 ods_expr 提取并按 cast_type 转换后的值。 本测试聚焦 A 类表(新增 DWD 列 + FACT_MAPPINGS): - dim_assistant_ex - dwd_assistant_service_log_ex - dwd_store_goods_sale - dwd_member_balance_change_ex - dim_table_ex """ from __future__ import annotations import re import sys from pathlib import Path from typing import Any from unittest.mock import MagicMock from hypothesis import given, settings, assume, HealthCheck import hypothesis.strategies as st # ── 将 ETL 模块加入 sys.path ── _ETL_ROOT = Path(__file__).resolve().parent.parent / "apps" / "etl" / "connectors" / "feiqiu" if str(_ETL_ROOT) not in sys.path: sys.path.insert(0, str(_ETL_ROOT)) from tasks.dwd.dwd_load_task import DwdLoadTask # ── A 类表列表 ── A_CLASS_TABLES = [ "dwd.dim_assistant_ex", "dwd.dwd_assistant_service_log_ex", "dwd.dwd_store_goods_sale", "dwd.dwd_member_balance_change_ex", "dwd.dim_table_ex", ] # ── 辅助:构造最小可用的 DwdLoadTask 实例 ── def _make_task() -> DwdLoadTask: """构造一个用于测试的 DwdLoadTask,使用 mock config/db/api/logger。""" config = MagicMock() config.get = lambda key, default=None: { "app.store_id": 1, "app.timezone": "Asia/Shanghai", "dwd.fact_upsert": True, }.get(key, default) db = MagicMock() api = MagicMock() logger = MagicMock() return DwdLoadTask(config, db, api, logger) # ── 收集 A 类表的所有 FACT_MAPPINGS 条目 ── def _collect_a_class_mappings() -> list[tuple[str, str, str, str | None]]: """返回 (dwd_table, dwd_col, ods_expr, cast_type) 四元组列表。""" result = [] for table in A_CLASS_TABLES: entries = DwdLoadTask.FACT_MAPPINGS.get(table, []) for dwd_col, ods_expr, cast_type in entries: result.append((table, dwd_col, ods_expr, cast_type)) return result _A_CLASS_MAPPING_ENTRIES = _collect_a_class_mappings() # ── 已知的合法 cast_type 值 ── _VALID_CAST_TYPES = { None, "bigint", "integer", "numeric", "decimal", "timestamptz", "boolean", "date", "text", } # ══════════════════════════════════════════════════════════════════ # Property 1.1: A 类表 FACT_MAPPINGS 条目结构完整性 # ══════════════════════════════════════════════════════════════════ def test_a_class_tables_have_fact_mappings(): """每张 A 类表在 FACT_MAPPINGS 中都有至少一个条目。""" for table in A_CLASS_TABLES: entries = DwdLoadTask.FACT_MAPPINGS.get(table, []) assert len(entries) > 0, f"{table} 在 FACT_MAPPINGS 中无条目" def test_a_class_mappings_are_valid_tuples(): """每个 FACT_MAPPINGS 条目都是 (dwd_col, ods_expr, cast_type) 三元组。""" for table, dwd_col, ods_expr, cast_type in _A_CLASS_MAPPING_ENTRIES: assert isinstance(dwd_col, str) and dwd_col, \ f"{table}: dwd_col 不能为空" assert isinstance(ods_expr, str) and ods_expr, \ f"{table}: ods_expr 不能为空" assert cast_type is None or isinstance(cast_type, str), \ f"{table}.{dwd_col}: cast_type 必须为 None 或字符串" def test_a_class_cast_types_are_valid(): """所有 cast_type 值都在已知合法集合内。""" for table, dwd_col, _, cast_type in _A_CLASS_MAPPING_ENTRIES: assert cast_type in _VALID_CAST_TYPES, \ f"{table}.{dwd_col}: 未知 cast_type={cast_type!r}" def test_a_class_no_duplicate_dwd_cols(): """同一张 DWD 表内不应有重复的 dwd_col。""" for table in A_CLASS_TABLES: entries = DwdLoadTask.FACT_MAPPINGS.get(table, []) dwd_cols = [e[0] for e in entries] seen = set() for col in dwd_cols: assert col not in seen, \ f"{table}: dwd_col={col!r} 重复出现" seen.add(col) # ══════════════════════════════════════════════════════════════════ # Property 1.2: _cast_expr 对 A 类表映射条目的转换正确性 # ══════════════════════════════════════════════════════════════════ # 生成策略:从 A 类表映射条目中随机选取 _mapping_entry_strategy = st.sampled_from(_A_CLASS_MAPPING_ENTRIES) # 生成策略:模拟 ODS 列值(用于验证 _cast_expr 的 SQL 表达式结构) _ods_value_strategy = st.one_of( st.none(), st.integers(min_value=-999999, max_value=999999), st.text(min_size=0, max_size=50, alphabet=st.characters( whitelist_categories=("L", "N", "P", "Z"), blacklist_characters=("\x00",), )), st.floats(min_value=-1e6, max_value=1e6, allow_nan=False, allow_infinity=False), ) @given(entry=_mapping_entry_strategy) @settings(max_examples=200, suppress_health_check=[HealthCheck.function_scoped_fixture]) def test_cast_expr_produces_valid_sql_for_a_class(entry): """ **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 对于任意 A 类表 FACT_MAPPINGS 条目,_cast_expr 应产生非空的 SQL 表达式。 """ table, dwd_col, ods_expr, cast_type = entry task = _make_task() result = task._cast_expr(ods_expr, cast_type) # 基本断言:结果非空 assert result and isinstance(result, str), \ f"{table}.{dwd_col}: _cast_expr 返回空结果" # 结果应包含 ODS 源表达式(可能被引号包裹或 CAST 包裹) # 对于简单列名,应出现在结果中(带引号或不带) if ods_expr.upper() != "NULL": # 去掉引号后的 ods_expr 应在结果中可找到 bare_expr = ods_expr.strip('"') assert bare_expr in result or ods_expr in result, \ f"{table}.{dwd_col}: _cast_expr 结果 {result!r} 中未包含 ODS 表达式 {ods_expr!r}" # 如果有 cast_type,结果应包含类型转换语法 if cast_type: cast_lower = cast_type.lower() if cast_lower in {"bigint", "integer", "numeric", "decimal"}: assert "CAST" in result.upper() or "::" in result, \ f"{table}.{dwd_col}: 数值类型转换缺少 CAST/:: 语法" elif cast_lower == "timestamptz": assert "timestamptz" in result.lower(), \ f"{table}.{dwd_col}: 时间类型转换缺少 timestamptz" elif cast_lower == "boolean": assert "boolean" in result.lower(), \ f"{table}.{dwd_col}: 布尔类型转换缺少 boolean" @given(entry=_mapping_entry_strategy) @settings(max_examples=200, suppress_health_check=[HealthCheck.function_scoped_fixture]) def test_cast_expr_is_deterministic(entry): """ **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 对于同一 FACT_MAPPINGS 条目,_cast_expr 的输出应是确定性的(多次调用结果一致)。 """ _, _, ods_expr, cast_type = entry task = _make_task() result1 = task._cast_expr(ods_expr, cast_type) result2 = task._cast_expr(ods_expr, cast_type) assert result1 == result2, "同一输入的 _cast_expr 结果不一致" # ══════════════════════════════════════════════════════════════════ # Property 1.3: _build_column_mapping 对 A 类表的映射注册正确性 # ══════════════════════════════════════════════════════════════════ @given(table_name=st.sampled_from(A_CLASS_TABLES)) @settings(max_examples=50, suppress_health_check=[HealthCheck.function_scoped_fixture]) def test_build_column_mapping_registers_all_explicit_entries(table_name): """ **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 对于任意 A 类表,_build_column_mapping 应将 FACT_MAPPINGS 中的所有条目 注册到返回的映射字典中。 """ task = _make_task() entries = DwdLoadTask.FACT_MAPPINGS.get(table_name, []) ods_table = DwdLoadTask.TABLE_MAP.get(table_name, "") # 构造 mock cursor,返回包含 fetched_at 的列信息 mock_cur = MagicMock() # _get_columns 内部查 information_schema,这里直接 mock _build_column_mapping 的输入 # 收集所有 ODS 列名(从 FACT_MAPPINGS 的 ods_expr 中提取简单列名) ods_cols = ["fetched_at", "id", "site_id", "tenant_id"] for _, ods_expr, _ in entries: # 简单列名直接加入;复杂表达式(含 -> 或 CASE)跳过 bare = ods_expr.strip('"') if bare.isidentifier(): ods_cols.append(bare) pk_cols = ["id"] # 简化:假设主键为 id mapping = task._build_column_mapping(mock_cur, table_name, ods_table, pk_cols, ods_cols) # 如果返回的是错误字典(缺少 fetched_at),跳过 if "processed" in mapping: return # 验证所有显式映射条目都被注册 for dwd_col, ods_expr, cast_type in entries: dwd_col_lower = dwd_col.lower() assert dwd_col_lower in mapping, \ f"{table_name}: FACT_MAPPINGS 条目 {dwd_col!r} 未被注册到映射中" src, ct = mapping[dwd_col_lower] assert src == ods_expr, \ f"{table_name}.{dwd_col}: 映射源应为 {ods_expr!r},实际为 {src!r}" assert ct == cast_type, \ f"{table_name}.{dwd_col}: cast_type 应为 {cast_type!r},实际为 {ct!r}" # ══════════════════════════════════════════════════════════════════ # Property 1.4: A 类表特定字段映射验证(需求级别) # ══════════════════════════════════════════════════════════════════ # 需求 1: assistant_accounts_master → dim_assistant_ex _REQ1_EXPECTED = { "system_role_id": ("system_role_id", None), "job_num": ("job_num", None), "cx_unit_price": ("cx_unit_price", None), "pd_unit_price": ("pd_unit_price", None), } # 需求 2: assistant_service_records → dwd_assistant_service_log_ex _REQ2_EXPECTED = { "operator_id": ("operator_id", None), "operator_name": ("operator_name", None), } # 需求 4: store_goods_sales_records → dwd_store_goods_sale _REQ4_EXPECTED = { "discount_money": ("discount_money", None), "discount_price": ("discount_price", None), } # 需求 5: member_balance_changes → dwd_member_balance_change_ex _REQ5_EXPECTED = { "relate_id": ("relate_id", None), } # 需求 9: site_tables_master → dim_table_ex _REQ9_EXPECTED = { "create_time": ("create_time", None), "light_status": ("light_status", None), "tablestatusname": ("tablestatusname", None), "sitename": ("sitename", None), "applet_qr_code_url": ('"appletQrCodeUrl"', None), "audit_status": ("audit_status", None), "charge_free": ("charge_free", None), "delay_lights_time": ("delay_lights_time", None), "is_rest_area": ("is_rest_area", None), "only_allow_groupon": ("only_allow_groupon", None), "order_delay_time": ("order_delay_time", None), "self_table": ("self_table", None), "temporary_light_second": ("temporary_light_second", None), "virtual_table": ("virtual_table", None), } # 汇总:(DWD 表, 期望映射字典, 需求编号) _REQUIREMENT_CHECKS = [ ("dwd.dim_assistant_ex", _REQ1_EXPECTED, "1.1, 1.2"), ("dwd.dwd_assistant_service_log_ex", _REQ2_EXPECTED, "2.1"), ("dwd.dwd_store_goods_sale", _REQ4_EXPECTED, "4.1"), ("dwd.dwd_member_balance_change_ex", _REQ5_EXPECTED, "5.1"), ("dwd.dim_table_ex", _REQ9_EXPECTED, "9.1"), ] @given(check=st.sampled_from(_REQUIREMENT_CHECKS)) @settings(max_examples=50, suppress_health_check=[HealthCheck.function_scoped_fixture]) def test_requirement_specific_mappings_exist(check): """ **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 对于每个需求指定的字段映射,验证 FACT_MAPPINGS 中确实包含正确的条目。 """ dwd_table, expected_mappings, req_ids = check entries = DwdLoadTask.FACT_MAPPINGS.get(dwd_table, []) # 构建实际映射字典:dwd_col -> (ods_expr, cast_type) actual = {e[0].lower(): (e[1], e[2]) for e in entries} for dwd_col, (expected_src, expected_cast) in expected_mappings.items(): assert dwd_col in actual, \ f"[Req {req_ids}] {dwd_table}: 缺少 dwd_col={dwd_col!r} 的映射条目" actual_src, actual_cast = actual[dwd_col] assert actual_src == expected_src, \ f"[Req {req_ids}] {dwd_table}.{dwd_col}: ODS 源应为 {expected_src!r},实际为 {actual_src!r}" assert actual_cast == expected_cast, \ f"[Req {req_ids}] {dwd_table}.{dwd_col}: cast_type 应为 {expected_cast!r},实际为 {actual_cast!r}" # ══════════════════════════════════════════════════════════════════ # Property 1.5: A 类表 FACT_MAPPINGS 与 TABLE_MAP 一致性 # ══════════════════════════════════════════════════════════════════ @given(table_name=st.sampled_from(A_CLASS_TABLES)) @settings(max_examples=50, suppress_health_check=[HealthCheck.function_scoped_fixture]) def test_a_class_tables_registered_in_table_map(table_name): """ **Validates: Requirements 1.1, 1.2, 2.1, 4.1, 5.1, 9.1** 每张 A 类表必须同时在 TABLE_MAP 和 FACT_MAPPINGS 中注册。 """ assert table_name in DwdLoadTask.TABLE_MAP, \ f"{table_name} 未在 TABLE_MAP 中注册" assert table_name in DwdLoadTask.FACT_MAPPINGS, \ f"{table_name} 未在 FACT_MAPPINGS 中注册" # TABLE_MAP 的 ODS 源表应为非空字符串 ods_table = DwdLoadTask.TABLE_MAP[table_name] assert ods_table and isinstance(ods_table, str), \ f"{table_name}: TABLE_MAP 中的 ODS 表名无效"