Files
Neo-ZQYY/apps/backend/tests/test_queue_properties.py

511 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""队列属性测试Property-Based Testing
使用 hypothesis 验证队列管理的通用正确性属性:
- Property 8: 队列 CRUD 不变量
- Property 9: 队列出队顺序
- Property 10: 队列重排一致性
- Property 11: 执行历史排序与限制
测试策略:
- Property 8-10 通过内存模拟队列状态mock 数据库操作,验证 TaskQueue 的核心逻辑
- Property 11 通过 mock 数据库返回,验证执行历史端点的排序与限制逻辑
"""
import os
os.environ.setdefault("JWT_SECRET_KEY", "test-secret-key-for-queue-properties")
import json
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
from hypothesis import given, settings, assume
from hypothesis import strategies as st
from app.schemas.tasks import TaskConfigSchema
from app.services.task_queue import TaskQueue, QueuedTask
# ---------------------------------------------------------------------------
# 通用策略(Strategies)
# ---------------------------------------------------------------------------
# Site identifiers: any integer from 1 up to the largest signed 32-bit value.
_site_id_st = st.integers(min_value=1, max_value=2**31 - 1)

# Small pool of task codes that generated configs draw from.
_task_codes = [
    "ODS_MEMBER",
    "ODS_PAYMENT",
    "ODS_ORDER",
    "DWD_LOAD_FROM_ODS",
    "DWS_SUMMARY",
]

# Builds a TaskConfigSchema with 1-3 distinct task codes and one of three flows.
_simple_config_st = st.builds(
    TaskConfigSchema,
    tasks=st.lists(
        st.sampled_from(_task_codes),
        min_size=1,
        max_size=3,
        unique=True,
    ),
    flow=st.sampled_from(["api_ods", "api_ods_dwd", "ods_dwd"]),
)
# ---------------------------------------------------------------------------
# 内存队列模拟器 — 用于 mock 数据库交互
# ---------------------------------------------------------------------------
class InMemoryQueueDB:
    """In-memory stand-in for the ``task_queue`` table.

    Each ``mock_*_connection`` method returns a ``MagicMock`` connection whose
    single cursor recognises the SQL statements of one queue operation (by
    substring match) and applies them to ``self.rows``.

    Queue state is read when a statement is *executed*, not when the
    connection is built, so a connection that is created early or reused for
    several statements never serves a stale view of the rows.
    """

    def __init__(self, site_id: int):
        self.site_id = site_id
        # Storage layout:
        # {task_id: {"id", "site_id", "config", "status", "position"}}
        self.rows: dict[str, dict] = {}

    @property
    def pending_tasks(self) -> list[dict]:
        """Return the rows whose status is ``"pending"``, ordered by position."""
        return sorted(
            (row for row in self.rows.values() if row["status"] == "pending"),
            key=lambda row: row["position"],
        )

    @staticmethod
    def _to_db_tuple(row: dict) -> tuple:
        """Render a stored row as the 10-column tuple the mock cursor returns."""
        return (
            row["id"],
            row["site_id"],
            json.dumps(row["config"]),
            row["status"],
            row["position"],
            # The five trailing columns are always None in this simulation.
            None, None, None, None, None,
        )

    @staticmethod
    def _connection_for(handler) -> MagicMock:
        """Wrap ``handler(cur, sql, params)`` in a mock connection/cursor pair.

        ``conn.cursor()`` always returns the same cursor, and the cursor also
        works as a context manager that yields itself.
        """
        cur = MagicMock()

        def on_execute(sql, params=None):
            handler(cur, sql, params)

        cur.execute = MagicMock(side_effect=on_execute)
        cur.__enter__ = MagicMock(return_value=cur)
        # Returning False from __exit__ lets exceptions propagate.
        cur.__exit__ = MagicMock(return_value=False)

        conn = MagicMock()
        conn.cursor.return_value = cur
        return conn

    def mock_enqueue_connection(self):
        """Build a mock connection for the enqueue operation.

        Recognised statements:
        1. ``... MAX(position) ...``      -> ``fetchone()`` yields the current
           maximum position among pending rows (0 when there are none).
        2. ``INSERT INTO task_queue ...`` -> stores a new pending row; params
           must be ``(task_id, site_id, config_json, position)``.
        """

        def handle(cur, sql, params):
            if "MAX(position)" in sql:
                highest = max(
                    (row["position"] for row in self.pending_tasks), default=0
                )
                cur.fetchone.return_value = (highest,)
            elif "INSERT INTO task_queue" in sql:
                task_id, site_id, config_json, new_pos = params
                self.rows[task_id] = {
                    "id": task_id,
                    "site_id": site_id,
                    "config": json.loads(config_json),
                    "status": "pending",
                    "position": new_pos,
                }

        return self._connection_for(handle)

    def mock_dequeue_connection(self):
        """Build a mock connection for the dequeue operation.

        Recognised statements:
        1. ``... ORDER BY position ASC ...`` -> ``fetchone()`` yields the
           pending row with the smallest position, or ``None`` if empty.
        2. ``... SET status = 'running' ...`` -> marks the row returned by the
           most recent SELECT as running.
        """
        # Holds at most one element: the row handed out by the last SELECT.
        selected: list[dict] = []

        def handle(cur, sql, params):
            if "ORDER BY position ASC" in sql:
                selected[:] = self.pending_tasks[:1]
                cur.fetchone.return_value = (
                    self._to_db_tuple(selected[0]) if selected else None
                )
            elif "SET status = 'running'" in sql:
                for row in selected:
                    row["status"] = "running"

        return self._connection_for(handle)

    def mock_delete_connection(self, task_id: str):
        """Build a mock connection for the delete operation.

        ``task_id`` is accepted for call-site symmetry but not consulted: the
        id to delete is taken from the first parameter of the executed
        statement.  Only pending rows are removed; ``cur.rowcount`` is 1 after
        a successful delete and 0 otherwise (and 0 before any execute).
        """

        def handle(cur, sql, params):
            target = params[0]
            row = self.rows.get(target)
            if row is not None and row["status"] == "pending":
                del self.rows[target]
                cur.rowcount = 1
            else:
                cur.rowcount = 0

        conn = self._connection_for(handle)
        conn.cursor.return_value.rowcount = 0
        return conn

    def mock_reorder_connection(self):
        """Build a mock connection for the reorder operation.

        Recognised statements:
        1. ``SELECT id FROM task_queue ...`` -> ``fetchall()`` yields the
           pending ids as 1-tuples, ordered by position.
        2. ``UPDATE task_queue SET position ...`` -> params ``(position, id)``;
           updates the stored position, silently ignoring unknown ids.
        """

        def handle(cur, sql, params):
            if "SELECT id FROM task_queue" in sql:
                cur.fetchall.return_value = [
                    (row["id"],) for row in self.pending_tasks
                ]
            elif "UPDATE task_queue SET position" in sql:
                pos, tid = params
                if tid in self.rows:
                    self.rows[tid]["position"] = pos

        return self._connection_for(handle)

    def mock_list_pending_connection(self):
        """Build a mock connection for listing pending tasks.

        Any executed statement makes ``fetchall()`` yield every pending row,
        ordered by position, as 10-column tuples.
        """

        def handle(cur, sql, params):
            cur.fetchall.return_value = [
                self._to_db_tuple(row) for row in self.pending_tasks
            ]

        return self._connection_for(handle)
# ---------------------------------------------------------------------------
# Feature: admin-web-console, Property 8: 队列 CRUD 不变量
# **Validates: Requirements 4.1, 4.4**
# ---------------------------------------------------------------------------
@settings(max_examples=100)
@given(
    config=_simple_config_st,
    site_id=_site_id_st,
    initial_count=st.integers(min_value=0, max_value=5),
)
@patch("app.services.task_queue.get_connection")
def test_queue_crud_invariant(mock_get_conn, config, site_id, initial_count):
    """Property 8: queue CRUD invariant.

    Enqueuing a task grows the pending queue by exactly one and the new task
    is pending; deleting that pending task shrinks the queue back by one and
    the task no longer appears in it.
    """
    store = InMemoryQueueDB(site_id)
    task_queue = TaskQueue()

    # Seed the queue with `initial_count` pending tasks at positions 1..n.
    for position in range(1, initial_count + 1):
        seeded_id = str(uuid.uuid4())
        store.rows[seeded_id] = {
            "id": seeded_id,
            "site_id": site_id,
            "config": {"tasks": ["ODS_MEMBER"], "flow": "api_ods"},
            "status": "pending",
            "position": position,
        }
    baseline = len(store.pending_tasks)

    # --- enqueue ---
    mock_get_conn.return_value = store.mock_enqueue_connection()
    new_id = task_queue.enqueue(config, site_id)
    size_after_enqueue = len(store.pending_tasks)
    assert size_after_enqueue == baseline + 1, (
        f"入队后长度应 +1期望 {baseline + 1},实际 {size_after_enqueue}"
    )
    assert new_id in store.rows, "新任务应存在于队列中"
    assert store.rows[new_id]["status"] == "pending", "新任务状态应为 pending"

    # --- delete the task that was just enqueued ---
    mock_get_conn.return_value = store.mock_delete_connection(new_id)
    deleted = task_queue.delete(new_id, site_id)
    size_after_delete = len(store.pending_tasks)
    assert deleted is True, "删除 pending 任务应返回 True"
    assert size_after_delete == baseline, (
        f"删除后长度应恢复:期望 {baseline},实际 {size_after_delete}"
    )
    assert new_id not in store.rows, "已删除任务不应出现在队列中"
# ---------------------------------------------------------------------------
# Feature: admin-web-console, Property 9: 队列出队顺序
# **Validates: Requirements 4.2**
# ---------------------------------------------------------------------------
@settings(max_examples=100)
@given(
    site_id=_site_id_st,
    num_tasks=st.integers(min_value=1, max_value=8),
    positions=st.data(),
)
@patch("app.services.task_queue.get_connection")
def test_queue_dequeue_order(mock_get_conn, site_id, num_tasks, positions):
    """Property 9: dequeue order.

    For a queue holding several pending tasks, dequeue must return the task
    with the smallest position value.
    """
    task_queue = TaskQueue()
    store = InMemoryQueueDB(site_id)

    # Draw exactly `num_tasks` distinct position values.
    distinct_positions = positions.draw(
        st.lists(
            st.integers(min_value=1, max_value=1000),
            min_size=num_tasks,
            max_size=num_tasks,
            unique=True,
        )
    )

    # Populate the queue, cycling through the known task codes.
    for idx, position in enumerate(distinct_positions):
        task_id = str(uuid.uuid4())
        code = _task_codes[idx % len(_task_codes)]
        store.rows[task_id] = {
            "id": task_id,
            "site_id": site_id,
            "config": {"tasks": [code], "flow": "api_ods"},
            "status": "pending",
            "position": position,
        }

    # Positions are unique and pending_tasks is sorted ascending, so the head
    # of the list is the task with the minimum position.
    head = store.pending_tasks[0]

    mock_get_conn.return_value = store.mock_dequeue_connection()
    result = task_queue.dequeue(site_id)

    assert result is not None, "队列非空时 dequeue 不应返回 None"
    assert result.id == head["id"], (
        f"应返回 position 最小的任务:期望 id={head['id']} "
        f"(pos={head['position']}),实际 id={result.id}"
    )
# ---------------------------------------------------------------------------
# Feature: admin-web-console, Property 10: 队列重排一致性
# **Validates: Requirements 4.3**
# ---------------------------------------------------------------------------
@settings(max_examples=100)
@given(
    site_id=_site_id_st,
    num_tasks=st.integers(min_value=2, max_value=6),
    data=st.data(),
)
@patch("app.services.task_queue.get_connection")
def test_queue_reorder_consistency(mock_get_conn, site_id, num_tasks, data):
    """Property 10: reorder consistency.

    After moving one task to a new position:
    - the moved task sits at the target position (clamped to the valid range),
    - the remaining tasks keep their original relative order,
    - no task is lost from the queue.
    """
    task_queue = TaskQueue()
    store = InMemoryQueueDB(site_id)

    # Populate the queue with consecutive positions starting at 1.
    ordered_ids = [str(uuid.uuid4()) for _ in range(num_tasks)]
    for position, task_id in enumerate(ordered_ids, start=1):
        store.rows[task_id] = {
            "id": task_id,
            "site_id": site_id,
            "config": {"tasks": ["ODS_MEMBER"], "flow": "api_ods"},
            "status": "pending",
            "position": position,
        }

    # Pick which task to move and where; the target may exceed the queue
    # length so that clamping is exercised.
    chosen_index = data.draw(st.integers(min_value=0, max_value=num_tasks - 1))
    moved_id = ordered_ids[chosen_index]
    new_position = data.draw(st.integers(min_value=1, max_value=num_tasks + 2))

    mock_get_conn.return_value = store.mock_reorder_connection()
    task_queue.reorder(moved_id, new_position, site_id)

    pending_after = store.pending_tasks

    # Check: every task is still pending in the queue.
    surviving_ids = {row["id"] for row in pending_after}
    assert surviving_ids == set(ordered_ids), "重排后不应丢失任何任务"

    # Check: positions are unique and consecutive, 1-based.
    positions = [row["position"] for row in pending_after]
    assert positions == list(range(1, num_tasks + 1)), (
        f"重排后 position 应为连续编号 1..{num_tasks},实际 {positions}"
    )

    # Check: the moved task landed on the requested position, clamped to
    # the range [1, num_tasks].
    expected_pos = min(max(new_position, 1), num_tasks)
    landed_pos = store.rows[moved_id]["position"]
    assert landed_pos == expected_pos, (
        f"被移动任务的 position 应为 {expected_pos}clamp 后),实际 {landed_pos}"
    )

    # Check: all other tasks keep their original relative order.
    expected_rest = [tid for tid in ordered_ids if tid != moved_id]
    actual_rest = [row["id"] for row in pending_after if row["id"] != moved_id]
    assert actual_rest == expected_rest, (
        "其余任务的相对顺序应保持不变"
    )
# ---------------------------------------------------------------------------
# Feature: admin-web-console, Property 11: 执行历史排序与限制
# **Validates: Requirements 4.5, 8.2**
# ---------------------------------------------------------------------------
# 导入 FastAPI 测试客户端
from app.auth.dependencies import CurrentUser, get_current_user
from app.main import app
from fastapi.testclient import TestClient
def _make_history_rows(count: int, site_id: int) -> list[tuple]:
"""生成 count 条执行历史记录started_at 随机但可排序。"""
base_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
rows = []
for i in range(count):
rows.append((
str(uuid.uuid4()), # id
site_id, # site_id
["ODS_MEMBER"], # task_codes
"success", # status
base_time + timedelta(hours=i), # started_at
base_time + timedelta(hours=i, minutes=30), # finished_at
0, # exit_code
1800000, # duration_ms
"python -m cli.main", # command
None, # summary
))
return rows
@settings(max_examples=100, deadline=None)
@given(
    site_id=_site_id_st,
    total_records=st.integers(min_value=0, max_value=30),
    limit=st.integers(min_value=1, max_value=200),
)
@patch("app.routers.execution.get_connection")
def test_execution_history_sort_and_limit(mock_get_conn, site_id, total_records, limit):
    """Property 11: execution history ordering and limit.

    For any set of execution-history records, the API must return results in
    descending ``started_at`` order and never more than the requested limit.

    The authentication dependency is overridden for the duration of the
    request; whatever override (or absence of one) existed beforehand is
    restored afterwards so this test does not leak state into others.
    """
    # Generate the test data.
    all_rows = _make_history_rows(total_records, site_id)

    # Simulate the database: order by started_at DESC, then apply the limit.
    sorted_rows = sorted(all_rows, key=lambda r: r[4], reverse=True)
    returned_rows = sorted_rows[:limit]

    # Mock connection whose cursor context manager yields our canned rows.
    mock_cursor = MagicMock()
    mock_cursor.fetchall.return_value = returned_rows
    mock_conn = MagicMock()
    mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    mock_get_conn.return_value = mock_conn

    # Override the auth dependency, remembering the prior state so it can be
    # restored exactly (a sentinel distinguishes "no override" from any value).
    test_user = CurrentUser(user_id=1, site_id=site_id)
    no_override = object()
    previous_override = app.dependency_overrides.get(get_current_user, no_override)
    app.dependency_overrides[get_current_user] = lambda: test_user
    try:
        client = TestClient(app)
        # The request uses a limit clamped to the range [1, 200].
        clamped_limit = max(1, min(limit, 200))
        resp = client.get(f"/api/execution/history?limit={clamped_limit}")
        assert resp.status_code == 200
        data = resp.json()

        # Check 1: no more results than the limit.
        assert len(data) <= clamped_limit, (
            f"结果数量 {len(data)} 超过 limit {clamped_limit}"
        )
        # Check 2: no more results than records that exist.
        assert len(data) <= total_records, (
            f"结果数量 {len(data)} 超过总记录数 {total_records}"
        )
        # Check 3: descending started_at order (adjacent pairs, compared as
        # the serialized strings the API returned).
        for i in range(len(data) - 1):
            t1 = data[i]["started_at"]
            t2 = data[i + 1]["started_at"]
            assert t1 >= t2, (
                f"结果未按 started_at 降序排列data[{i}]={t1} < data[{i+1}]={t2}"
            )
    finally:
        if previous_override is no_override:
            app.dependency_overrides.pop(get_current_user, None)
        else:
            app.dependency_overrides[get_current_user] = previous_override