264 lines
10 KiB
Python
264 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
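"""
Feature: dataflow-field-completion, Property 7: timer record completeness (计时器记录完整性)

**Validates: Requirements 15.2**

For any sequence of ETL steps, the timer output must contain each step's name,
start time, end time, and elapsed time, and the elapsed time must equal the end
time minus the start time.

Test strategy:
- Use hypothesis to generate a random list of step names (1-10 steps)
- Each step optionally contains 0-5 sub-steps
- Properties verified:
  1. Each step's to_dict() output contains name, start_time, end_time, elapsed_ms
  2. elapsed_ms ≈ (end_time - start_time) in milliseconds (±50 ms tolerance)
  3. Every step name appears in timer.steps
  4. The number of steps in timer.to_dict() equals the number of steps actually added
  5. Sub-step elapsed_ms satisfies the same consistency
"""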
|
||
from __future__ import annotations
|
||
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from hypothesis import given, settings, HealthCheck
|
||
import hypothesis.strategies as st
|
||
|
||
# ── Make the ETL connector package importable by putting it on sys.path ──
# The root is located relative to this file: two directories up, then
# apps/etl/connectors/feiqiu.
_ETL_ROOT = Path(__file__).resolve().parent.parent.joinpath(
    "apps", "etl", "connectors", "feiqiu"
)
# Insert at the front only once so repeated imports do not grow sys.path.
if str(_ETL_ROOT) not in sys.path:
    sys.path.insert(0, str(_ETL_ROOT))
|
||
|
||
from utils.timer import EtlTimer
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Hypothesis 策略
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
# Step names: ASCII uppercase letters, digits and underscore (3-20 chars),
# mimicking real ETL task names.
_step_name = st.text(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ_0123456789",
    min_size=3,
    max_size=20,
)
|
||
|
||
# Sub-step names: ASCII lowercase letters, digits and underscore (2-15 chars).
_sub_step_name = st.text(
    "abcdefghijklmnopqrstuvwxyz_0123456789",
    min_size=2,
    max_size=15,
)
|
||
|
||
|
||
@st.composite
def _step_spec(draw):
    """Draw one step specification as ``(step_name, sub_step_names)``.

    The step name comes from ``_step_name``; the sub-step names are a list of
    0-5 values from ``_sub_step_name`` that are unique within the step.
    """
    step_name = draw(_step_name)
    # Between 0 and 5 sub-steps, with no repeated names inside one step.
    unique_sub_names = st.lists(
        _sub_step_name, min_size=0, max_size=5, unique=True
    )
    return step_name, draw(unique_sub_names)
|
||
|
||
|
||
# Generate 1-10 step specs whose step names are pairwise distinct.
# ``unique_by`` lets Hypothesis enforce name uniqueness while it builds the
# list, instead of generating whole lists and rejecting those that contain a
# duplicate name via ``.filter`` (rejection sampling wastes examples and can
# trip the ``filter_too_much`` health check).
_steps_strategy = st.lists(
    _step_spec(),
    min_size=1,
    max_size=10,
    # A spec is ``(step_name, sub_step_names)``; uniqueness is keyed on the name.
    unique_by=lambda spec: spec[0],
)
|
||
|
||
# Allowed timing error in milliseconds — perf_counter and datetime.now differ slightly
_TOLERANCE_MS = 50.0
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# 辅助函数
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
def _run_timer(step_specs: list[tuple[str, list[str]]]) -> EtlTimer:
    """Drive a fresh ``EtlTimer`` through the given step specs and return it.

    Each spec is ``(step_name, sub_step_names)``. Every step is started, each
    of its sub-steps is started and stopped in order, and then the step itself
    is stopped. The timer is finished with ``write_report=False`` before it is
    returned.
    """
    etl_timer = EtlTimer()
    etl_timer.start()

    for spec in step_specs:
        parent_name, child_names = spec
        etl_timer.start_step(parent_name)

        for child_name in child_names:
            etl_timer.start_sub_step(parent_name, child_name)
            # Tiny pause so the start and end timestamps are not identical.
            time.sleep(0.001)
            etl_timer.stop_sub_step(parent_name, child_name)

        # Same tiny pause at step level, so steps without sub-steps also
        # have a measurable duration.
        time.sleep(0.001)
        etl_timer.stop_step(parent_name)

    etl_timer.finish(write_report=False)
    return etl_timer
|
||
|
||
|
||
def _parse_iso(iso_str: str) -> datetime:
    """Convert an ISO-format timestamp string into a ``datetime`` object."""
    parsed = datetime.fromisoformat(iso_str)
    return parsed
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Property 7a: to_dict() 输出包含必要字段
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
@given(step_specs=_steps_strategy)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.too_slow,
    ],
)
def test_step_dict_contains_required_fields(step_specs):
    """
    **Validates: Requirements 15.2**

    Every step's to_dict() output must contain the keys name, start_time,
    end_time and elapsed_ms, and start_time / end_time must not be None.
    """
    required_keys = ("name", "start_time", "end_time", "elapsed_ms")
    finished_timer = _run_timer(step_specs)

    for step in finished_timer.steps:
        record = step.to_dict()

        for key in required_keys:
            assert key in record, f"步骤 {step.name} 的 to_dict() 缺少字段: {key}"

        # Both timestamps must have been populated once the step is stopped.
        assert record["start_time"] is not None, f"步骤 {step.name} 的 start_time 为 None"
        assert record["end_time"] is not None, f"步骤 {step.name} 的 end_time 为 None"
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Property 7b: elapsed_ms ≈ (end_time - start_time)
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
@given(step_specs=_steps_strategy)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.too_slow,
    ],
)
def test_step_elapsed_ms_consistent_with_timestamps(step_specs):
    """
    **Validates: Requirements 15.2**

    Each step's elapsed_ms must approximately equal (end_time - start_time)
    expressed in milliseconds, within ``_TOLERANCE_MS`` (±50 ms).
    """
    finished_timer = _run_timer(step_specs)

    for step in finished_timer.steps:
        record = step.to_dict()

        # Wall-clock duration reconstructed from the serialized timestamps.
        began_at = _parse_iso(record["start_time"])
        ended_at = _parse_iso(record["end_time"])
        wall_ms = (ended_at - began_at).total_seconds() * 1000

        elapsed_ms = record["elapsed_ms"]
        diff = abs(elapsed_ms - wall_ms)

        assert diff <= _TOLERANCE_MS, (
            f"步骤 {step.name}: elapsed_ms={elapsed_ms:.3f} 与 "
            f"wall_clock_ms={wall_ms:.3f} 差异 {diff:.3f}ms 超过容差 {_TOLERANCE_MS}ms"
        )
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Property 7c: 所有步骤名称都出现在 timer.steps 中
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
@given(step_specs=_steps_strategy)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.too_slow,
    ],
)
def test_all_step_names_present(step_specs):
    """
    **Validates: Requirements 15.2**

    Every step name that was added must appear in the timer.steps list.
    """
    finished_timer = _run_timer(step_specs)
    # Collect the recorded names once; membership checks below are then O(1).
    recorded_names = set()
    for recorded_step in finished_timer.steps:
        recorded_names.add(recorded_step.name)

    for step_name in (spec[0] for spec in step_specs):
        assert step_name in recorded_names, (
            f"步骤 {step_name!r} 未出现在 timer.steps 中"
        )
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Property 7d: timer.to_dict() 的 steps 数量等于实际步骤数
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
@given(step_specs=_steps_strategy)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.too_slow,
    ],
)
def test_timer_dict_step_count_matches(step_specs):
    """
    **Validates: Requirements 15.2**

    The number of entries under "steps" in timer.to_dict() must equal the
    number of steps that were actually added.
    """
    timer_dict = _run_timer(step_specs).to_dict()

    assert len(timer_dict["steps"]) == len(step_specs), (
        f"to_dict() steps 数量 {len(timer_dict['steps'])} "
        f"!= 实际步骤数 {len(step_specs)}"
    )
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════
|
||
# Property 7e: 子步骤的 elapsed_ms 也满足一致性
|
||
# ══════════════════════════════════════════════════════════════════
|
||
|
||
@given(step_specs=_steps_strategy)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[
        HealthCheck.function_scoped_fixture,
        HealthCheck.too_slow,
    ],
)
def test_sub_step_elapsed_ms_consistent(step_specs):
    """
    **Validates: Requirements 15.2**

    Each sub-step's serialized dict must contain the required keys, and its
    elapsed_ms must approximately equal (end_time - start_time) in
    milliseconds, within ``_TOLERANCE_MS``.
    """
    required_keys = ("name", "start_time", "end_time", "elapsed_ms")
    finished_timer = _run_timer(step_specs)

    for step in finished_timer.steps:
        for child_dict in step.to_dict()["children"]:
            # Required-field check.
            for key in required_keys:
                assert key in child_dict, (
                    f"子步骤 {child_dict.get('name', '?')} 的 to_dict() 缺少字段: {key}"
                )

            # A sub-step with no end_time has no duration to compare; skip it.
            if child_dict["end_time"] is None:
                continue

            began_at = _parse_iso(child_dict["start_time"])
            ended_at = _parse_iso(child_dict["end_time"])
            wall_ms = (ended_at - began_at).total_seconds() * 1000

            elapsed_ms = child_dict["elapsed_ms"]
            diff = abs(elapsed_ms - wall_ms)

            assert diff <= _TOLERANCE_MS, (
                f"子步骤 {child_dict['name']}: elapsed_ms={elapsed_ms:.3f} 与 "
                f"wall_clock_ms={wall_ms:.3f} 差异 {diff:.3f}ms 超过容差 {_TOLERANCE_MS}ms"
            )
|