# -*- coding: utf-8 -*-
"""Business runtime-context management API.

Read endpoints return a site's runtime context. A super-admin endpoint
switches a site between live and sandbox mode. Before it writes the new
context, that endpoint stops in-flight ETL / AI activity.
"""

from __future__ import annotations

import logging
from datetime import date

from fastapi import APIRouter, Depends, HTTPException, Query, status
from psycopg2.extras import RealDictCursor

from app.auth.dependencies import CurrentUser, get_current_user
from app.database import get_connection
from app.schemas.runtime_context import (
    RuntimeContextResponse,
    RuntimeSwitchRequest,
    RuntimeSwitchResponse,
    RuntimeTransitionStep,
)
from app.services.runtime_context import (
    MODE_LIVE,
    MODE_SANDBOX,
    RuntimeContext,
    get_runtime_context,
    new_sandbox_instance_id,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/admin/runtime-context", tags=["业务运行上下文"])
config_router = APIRouter(prefix="/api/config", tags=["业务配置"])


def _require_super_admin(user: CurrentUser) -> None:
    """Raise HTTP 403 unless *user* holds the ``super_admin`` role."""
    if "super_admin" not in user.roles:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="仅超级管理员可切换业务运行上下文",
        )


def _context_response(ctx: RuntimeContext) -> RuntimeContextResponse:
    """Build the API response model from ``ctx.to_dict()``."""
    return RuntimeContextResponse(**ctx.to_dict())


@config_router.get("/runtime-context", response_model=RuntimeContextResponse)
async def get_current_runtime_context(
    user: CurrentUser = Depends(get_current_user),
) -> RuntimeContextResponse:
    """Return the business runtime context of the current user's site."""
    return _context_response(get_runtime_context(user.site_id))


@router.get("", response_model=RuntimeContextResponse)
async def get_admin_runtime_context(
    site_id: int = Query(..., ge=1),
    user: CurrentUser = Depends(get_current_user),
) -> RuntimeContextResponse:
    """Return the business runtime context of *site_id* (super admin only)."""
    _require_super_admin(user)
    return _context_response(get_runtime_context(site_id))


@router.get("/sites")
async def list_runtime_sites(
    user: CurrentUser = Depends(get_current_user),
) -> list[dict]:
    """List every site together with its current runtime context.

    Super admin only. The query uses a LEFT JOIN, so sites without a
    ``biz.site_runtime_context`` row are still returned, with NULL context
    columns. Rows are ordered by ``is_active DESC, site_id``.
    """
    _require_super_admin(user)
    # NOTE(review): get_connection() appears to return a synchronous psycopg2
    # connection. Querying it directly inside an ``async def`` handler blocks
    # the event loop while the query runs. Consider a plain ``def`` route or
    # ``fastapi.concurrency.run_in_threadpool``.
    conn = get_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                """
                SELECT s.site_id, s.site_name, s.site_code, s.is_active,
                       c.mode, c.sandbox_date, c.sandbox_instance_id,
                       c.ai_mode, c.status, c.updated_at
                FROM biz.sites s
                LEFT JOIN biz.site_runtime_context c ON c.site_id = s.site_id
                ORDER BY s.is_active DESC, s.site_id
                """
            )
            rows = cur.fetchall()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
    return [dict(row) for row in rows]


@router.patch("", response_model=RuntimeSwitchResponse)
async def switch_runtime_context(
    body: RuntimeSwitchRequest,
    user: CurrentUser = Depends(get_current_user),
) -> RuntimeSwitchResponse:
    """Switch a site's business runtime context (super admin only).

    Before the switch, running ETL executions are terminated and unfinished
    AI trigger records are cancelled. ``biz.trigger_jobs`` is a global
    scheduling table with no ``site_id`` column, so a single-site sandbox
    switch does not pause it. Multi-site isolation relies entirely on
    runtime_mode + sandbox_instance_id.

    Returns:
        The persisted context plus one ``RuntimeTransitionStep`` per stage
        of the switch.

    Raises:
        HTTPException: 403 for non super admins. 422 when ``sandbox_date`` is
            missing in sandbox mode, set in live mode, or later than today.
    """
    _require_super_admin(user)
    if body.mode == MODE_SANDBOX:
        if body.sandbox_date is None:
            raise HTTPException(status_code=422, detail="沙箱模式必须设置 sandbox_date")
        # NOTE(review): date.today() is the server's local date. Confirm the
        # server timezone matches the business timezone.
        if body.sandbox_date > date.today():
            raise HTTPException(status_code=422, detail="sandbox_date 不能晚于真实今天")
    elif body.mode == MODE_LIVE and body.sandbox_date is not None:
        raise HTTPException(status_code=422, detail="live 模式不能设置 sandbox_date")

    steps: list[RuntimeTransitionStep] = []
    # Stop in-flight activity before the new context is written.
    steps.extend(await _stop_runtime_activity(body.site_id))

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            old_ctx = get_runtime_context(body.site_id, conn=conn)
            # Sandbox mode reuses the existing sandbox instance unless a reset
            # is requested or none exists yet. Live mode stores NULL.
            sandbox_instance_id = None
            if body.mode == MODE_SANDBOX:
                if body.reset_sandbox or not old_ctx.sandbox_instance_id:
                    sandbox_instance_id = new_sandbox_instance_id()
                else:
                    sandbox_instance_id = old_ctx.sandbox_instance_id
            # Upsert one row per site. ai_mode is always written as 'live'
            # and status as 'active'.
            cur.execute(
                """
                INSERT INTO biz.site_runtime_context
                    (site_id, mode, sandbox_date, sandbox_instance_id, ai_mode,
                     status, updated_by, updated_at, reason)
                VALUES (%s, %s, %s, %s, 'live', 'active', %s, NOW(), %s)
                ON CONFLICT (site_id) DO UPDATE SET
                    mode = EXCLUDED.mode,
                    sandbox_date = EXCLUDED.sandbox_date,
                    sandbox_instance_id = EXCLUDED.sandbox_instance_id,
                    ai_mode = EXCLUDED.ai_mode,
                    status = EXCLUDED.status,
                    updated_by = EXCLUDED.updated_by,
                    updated_at = NOW(),
                    reason = EXCLUDED.reason
                """,
                (
                    body.site_id,
                    body.mode,
                    body.sandbox_date,
                    sandbox_instance_id,
                    user.user_id,
                    body.reason,
                ),
            )
            steps.append(RuntimeTransitionStep(
                key="biz_triggers_unchanged",
                title="保持业务触发器",
                status="skipped",
                count=0,
                detail=(
                    "biz.trigger_jobs 为全局调度表(无 site_id 列),单门店沙箱切换不影响其它门店;"
                    "沙箱隔离由 runtime_mode + sandbox_instance_id 在数据写入层完成。"
                ),
            ))
        conn.commit()
    except Exception:
        conn.rollback()
        logger.exception("切换业务运行上下文失败: site_id=%s", body.site_id)
        raise
    finally:
        conn.close()

    # Re-read after commit so the response reflects what was persisted.
    ctx = get_runtime_context(body.site_id)
    steps.append(RuntimeTransitionStep(
        key="apply_context",
        title="写入业务运行上下文",
        status="success",
        detail=(
            f"当前模式={ctx.mode},业务日期={ctx.business_date}"
            + (f",沙箱实例={ctx.sandbox_instance_id}" if ctx.is_sandbox else "")
        ),
    ))
    return RuntimeSwitchResponse(context=_context_response(ctx), steps=steps)


# Site-scoped cancelling UPDATEs used by _stop_runtime_activity. Each takes
# site_id as its only %s parameter. These are regular (non-raw) Python
# literals, so "\n" reaches PostgreSQL as a real newline inside the E'' string.
_CANCEL_TASK_QUEUE_SQL = """
    UPDATE task_queue
    SET status = 'cancelled',
        finished_at = NOW(),
        error_message = COALESCE(error_message, '') || E'\n[runtime-context] 切换业务运行上下文时取消'
    WHERE site_id = %s
      AND status IN ('pending', 'running')
"""

_CANCEL_AI_TRIGGER_JOBS_SQL = """
    UPDATE biz.ai_trigger_jobs
    SET status = 'cancelled',
        finished_at = NOW(),
        error_message = COALESCE(error_message, '') || E'\n[runtime-context] 切换业务运行上下文时取消'
    WHERE site_id = %s
      AND status IN ('pending', 'running')
"""


async def _stop_runtime_activity(site_id: int) -> list[RuntimeTransitionStep]:
    """Stop ETL / AI / queue activity that is still running before a switch.

    Runs four steps in this order and returns one ``RuntimeTransitionStep``
    per step:

    1. Cancel ETL executions running in this process.
    2. Cancel the site's pending/running ``task_queue`` rows.
    3. Cancel the site's in-memory AI call chains.
    4. Mark the site's pending/running ``biz.ai_trigger_jobs`` as cancelled.

    Errors raised while cancelling are logged and reported as a
    ``status="warning"`` step, so later steps still run. A failure of
    ``get_connection()`` itself is not caught and propagates.

    Args:
        site_id: Site whose queued tasks, AI call chains and AI trigger jobs
            are cancelled.
    """
    steps: list[RuntimeTransitionStep] = []
    steps.append(await _cancel_etl_executions())
    steps.append(_cancel_site_rows(
        site_id,
        _CANCEL_TASK_QUEUE_SQL,
        key="cancel_task_queue",
        title="取消 ETL 队列",
        success_detail="已取消当前门店 pending/running 的 task_queue 记录。",
        log_message="取消 ETL 队列失败",
    ))
    steps.append(_cancel_ai_runtime(site_id))
    steps.append(_cancel_site_rows(
        site_id,
        _CANCEL_AI_TRIGGER_JOBS_SQL,
        key="cancel_ai_jobs",
        title="标记未完成 AI 触发",
        success_detail="已将当前门店 pending/running 的 ai_trigger_jobs 标记为 cancelled。",
        log_message="标记 AI 触发失败",
    ))
    return steps


async def _cancel_etl_executions() -> RuntimeTransitionStep:
    """Send a cancel request for every ETL execution running in this process.

    On success, ``count`` is the number of ``cancel()`` calls that returned
    a truthy value. Any exception is logged and turned into a warning step.
    """
    try:
        from app.services.task_executor import task_executor

        # NOTE(review): unlike the queue/AI steps, this is not filtered by
        # site_id. Every execution reported by get_running_ids() is
        # cancelled, including other sites'. Confirm this is intended.
        # Snapshot the ids so cancelling cannot mutate the collection being
        # iterated, and so the reported total matches what was processed.
        running_ids = list(task_executor.get_running_ids())
        cancelled = 0
        for execution_id in running_ids:
            if await task_executor.cancel(execution_id):
                cancelled += 1
        return RuntimeTransitionStep(
            key="cancel_etl_processes",
            title="终止当前 ETL 执行",
            status="success",
            count=cancelled,
            detail=f"检测到 {len(running_ids)} 个当前进程内执行,已发送取消信号。",
        )
    except Exception as exc:
        logger.exception("终止 ETL 执行失败")
        return RuntimeTransitionStep(
            key="cancel_etl_processes",
            title="终止当前 ETL 执行",
            status="warning",
            detail=str(exc)[:300],
        )


def _cancel_ai_runtime(site_id: int) -> RuntimeTransitionStep:
    """Cancel this process's in-memory AI call chains that belong to *site_id*.

    On success, ``count`` is whatever ``cancel_running(site_id)`` returns.
    An import or dispatcher error is logged and turned into a warning step.
    """
    try:
        from app.ai.dispatcher import get_dispatcher

        cancelled = get_dispatcher().cancel_running(site_id)
        return RuntimeTransitionStep(
            key="cancel_ai_runtime",
            title="取消当前 AI 调用链",
            status="success",
            count=cancelled,
            detail="已取消当前进程内属于该门店的 AI 异步调用链。",
        )
    except Exception as exc:
        # Log so that dispatcher failures appear in server logs, not only in
        # the response. Warning level, because an unavailable dispatcher is an
        # anticipated case (see the step detail below).
        logger.warning("取消 AI 调用链失败: site_id=%s", site_id, exc_info=True)
        return RuntimeTransitionStep(
            key="cancel_ai_runtime",
            title="取消当前 AI 调用链",
            status="warning",
            detail=f"AI Dispatcher 不可用或取消失败:{str(exc)[:240]}",
        )


def _cancel_site_rows(
    site_id: int,
    sql: str,
    *,
    key: str,
    title: str,
    success_detail: str,
    log_message: str,
) -> RuntimeTransitionStep:
    """Run one site-scoped cancelling UPDATE in its own transaction.

    Args:
        site_id: Value bound to the single ``%s`` placeholder in *sql*.
        sql: The UPDATE statement to execute.
        key: ``RuntimeTransitionStep.key`` for the resulting step.
        title: ``RuntimeTransitionStep.title`` for the resulting step.
        success_detail: Step detail used when the UPDATE commits.
        log_message: Message logged via ``logger.exception`` on failure.

    Returns:
        On success, a ``success`` step whose ``count`` is ``cur.rowcount``.
        On failure, the transaction is rolled back and a ``warning`` step
        carrying the truncated error text is returned. The connection is
        closed in both cases.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (site_id,))
            cancelled = cur.rowcount
        conn.commit()
        return RuntimeTransitionStep(
            key=key,
            title=title,
            status="success",
            count=cancelled,
            detail=success_detail,
        )
    except Exception as exc:
        conn.rollback()
        logger.exception(log_message)
        return RuntimeTransitionStep(
            key=key,
            title=title,
            status="warning",
            detail=str(exc)[:300],
        )
    finally:
        conn.close()