Files
Neo-ZQYY/tests/test_property_rls_site_id.py

203 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
RLS 按 site_id 隔离属性测试
**Validates: Requirements 13.2**
Property 11: 对于任意 app schema 中启用了 RLS 的视图,当会话变量
`app.current_site_id` 设置为某个门店 ID 时,查询结果应仅包含该
`site_id` 的数据行。
实现方式:基于 DDL 文件的静态分析(不需要实际数据库连接)
- 解析 app.sql 中所有 ENABLE ROW LEVEL SECURITY 的表
- 解析所有 CREATE POLICY 语句
- 验证每个启用 RLS 的表都有包含 site_id 过滤的策略
- 验证策略的 USING 子句使用 current_setting('app.current_site_id') 模式
"""
import os
import re
from hypothesis import given, settings, assume
from hypothesis.strategies import sampled_from, integers
# ── 路径常量 ──────────────────────────────────────────────────────
SCHEMAS_DIR = os.path.join(r"C:\NeoZQYY", "db", "etl_feiqiu", "schemas")
APP_SQL = os.path.join(SCHEMAS_DIR, "app.sql")
# ── 解析工具 ──────────────────────────────────────────────────────
def _read_app_sql() -> str:
"""读取 app.sql 文件内容。"""
with open(APP_SQL, encoding="utf-8") as f:
return f.read()
# 匹配 ALTER TABLE [schema.]table_name ENABLE ROW LEVEL SECURITY
_ENABLE_RLS_RE = re.compile(
r"ALTER\s+TABLE\s+([\w]+\.[\w]+)\s+ENABLE\s+ROW\s+LEVEL\s+SECURITY",
re.IGNORECASE,
)
# 匹配 CREATE POLICY policy_name ON [schema.]table_name ... USING (...)
# 捕获策略名、表全名、USING 子句内容
_CREATE_POLICY_RE = re.compile(
r"CREATE\s+POLICY\s+(\w+)\s+ON\s+([\w]+\.[\w]+)"
r".*?USING\s*\((.+?)\)\s*;",
re.IGNORECASE | re.DOTALL,
)
# 匹配 USING 子句中的 current_setting('app.current_site_id') 模式
_SITE_ID_FILTER_RE = re.compile(
r"current_setting\s*\(\s*'app\.current_site_id'\s*\)",
re.IGNORECASE,
)
# 匹配 USING 子句中的 site_id 相关字段site_id 或 register_site_id 等)
_SITE_ID_FIELD_RE = re.compile(
r"\b\w*site_id\b",
re.IGNORECASE,
)
def _parse_rls_enabled_tables(content: str) -> list[str]:
"""提取所有启用了 RLS 的表schema.table 格式)。"""
return [m.group(1).lower() for m in _ENABLE_RLS_RE.finditer(content)]
def _parse_policies(content: str) -> list[dict]:
"""
提取所有 CREATE POLICY 语句。
返回 [{"name": ..., "table": ..., "using_clause": ...}, ...]
"""
policies = []
for m in _CREATE_POLICY_RE.finditer(content):
policies.append({
"name": m.group(1).lower(),
"table": m.group(2).lower(),
"using_clause": m.group(3).strip(),
})
return policies
# ── 预加载(模块级,只解析一次) ──────────────────────────────────
_content = _read_app_sql()
RLS_TABLES = _parse_rls_enabled_tables(_content)
POLICIES = _parse_policies(_content)
# 构建 table -> [policy] 映射
POLICY_MAP: dict[str, list[dict]] = {}
for p in POLICIES:
POLICY_MAP.setdefault(p["table"], []).append(p)
assert len(RLS_TABLES) > 0, "未找到任何启用 RLS 的表,请检查 app.sql"
assert len(POLICIES) > 0, "未找到任何 CREATE POLICY 语句,请检查 app.sql"
# ── 属性测试 ──────────────────────────────────────────────────────
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_table_has_site_isolation_policy(table: str):
"""
Property 11子属性 A每个启用 RLS 的表都有对应的隔离策略。
对于任意启用了 RLS 的表,应存在至少一条 CREATE POLICY 语句。
**Validates: Requirements 13.2**
"""
assert table in POLICY_MAP, (
f"{table} 启用了 RLS 但没有对应的 CREATE POLICY 语句。"
f"Requirements 13.2 要求所有启用 RLS 的表都有隔离策略。"
)
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_policy_uses_current_site_id_setting(table: str):
"""
Property 11子属性 BRLS 策略使用 app.current_site_id 会话变量过滤。
对于任意启用了 RLS 的表,其策略的 USING 子句应包含
current_setting('app.current_site_id') 模式,确保按门店 ID 隔离。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
has_site_filter = any(
_SITE_ID_FILTER_RE.search(p["using_clause"])
for p in policies
)
assert has_site_filter, (
f"{table} 的 RLS 策略未使用 current_setting('app.current_site_id') 过滤。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
f"Requirements 13.2 要求根据会话变量 app.current_site_id 自动过滤。"
)
@given(table=sampled_from(RLS_TABLES))
@settings(max_examples=100)
def test_rls_policy_filters_by_site_id_field(table: str):
"""
Property 11子属性 CRLS 策略的 USING 子句包含 site_id 相关字段。
对于任意启用了 RLS 的表,其策略的 USING 子句应引用 site_id
或 register_site_id 等 site_id 相关字段。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
has_site_id_field = any(
_SITE_ID_FIELD_RE.search(p["using_clause"])
for p in policies
)
assert has_site_id_field, (
f"{table} 的 RLS 策略 USING 子句中未引用 site_id 相关字段。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
f"Requirements 13.2 要求按 site_id 隔离数据。"
)
@given(
table=sampled_from(RLS_TABLES),
site_id=integers(min_value=1, max_value=10**15),
)
@settings(max_examples=100)
def test_rls_policy_using_clause_pattern_valid_for_any_site_id(
table: str, site_id: int
):
"""
Property 11子属性 DRLS 策略的 USING 子句模式对任意 site_id 值有效。
对于任意启用了 RLS 的表和任意正整数 site_id策略的 USING 子句
应为 `<field> = current_setting('app.current_site_id')::<type>` 的等值比较模式,
确保设置任意 site_id 值时都能正确过滤。
**Validates: Requirements 13.2**
"""
assume(table in POLICY_MAP)
policies = POLICY_MAP[table]
# 验证策略使用等值比较模式field = current_setting(...)::type
# \w* 允许匹配 site_id无前缀和 register_site_id有前缀
equality_pattern = re.compile(
r"\w*site_id\s*=\s*current_setting\s*\(\s*'app\.current_site_id'\s*\)\s*::\s*\w+",
re.IGNORECASE,
)
has_equality = any(
equality_pattern.search(p["using_clause"])
for p in policies
)
assert has_equality, (
f"{table} 的 RLS 策略未使用等值比较模式 "
f"(field = current_setting('app.current_site_id')::type)。"
f"对于 site_id={site_id},无法保证正确过滤。"
f"策略 USING 子句: {[p['using_clause'] for p in policies]}"
)