# -*- coding: utf-8 -*- """数据库查看器 API 提供 4 个端点: - GET /api/db/schemas — 返回 Schema 列表 - GET /api/db/schemas/{name}/tables — 返回表列表和行数 - GET /api/db/tables/{schema}/{table}/columns — 返回列定义 - POST /api/db/query — 只读 SQL 执行 所有端点需要 JWT 认证。 使用 get_etl_readonly_connection(site_id) 确保 RLS 隔离。 """ from __future__ import annotations import logging import re from fastapi import APIRouter, Depends, HTTPException, status from psycopg2 import errors as pg_errors, OperationalError from app.auth.dependencies import CurrentUser, get_current_user from app.database import get_etl_readonly_connection from app.schemas.db_viewer import ( ColumnInfo, QueryRequest, QueryResponse, SchemaInfo, TableInfo, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/db", tags=["数据库查看器"]) # P0-8 修复(2026-05-04 Wave 1):改为白名单 + 黑名单双保险 # 白名单:SQL 必须以这些关键词开头(去注释/空白后) _ALLOWED_PREFIXES = ("SELECT", "WITH", "EXPLAIN", "SHOW") # 黑名单:深度防御,即使开头通过,语句中包含这些关键词也拒绝 # 涵盖 DML / DDL / DCL,补全原仅 5 个关键词的漏洞 _DENY_KEYWORDS = re.compile( r"\b(INSERT|UPDATE|DELETE|DROP|TRUNCATE|ALTER|CREATE|GRANT|REVOKE|COPY|CALL|COMMENT|VACUUM|REINDEX|CLUSTER|REFRESH|LOCK)\b", re.IGNORECASE, ) def _strip_comments(sql: str) -> str: """剥离 SQL 中的 -- 行注释 与 /* */ 块注释,避免黑名单误匹配注释里的英文词。""" # 块注释 /* ... */ (非贪婪) sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL) # 行注释 -- 直到行尾 sql = re.sub(r"--[^\n]*", " ", sql) return sql def _extract_first_keyword(sql: str) -> str: """提取 SQL 第一个有效关键词(去掉前导注释 + 空白)。 支持 -- 行注释 和 /* */ 块注释。返回大写关键词,无关键词返回空字符串。 """ s = _strip_comments(sql).strip() m = re.match(r"\s*([A-Za-z_]+)", s) return m.group(1).upper() if m else "" # 查询结果行数上限 _MAX_ROWS = 1000 # 查询超时(秒) _QUERY_TIMEOUT_SEC = 30 # ── GET /api/db/schemas ────────────────────────────────────── @router.get("/schemas", response_model=list[SchemaInfo]) async def list_schemas( user: CurrentUser = Depends(get_current_user), ) -> list[SchemaInfo]: """返回 ETL 数据库中的 Schema 列表。""" conn = get_etl_readonly_connection(user.site_id) try: with conn.cursor() as cur: cur.execute( """ SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast') ORDER BY schema_name """ ) rows = cur.fetchall() return [SchemaInfo(name=row[0]) for row in rows] finally: conn.close() # ── GET /api/db/schemas/{name}/tables ──────────────────────── @router.get("/schemas/{name}/tables", response_model=list[TableInfo]) async def list_tables( name: str, user: CurrentUser = Depends(get_current_user), ) -> list[TableInfo]: """返回指定 Schema 下所有表的名称和行数统计。""" conn = get_etl_readonly_connection(user.site_id) try: with conn.cursor() as cur: cur.execute( """ SELECT t.table_name, s.n_live_tup FROM information_schema.tables t LEFT JOIN pg_stat_user_tables s ON s.schemaname = t.table_schema AND s.relname = t.table_name WHERE t.table_schema = %s AND t.table_type = 'BASE TABLE' ORDER BY t.table_name """, (name,), ) rows = cur.fetchall() return [ TableInfo(name=row[0], row_count=row[1]) for row in rows ] finally: conn.close() # ── GET /api/db/tables/{schema}/{table}/columns ────────────── @router.get( "/tables/{schema}/{table}/columns", response_model=list[ColumnInfo], ) async def list_columns( schema: str, table: str, user: CurrentUser = Depends(get_current_user), ) -> list[ColumnInfo]: """返回指定表的列定义。""" conn = get_etl_readonly_connection(user.site_id) try: with conn.cursor() as cur: cur.execute( """ SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position """, (schema, table), ) rows = cur.fetchall() return [ ColumnInfo( name=row[0], data_type=row[1], is_nullable=row[2] == "YES", column_default=row[3], ) for row in rows ] finally: conn.close() # ── POST /api/db/query ─────────────────────────────────────── @router.post("/query", response_model=QueryResponse) async def execute_query( body: QueryRequest, user: CurrentUser = Depends(get_current_user), ) -> QueryResponse: """只读 SQL 执行。 安全措施(P0-8 修复 2026-05-04): 1. 白名单:SQL 必须以 SELECT / WITH / EXPLAIN / SHOW 开头(去注释/空白后) 2. 黑名单:深度防御,语句含 DML/DDL/DCL 关键词一律拒绝(覆盖 INSERT/UPDATE/DELETE/DROP/ TRUNCATE/ALTER/CREATE/GRANT/REVOKE/COPY/CALL/COMMENT/VACUUM/REINDEX/CLUSTER/REFRESH/LOCK) 3. 限制返回行数上限 1000 行 4. 设置查询超时 30 秒 5. 数据库连接为只读账号(get_etl_readonly_connection,默认 read-only 事务) """ sql = body.sql.strip() if not sql: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="SQL 语句不能为空", ) # 白名单:首关键词校验 first_kw = _extract_first_keyword(sql) if first_kw not in _ALLOWED_PREFIXES: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"只允许只读查询,SQL 必须以 {' / '.join(_ALLOWED_PREFIXES)} 开头", ) # 黑名单:深度防御(剥离注释后再匹配,避免注释中"comment"等英文词误伤) if _DENY_KEYWORDS.search(_strip_comments(sql)): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="检测到禁止的关键词(DML/DDL/DCL),只允许只读查询", ) conn = get_etl_readonly_connection(user.site_id) try: with conn.cursor() as cur: # 设置查询超时 cur.execute( "SET LOCAL statement_timeout = %s", (f"{_QUERY_TIMEOUT_SEC}s",), ) try: cur.execute(sql) except pg_errors.QueryCanceled: raise HTTPException( status_code=status.HTTP_408_REQUEST_TIMEOUT, detail=f"查询超时(超过 {_QUERY_TIMEOUT_SEC} 秒)", ) except Exception as exc: # SQL 语法错误或其他执行错误 raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"SQL 执行错误: {exc}", ) # 提取列名 columns = ( [desc[0] for desc in cur.description] if cur.description else [] ) # 限制返回行数 rows = cur.fetchmany(_MAX_ROWS) # 将元组转为列表,便于 JSON 序列化 rows_list = [list(row) for row in rows] return QueryResponse( columns=columns, rows=rows_list, row_count=len(rows_list), ) except HTTPException: raise except OperationalError as exc: # 连接级错误 logger.error("数据库查看器连接错误: %s", exc) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="数据库连接错误", ) finally: conn.close()