Files
Neo-ZQYY/tests/test_property_rls_site_id.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

211 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
RLS 按 site_id 隔离属性测试
**Validates: Requirements 13.2**
Property 11: 对于任意 app schema 中启用了 RLS 的视图,当会话变量
`app.current_site_id` 设置为某个门店 ID 时,查询结果应仅包含该
`site_id` 的数据行。
实现方式:基于 DDL 文件的静态分析(不需要实际数据库连接)
- 解析 app.sql 中所有 ENABLE ROW LEVEL SECURITY 的表
- 解析所有 CREATE POLICY 语句
- 验证每个启用 RLS 的表都有包含 site_id 过滤的策略
- 验证策略的 USING 子句使用 current_setting('app.current_site_id') 模式
"""
import os
import re
import pytest
from hypothesis import given, settings, assume
from hypothesis.strategies import sampled_from, integers
# ── 路径常量 ──────────────────────────────────────────────────────
SCHEMAS_DIR = os.path.join(r"C:\NeoZQYY", "db", "etl_feiqiu", "schemas")
APP_SQL = os.path.join(SCHEMAS_DIR, "app.sql")
# DDL 基线文件在 ETL schema 重构后已删除,跳过整个模块
if not os.path.exists(APP_SQL):
pytest.skip(
"DDL 基线文件 app.sql 不存在ETL schema 重构后已删除)",
allow_module_level=True,
)
# ── 解析工具 ──────────────────────────────────────────────────────
def _read_app_sql() -> str:
"""读取 app.sql 文件内容。"""
with open(APP_SQL, encoding="utf-8") as f:
return f.read()
# 匹配 ALTER TABLE [schema.]table_name ENABLE ROW LEVEL SECURITY
_ENABLE_RLS_RE = re.compile(
r"ALTER\s+TABLE\s+([\w]+\.[\w]+)\s+ENABLE\s+ROW\s+LEVEL\s+SECURITY",
re.IGNORECASE,
)
# 匹配 CREATE POLICY policy_name ON [schema.]table_name ... USING (...)
# 捕获策略名、表全名、USING 子句内容
_CREATE_POLICY_RE = re.compile(
r"CREATE\s+POLICY\s+(\w+)\s+ON\s+([\w]+\.[\w]+)"
r".*?USING\s*\((.+?)\)\s*;",
re.IGNORECASE | re.DOTALL,
)
# 匹配 USING 子句中的 current_setting('app.current_site_id') 模式
_SITE_ID_FILTER_RE = re.compile(
r"current_setting\s*\(\s*'app\.current_site_id'\s*\)",
re.IGNORECASE,
)
# 匹配 USING 子句中的 site_id 相关字段site_id 或 register_site_id 等)
_SITE_ID_FIELD_RE = re.compile(
r"\b\w*site_id\b",
re.IGNORECASE,
)
def _parse_rls_enabled_tables(content: str) -> list[str]:
"""提取所有启用了 RLS 的表schema.table 格式)。"""
return [m.group(1).lower() for m in _ENABLE_RLS_RE.finditer(content)]
def _parse_policies(content: str) -> list[dict]:
"""
提取所有 CREATE POLICY 语句。
返回 [{"name": ..., "table": ..., "using_clause": ...}, ...]
"""
policies = []
for m in _CREATE_POLICY_RE.finditer(content):
policies.append({
"name": m.group(1).lower(),
"table": m.group(2).lower(),
"using_clause": m.group(3).strip(),
})
return policies
# ── 预加载(模块级,只解析一次) ──────────────────────────────────
_content = _read_app_sql()
RLS_TABLES = _parse_rls_enabled_tables(_content)
POLICIES = _parse_policies(_content)
# 构建 table -> [policy] 映射
POLICY_MAP: dict[str, list[dict]] = {}
for p in POLICIES:
POLICY_MAP.setdefault(p["table"], []).append(p)
assert len(RLS_TABLES) > 0, "未找到任何启用 RLS 的表,请检查 app.sql"
assert len(POLICIES) > 0, "未找到任何 CREATE POLICY 语句,请检查 app.sql"
# ── 属性测试 ──────────────────────────────────────────────────────
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_table_has_site_isolation_policy(table: str):
"""
Property 11子属性 A每个启用 RLS 的表都有对应的隔离策略。
对于任意启用了 RLS 的表,应存在至少一条 CREATE POLICY 语句。
**Validates: Requirements 13.2**
"""
assert table in POLICY_MAP, (
f"{table} 启用了 RLS 但没有对应的 CREATE POLICY 语句。"
f"Requirements 13.2 要求所有启用 RLS 的表都有隔离策略。"
)
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_policy_uses_current_site_id_setting(table: str):
"""
Property 11子属性 BRLS 策略使用 app.current_site_id 会话变量过滤。
对于任意启用了 RLS 的表,其策略的 USING 子句应包含
current_setting('app.current_site_id') 模式,确保按门店 ID 隔离。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
has_site_filter = any(
_SITE_ID_FILTER_RE.search(p["using_clause"])
for p in policies
)
assert has_site_filter, (
f"{table} 的 RLS 策略未使用 current_setting('app.current_site_id') 过滤。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
f"Requirements 13.2 要求根据会话变量 app.current_site_id 自动过滤。"
)
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_policy_filters_by_site_id_field(table: str):
"""
Property 11子属性 CRLS 策略的 USING 子句包含 site_id 相关字段。
对于任意启用了 RLS 的表,其策略的 USING 子句应引用 site_id
或 register_site_id 等 site_id 相关字段。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
has_site_id_field = any(
_SITE_ID_FIELD_RE.search(p["using_clause"])
for p in policies
)
assert has_site_id_field, (
f"{table} 的 RLS 策略 USING 子句中未引用 site_id 相关字段。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
f"Requirements 13.2 要求按 site_id 隔离数据。"
)
@given(
table=sampled_from(RLS_TABLES),
site_id=integers(min_value=1, max_value=10**15),
)
@settings(max_examples=100)
def test_rls_policy_using_clause_pattern_valid_for_any_site_id(
table: str, site_id: int
):
"""
Property 11子属性 DRLS 策略的 USING 子句模式对任意 site_id 值有效。
对于任意启用了 RLS 的表和任意正整数 site_id策略的 USING 子句
应为 `<field> = current_setting('app.current_site_id')::<type>` 的等值比较模式,
确保设置任意 site_id 值时都能正确过滤。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
# 验证策略使用等值比较模式field = current_setting(...)::type
# \w* 允许匹配 site_id无前缀和 register_site_id有前缀
equality_pattern = re.compile(
r"\w*site_id\s*=\s*current_setting\s*\(\s*'app\.current_site_id'\s*\)\s*::\s*\w+",
re.IGNORECASE,
)
has_equality = any(
equality_pattern.search(p["using_clause"])
for p in policies
)
assert has_equality, (
f"{table} 的 RLS 策略未使用等值比较模式 "
f"(field = current_setting('app.current_site_id')::type)。"
f"对于 site_id={site_id},无法保证正确过滤。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
)