Files
feiqiu-ETL/etl_billiards/tests/unit/test_etl_tasks_offline.py
2025-11-19 03:36:44 +08:00

40 lines
1.3 KiB
Python

# -*- coding: utf-8 -*-
"""Offline-mode task tests: replay archived JSON to verify the T+L (Transform + Load) pipeline works."""
import logging
from pathlib import Path
import pytest
from .task_test_utils import (
TASK_SPECS,
OfflineAPIClient,
create_test_config,
dump_offline_payload,
get_db_operations,
)
@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code)
def test_task_offline_mode(spec, tmp_path):
    """Ensure each task can read archived JSON and finish Transform + Load.

    For the given task spec, a sample payload is dumped into an ``archive``
    directory under ``tmp_path``, an ``OFFLINE`` test config is built, and the
    task is executed against an offline API client that maps the spec's
    endpoint to the dumped file. The result must report ``SUCCESS`` with
    fetched and inserted counts equal to the number of sample records.
    """
    archive_root = tmp_path / "archive"
    scratch_root = tmp_path / "tmp"
    for directory in (archive_root, scratch_root):
        directory.mkdir()

    payload_file = dump_offline_payload(spec, archive_root)
    test_config = create_test_config("OFFLINE", archive_root, scratch_root)
    endpoint_files = {spec.endpoint: Path(payload_file)}
    replay_client = OfflineAPIClient(endpoint_files)
    task_logger = logging.getLogger(f"test_offline_{spec.code.lower()}")

    with get_db_operations() as db_ops:
        task_under_test = spec.task_cls(
            test_config, db_ops, replay_client, task_logger
        )
        outcome = task_under_test.execute()

    assert outcome["status"] == "SUCCESS"
    assert outcome["counts"]["fetched"] == len(spec.sample_records)
    assert outcome["counts"]["inserted"] == len(spec.sample_records)

    # Transaction counters are only verified when the DB object exposes them.
    if not hasattr(db_ops, "commits"):
        return
    assert db_ops.commits == 1
    assert db_ops.rollbacks == 0