Files
Neo-ZQYY/tests/test_property_staff_ods_column_consistency.py

136 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
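Feature: etl-staff-dimension, Property 3: ODS column-name extraction consistency

**Validates: Requirements 1.3**

For any staff record returned by the API (with mixed camelCase and snake_case
field names), after processing by BaseOdsTask:
- all field names are converted to lowercase snake_case
  (matched case-insensitively via _get_value_case_insensitive)
- the id field is non-empty and is a positive integer
- the payload field contains the complete original JSON

Verification: hypothesis property tests that generate random staff records
and check that the conversion is consistent.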
"""
from __future__ import annotations
import json
import logging
import os
import sys
from pathlib import Path
from hypothesis import given, settings, HealthCheck
import hypothesis.strategies as st
# Locate the feiqiu ETL connector root (apps/etl/connectors/feiqiu under the
# directory two levels above this file) and put it on sys.path so the
# connector's `tasks` package can be imported below.
_ETL_ROOT = Path(__file__).resolve().parent.parent / "apps" / "etl" / "connectors" / "feiqiu"
if str(_ETL_ROOT) not in sys.path:
    sys.path.insert(0, str(_ETL_ROOT))
# Set ETL_SKIP_DOTENV (only if not already set) before the ETL modules are
# imported. NOTE(review): presumably this stops them from loading a .env
# file at import time -- confirm against the connector's config loader.
os.environ.setdefault("ETL_SKIP_DOTENV", "1")
from tasks.ods.ods_tasks import ODS_TASK_CLASSES, BaseOdsTask
# Add the connector's tests/unit directory to sys.path so the FakeDB/FakeAPI
# helpers can be imported.
_UNIT_TEST_ROOT = _ETL_ROOT / "tests" / "unit"
if str(_UNIT_TEST_ROOT) not in sys.path:
    sys.path.insert(0, str(_UNIT_TEST_ROOT))
from task_test_utils import create_test_config, get_db_operations, FakeAPIClient
# -- Strategy: generate random staff records --
# The field names returned by the API mix camelCase and snake_case.

# Reusable building blocks for the integer/text ranges that repeat below.
_POSITIVE_ID = st.integers(min_value=1, max_value=2**53)
_NON_NEGATIVE_ID = st.integers(min_value=0, max_value=2**53)
_ZERO_TO_TWO = st.integers(min_value=0, max_value=2)
_TEXT_UP_TO_20 = st.text(min_size=0, max_size=20)

_STAFF_FIELDS = {
    # snake_case fields
    "id": _POSITIVE_ID,
    "staff_name": st.text(min_size=1, max_size=10),
    "mobile": st.from_regex(r"1[3-9]\d{9}", fullmatch=True),
    "job": st.sampled_from(["店长", "主管", "教练", "收银员", "助教管理员"]),
    "staff_identity": st.integers(min_value=0, max_value=5),
    "status": _ZERO_TO_TWO,
    "leave_status": _ZERO_TO_TWO,
    "site_id": _POSITIVE_ID,
    "tenant_id": _POSITIVE_ID,
    # camelCase fields (the format the API actually returns)
    "cashierPointId": _NON_NEGATIVE_ID,
    "cashierPointName": _TEXT_UP_TO_20,
    "groupName": _TEXT_UP_TO_20,
    "groupId": _NON_NEGATIVE_ID,
    "rankName": st.text(min_size=0, max_size=10),
    "userRoles": st.just([]),
    "gender": st.integers(min_value=0, max_value=3),
    "is_delete": st.just(0),
}

# Every generated record contains exactly the keys listed above.
_STAFF_RECORD = st.fixed_dictionaries(_STAFF_FIELDS)
@given(record=_STAFF_RECORD)
@settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow])
def test_staff_record_field_case_insensitive_lookup(record):
    """P3(a): _get_value_case_insensitive resolves camelCase and snake_case fields."""
    lookup = BaseOdsTask._get_value_case_insensitive

    # Pairs of (column name passed to the lookup, key as stored in the record).
    expectations = (
        # camelCase keys must be reachable through their all-lowercase name
        ("cashierpointid", "cashierPointId"),
        ("groupname", "groupName"),
        ("groupid", "groupId"),
        ("rankname", "rankName"),
        ("userroles", "userRoles"),
        # snake_case keys match directly
        ("id", "id"),
        ("staff_name", "staff_name"),
        ("mobile", "mobile"),
    )
    for column, source_key in expectations:
        assert lookup(record, column) == record[source_key]
@given(record=_STAFF_RECORD)
@settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow])
def test_staff_record_id_positive_integer(record):
    """P3(b): the id field is always present and is a positive integer."""
    staff_id = BaseOdsTask._get_value_case_insensitive(record, "id")
    # Presence and type first, so the comparison below never sees None.
    assert staff_id is not None
    assert isinstance(staff_id, int)
    assert 0 < staff_id
@given(record=_STAFF_RECORD)
@settings(max_examples=50, suppress_health_check=[HealthCheck.too_slow])
def test_staff_record_payload_preserves_original(record):
    """P3(c): the serialized payload preserves every key of the original record.

    The payload is parsed back with ``json.loads`` instead of being searched
    as text. A substring check such as ``"id" in payload_str`` is satisfied
    by unrelated keys ("staff_identity", "site_id", ...), so it could not
    detect a missing key.
    """
    payload_str = json.dumps(record, ensure_ascii=False)
    restored = json.loads(payload_str)
    # Compare the key sets first so a missing/extra key gives a focused failure.
    assert set(restored) == set(record)
    # Every value must survive the round trip unchanged as well.
    assert restored == record
def test_staff_ingest_payload_roundtrip(tmp_path):
    """P3(d): end-to-end check that the ODS row's payload holds the original JSON.

    Runs the ODS_STAFF_INFO task against a fake API client that serves one
    staff record, then inspects the first row of the first recorded upsert.
    """
    config = create_test_config("ONLINE", tmp_path / "archive", tmp_path / "temp")
    staff_row = {
        "id": 9999999999999,
        "staff_name": "测试员工",
        "mobile": "13900000001",
        "cashierPointId": 12345,
        "cashierPointName": "默认收银台",
        "groupName": "A组",
        "groupId": 100,
        "rankName": "初级",
        "userRoles": [{"roleId": 1}],
        "gender": 1,
        "is_delete": 0,
        "status": 1,
        "staff_identity": 2,
        "site_id": 1001,
        "tenant_id": 2001,
    }
    api = FakeAPIClient({"/PersonnelManagement/SearchSystemStaffInfo": [staff_row]})
    task_cls = ODS_TASK_CLASSES["ODS_STAFF_INFO"]
    logger = logging.getLogger("test_p3")

    with get_db_operations() as db_ops:
        outcome = task_cls(config, db_ops, api, logger).execute()
        assert outcome["status"] == "SUCCESS"

        first_row = db_ops.upserts[0]["rows"][0]
        payload = json.loads(first_row["payload"])

        # The payload keeps the original key names (including camelCase ones).
        expected_values = {
            "cashierPointId": 12345,
            "groupName": "A组",
            "id": 9999999999999,
        }
        for key, expected in expected_values.items():
            assert payload[key] == expected