# -*- coding: utf-8 -*- """ Feature: etl-staff-dimension, Property 3: ODS 列名提取一致性 **Validates: Requirements 1.3** 对于任意 API 返回的员工记录(含驼峰和蛇形混合字段名),经 BaseOdsTask 处理后: - 所有字段名转为小写蛇形(_get_value_case_insensitive 大小写不敏感匹配) - id 字段不为空且为正整数 - payload 字段包含完整原始 JSON 验证方式:hypothesis 属性测试,生成随机员工记录验证转换一致性。 """ from __future__ import annotations import json import logging import os import sys from pathlib import Path from hypothesis import given, settings, HealthCheck import hypothesis.strategies as st _ETL_ROOT = Path(__file__).resolve().parent.parent / "apps" / "etl" / "connectors" / "feiqiu" if str(_ETL_ROOT) not in sys.path: sys.path.insert(0, str(_ETL_ROOT)) os.environ.setdefault("ETL_SKIP_DOTENV", "1") from tasks.ods.ods_tasks import ODS_TASK_CLASSES, BaseOdsTask # 将 tests/unit 加入 path 以使用 FakeDB/FakeAPI _UNIT_TEST_ROOT = _ETL_ROOT / "tests" / "unit" if str(_UNIT_TEST_ROOT) not in sys.path: sys.path.insert(0, str(_UNIT_TEST_ROOT)) from task_test_utils import create_test_config, get_db_operations, FakeAPIClient # -- 策略:生成随机员工记录 -- # API 返回的字段名混合驼峰和蛇形 _STAFF_RECORD = st.fixed_dictionaries({ "id": st.integers(min_value=1, max_value=2**53), "staff_name": st.text(min_size=1, max_size=10), "mobile": st.from_regex(r"1[3-9]\d{9}", fullmatch=True), "job": st.sampled_from(["店长", "主管", "教练", "收银员", "助教管理员"]), "staff_identity": st.integers(min_value=0, max_value=5), "status": st.integers(min_value=0, max_value=2), "leave_status": st.integers(min_value=0, max_value=2), "site_id": st.integers(min_value=1, max_value=2**53), "tenant_id": st.integers(min_value=1, max_value=2**53), # 驼峰字段(API 实际返回的格式) "cashierPointId": st.integers(min_value=0, max_value=2**53), "cashierPointName": st.text(min_size=0, max_size=20), "groupName": st.text(min_size=0, max_size=20), "groupId": st.integers(min_value=0, max_value=2**53), "rankName": st.text(min_size=0, max_size=10), "userRoles": st.just([]), "gender": st.integers(min_value=0, max_value=3), "is_delete": st.just(0), }) @given(record=_STAFF_RECORD) @settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow]) def test_staff_record_field_case_insensitive_lookup(record): """P3(a): _get_value_case_insensitive 能正确匹配驼峰和蛇形字段。""" # 驼峰字段应能通过小写列名查找到 assert BaseOdsTask._get_value_case_insensitive(record, "cashierpointid") == record["cashierPointId"] assert BaseOdsTask._get_value_case_insensitive(record, "groupname") == record["groupName"] assert BaseOdsTask._get_value_case_insensitive(record, "groupid") == record["groupId"] assert BaseOdsTask._get_value_case_insensitive(record, "rankname") == record["rankName"] assert BaseOdsTask._get_value_case_insensitive(record, "userroles") == record["userRoles"] # 蛇形字段直接匹配 assert BaseOdsTask._get_value_case_insensitive(record, "id") == record["id"] assert BaseOdsTask._get_value_case_insensitive(record, "staff_name") == record["staff_name"] assert BaseOdsTask._get_value_case_insensitive(record, "mobile") == record["mobile"] @given(record=_STAFF_RECORD) @settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow]) def test_staff_record_id_positive_integer(record): """P3(b): id 字段始终为正整数。""" val = BaseOdsTask._get_value_case_insensitive(record, "id") assert val is not None assert isinstance(val, int) assert val > 0 @given(record=_STAFF_RECORD) @settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow]) def test_staff_record_payload_preserves_original(record): """P3(c): payload 序列化后包含原始记录的所有键。""" payload_str = json.dumps(record, ensure_ascii=False) for key in record: assert key in payload_str def test_staff_ingest_payload_roundtrip(tmp_path): """P3(d): 端到端验证 ODS 落地后 payload 包含完整原始 JSON。""" config = create_test_config("ONLINE", tmp_path / "archive", tmp_path / "temp") sample = [ { "id": 9999999999999, "staff_name": "测试员工", "mobile": "13900000001", "cashierPointId": 12345, "cashierPointName": "默认收银台", "groupName": "A组", "groupId": 100, "rankName": "初级", "userRoles": [{"roleId": 1}], "gender": 1, "is_delete": 0, "status": 1, "staff_identity": 2, "site_id": 1001, "tenant_id": 2001, } ] api = FakeAPIClient({"/PersonnelManagement/SearchSystemStaffInfo": sample}) task_cls = ODS_TASK_CLASSES["ODS_STAFF_INFO"] with get_db_operations() as db_ops: task = task_cls(config, db_ops, api, logging.getLogger("test_p3")) result = task.execute() assert result["status"] == "SUCCESS" row = db_ops.upserts[0]["rows"][0] payload = json.loads(row["payload"]) # payload 保留原始键名(含驼峰) assert payload["cashierPointId"] == 12345 assert payload["groupName"] == "A组" assert payload["id"] == 9999999999999