# -*- coding: utf-8 -*- """ RLS 按 site_id 隔离属性测试 **Validates: Requirements 13.2** Property 11: 对于任意 app schema 中启用了 RLS 的视图,当会话变量 `app.current_site_id` 设置为某个门店 ID 时,查询结果应仅包含该 `site_id` 的数据行。 实现方式:基于 DDL 文件的静态分析(不需要实际数据库连接) - 解析 app.sql 中所有 ENABLE ROW LEVEL SECURITY 的表 - 解析所有 CREATE POLICY 语句 - 验证每个启用 RLS 的表都有包含 site_id 过滤的策略 - 验证策略的 USING 子句使用 current_setting('app.current_site_id') 模式 """ import os import re from hypothesis import given, settings, assume from hypothesis.strategies import sampled_from, integers # ── 路径常量 ────────────────────────────────────────────────────── SCHEMAS_DIR = os.path.join(r"C:\NeoZQYY", "db", "etl_feiqiu", "schemas") APP_SQL = os.path.join(SCHEMAS_DIR, "app.sql") # ── 解析工具 ────────────────────────────────────────────────────── def _read_app_sql() -> str: """读取 app.sql 文件内容。""" with open(APP_SQL, encoding="utf-8") as f: return f.read() # 匹配 ALTER TABLE [schema.]table_name ENABLE ROW LEVEL SECURITY _ENABLE_RLS_RE = re.compile( r"ALTER\s+TABLE\s+([\w]+\.[\w]+)\s+ENABLE\s+ROW\s+LEVEL\s+SECURITY", re.IGNORECASE, ) # 匹配 CREATE POLICY policy_name ON [schema.]table_name ... USING (...) # 捕获:策略名、表全名、USING 子句内容 _CREATE_POLICY_RE = re.compile( r"CREATE\s+POLICY\s+(\w+)\s+ON\s+([\w]+\.[\w]+)" r".*?USING\s*\((.+?)\)\s*;", re.IGNORECASE | re.DOTALL, ) # 匹配 USING 子句中的 current_setting('app.current_site_id') 模式 _SITE_ID_FILTER_RE = re.compile( r"current_setting\s*\(\s*'app\.current_site_id'\s*\)", re.IGNORECASE, ) # 匹配 USING 子句中的 site_id 相关字段(site_id 或 register_site_id 等) _SITE_ID_FIELD_RE = re.compile( r"\b\w*site_id\b", re.IGNORECASE, ) def _parse_rls_enabled_tables(content: str) -> list[str]: """提取所有启用了 RLS 的表(schema.table 格式)。""" return [m.group(1).lower() for m in _ENABLE_RLS_RE.finditer(content)] def _parse_policies(content: str) -> list[dict]: """ 提取所有 CREATE POLICY 语句。 返回 [{"name": ..., "table": ..., "using_clause": ...}, ...] """ policies = [] for m in _CREATE_POLICY_RE.finditer(content): policies.append({ "name": m.group(1).lower(), "table": m.group(2).lower(), "using_clause": m.group(3).strip(), }) return policies # ── 预加载(模块级,只解析一次) ────────────────────────────────── _content = _read_app_sql() RLS_TABLES = _parse_rls_enabled_tables(_content) POLICIES = _parse_policies(_content) # 构建 table -> [policy] 映射 POLICY_MAP: dict[str, list[dict]] = {} for p in POLICIES: POLICY_MAP.setdefault(p["table"], []).append(p) assert len(RLS_TABLES) > 0, "未找到任何启用 RLS 的表,请检查 app.sql" assert len(POLICIES) > 0, "未找到任何 CREATE POLICY 语句,请检查 app.sql" # ── 属性测试 ────────────────────────────────────────────────────── @given(table=sampled_from(RLS_TABLES)) @settings(max_examples=100) def test_rls_table_has_site_isolation_policy(table: str): """ Property 11(子属性 A):每个启用 RLS 的表都有对应的隔离策略。 对于任意启用了 RLS 的表,应存在至少一条 CREATE POLICY 语句。 **Validates: Requirements 13.2** """ assert table in POLICY_MAP, ( f"表 {table} 启用了 RLS 但没有对应的 CREATE POLICY 语句。" f"Requirements 13.2 要求所有启用 RLS 的表都有隔离策略。" ) @given(table=sampled_from(RLS_TABLES)) @settings(max_examples=100) def test_rls_policy_uses_current_site_id_setting(table: str): """ Property 11(子属性 B):RLS 策略使用 app.current_site_id 会话变量过滤。 对于任意启用了 RLS 的表,其策略的 USING 子句应包含 current_setting('app.current_site_id') 模式,确保按门店 ID 隔离。 **Validates: Requirements 13.2** """ assume(table in POLICY_MAP) policies = POLICY_MAP[table] has_site_filter = any( _SITE_ID_FILTER_RE.search(p["using_clause"]) for p in policies ) assert has_site_filter, ( f"表 {table} 的 RLS 策略未使用 current_setting('app.current_site_id') 过滤。" f"策略 USING 子句: {[p['using_clause'] for p in policies]}。" f"Requirements 13.2 要求根据会话变量 app.current_site_id 自动过滤。" ) @given(table=sampled_from(RLS_TABLES)) @settings(max_examples=100) def test_rls_policy_filters_by_site_id_field(table: str): """ Property 11(子属性 C):RLS 策略的 USING 子句包含 site_id 相关字段。 对于任意启用了 RLS 的表,其策略的 USING 子句应引用 site_id 或 register_site_id 等 site_id 相关字段。 **Validates: Requirements 13.2** """ assume(table in POLICY_MAP) policies = POLICY_MAP[table] has_site_id_field = any( _SITE_ID_FIELD_RE.search(p["using_clause"]) for p in policies ) assert has_site_id_field, ( f"表 {table} 的 RLS 策略 USING 子句中未引用 site_id 相关字段。" f"策略 USING 子句: {[p['using_clause'] for p in policies]}。" f"Requirements 13.2 要求按 site_id 隔离数据。" ) @given( table=sampled_from(RLS_TABLES), site_id=integers(min_value=1, max_value=10**15), ) @settings(max_examples=100) def test_rls_policy_using_clause_pattern_valid_for_any_site_id( table: str, site_id: int ): """ Property 11(子属性 D):RLS 策略的 USING 子句模式对任意 site_id 值有效。 对于任意启用了 RLS 的表和任意正整数 site_id,策略的 USING 子句 应为 ` = current_setting('app.current_site_id')::` 的等值比较模式, 确保设置任意 site_id 值时都能正确过滤。 **Validates: Requirements 13.2** """ assume(table in POLICY_MAP) policies = POLICY_MAP[table] # 验证策略使用等值比较模式:field = current_setting(...)::type # \w* 允许匹配 site_id(无前缀)和 register_site_id(有前缀) equality_pattern = re.compile( r"\w*site_id\s*=\s*current_setting\s*\(\s*'app\.current_site_id'\s*\)\s*::\s*\w+", re.IGNORECASE, ) has_equality = any( equality_pattern.search(p["using_clause"]) for p in policies ) assert has_equality, ( f"表 {table} 的 RLS 策略未使用等值比较模式 " f"(field = current_setting('app.current_site_id')::type)。" f"对于 site_id={site_id},无法保证正确过滤。" f"策略 USING 子句: {[p['using_clause'] for p in policies]}" )