# -*- coding: utf-8 -*- """业务运行上下文与业务时钟服务。 该模块是开发/测试沙箱的统一控制层: - live 模式:沿用真实系统日期和正式数据。 - sandbox 模式:业务上假设今天是配置的历史日期,并用 sandbox_instance_id 隔离写入。 """ from __future__ import annotations import uuid from dataclasses import dataclass from datetime import date, datetime, time, timedelta, timezone from typing import Any from app import config _LOCAL_TZ = timezone(timedelta(hours=8)) MODE_LIVE = "live" MODE_SANDBOX = "sandbox" AI_MODE_LIVE = "live" LIVE_INSTANCE_ID = "live" @dataclass(frozen=True) class RuntimeContext: """单门店当前业务运行上下文。""" site_id: int mode: str = MODE_LIVE business_day_start_hour: int = config.BUSINESS_DAY_START_HOUR sandbox_date: date | None = None sandbox_instance_id: str | None = None ai_mode: str = AI_MODE_LIVE status: str = "active" @property def is_sandbox(self) -> bool: return self.mode == MODE_SANDBOX and self.sandbox_date is not None @property def business_date(self) -> date: if self.is_sandbox and self.sandbox_date is not None: return self.sandbox_date now = datetime.now(_LOCAL_TZ) today = now.date() if now.hour < self.business_day_start_hour: return today - timedelta(days=1) return today @property def business_now(self) -> datetime: if not self.is_sandbox: return datetime.now(_LOCAL_TZ) now = datetime.now(_LOCAL_TZ) return datetime.combine(self.business_date, now.timetz(), tzinfo=_LOCAL_TZ) @property def active_sandbox_instance_id(self) -> str | None: if not self.is_sandbox: return None return self.sandbox_instance_id def to_dict(self) -> dict[str, Any]: return { "site_id": self.site_id, "mode": self.mode, "business_day_start_hour": self.business_day_start_hour, "business_date": self.business_date.isoformat(), "business_now": self.business_now.isoformat(), "sandbox_date": self.sandbox_date.isoformat() if self.sandbox_date else None, "sandbox_instance_id": self.sandbox_instance_id, "ai_mode": self.ai_mode, "status": self.status, "is_sandbox": self.is_sandbox, } def new_sandbox_instance_id() -> str: """生成新的沙箱实例 ID。""" return f"sbx_{uuid.uuid4().hex[:24]}" def _default_context(site_id: int) -> RuntimeContext: return RuntimeContext(site_id=site_id) def get_runtime_context(site_id: int, conn: Any | None = None) -> RuntimeContext: """读取门店运行上下文。 表不存在或未配置时降级为 live,保证迁移前不影响正式链路。 """ own_conn = conn is None if own_conn: from app.database import get_connection conn = get_connection() try: with conn.cursor() as cur: try: cur.execute( """ SELECT mode, sandbox_date, sandbox_instance_id, ai_mode, status FROM biz.site_runtime_context WHERE site_id = %s """, (site_id,), ) except Exception: if own_conn: conn.rollback() return _default_context(site_id) row = cur.fetchone() if own_conn: conn.commit() finally: if own_conn: conn.close() if not row: return _default_context(site_id) mode, sandbox_date, sandbox_instance_id, ai_mode, status = row if mode not in (MODE_LIVE, MODE_SANDBOX): mode = MODE_LIVE if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id): mode = MODE_LIVE return RuntimeContext( site_id=site_id, mode=mode, sandbox_date=sandbox_date, sandbox_instance_id=sandbox_instance_id, ai_mode=ai_mode or AI_MODE_LIVE, status=status or "active", ) def namespace_ai_target_id(site_id: int, target_id: str, conn: Any | None = None) -> str: """按当前上下文转换 AI cache target_id。 前端和调用方继续使用原始 target_id;沙箱命名空间在后端统一处理。 """ ctx = get_runtime_context(site_id, conn=conn) if not ctx.is_sandbox or not ctx.sandbox_instance_id: return target_id return f"{ctx.sandbox_instance_id}:{target_id}" def task_runtime_filter( site_id: int, *, alias: str = "", conn: Any | None = None, ) -> tuple[str, list[Any]]: """返回 coach_tasks 等表的运行上下文过滤条件。""" ctx = get_runtime_context(site_id, conn=conn) prefix = f"{alias}." if alias else "" if ctx.is_sandbox and ctx.sandbox_instance_id: return ( f" AND {prefix}runtime_mode = %s AND {prefix}sandbox_instance_id = %s", [MODE_SANDBOX, ctx.sandbox_instance_id], ) return ( f" AND COALESCE({prefix}runtime_mode, 'live') = %s " f"AND COALESCE({prefix}sandbox_instance_id, %s) = %s", [MODE_LIVE, LIVE_INSTANCE_ID, LIVE_INSTANCE_ID], ) def runtime_insert_columns(site_id: int, conn: Any | None = None) -> tuple[str, str, list[Any]]: """返回 INSERT SQL 片段:列名、占位符和值。""" ctx = get_runtime_context(site_id, conn=conn) return ( "runtime_mode, sandbox_instance_id", "%s, %s", [ MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE, ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID, ], ) def runtime_update_assignments(site_id: int, conn: Any | None = None) -> tuple[str, list[Any]]: """返回 UPDATE SQL 片段,用于把运行上下文写回已有记录。""" ctx = get_runtime_context(site_id, conn=conn) return ( "runtime_mode = %s, sandbox_instance_id = %s", [ MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE, ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID, ], ) def as_runtime_now_param(site_id: int, conn: Any | None = None) -> datetime: """返回可传给 SQL 的业务当前时间。""" return get_runtime_context(site_id, conn=conn).business_now def as_runtime_today_param(site_id: int, conn: Any | None = None) -> date: """返回可传给 SQL 的业务当前日期。""" return get_runtime_context(site_id, conn=conn).business_date def as_runtime_year_month_param(site_id: int, conn: Any | None = None) -> str: """返回 'YYYY-MM' 形式的业务年月,用于 performance 等月度查询。""" bd = get_runtime_context(site_id, conn=conn).business_date return f"{bd.year:04d}-{bd.month:02d}" def as_runtime_business_now_str(site_id: int, conn: Any | None = None, fmt: str = "%Y-%m-%d %H:%M:%S") -> str: """返回业务当前时间的格式化字符串,用于 AI prompts 中的 current_time。""" return get_runtime_context(site_id, conn=conn).business_now.strftime(fmt) def business_date_upper_bound_sql( site_id: int, *, column: str, alias: str = "", cast: str | None = None, conn: Any | None = None, ) -> tuple[str, list[Any]]: """返回业务日上界 SQL 片段。 sandbox 模式下,强制把 ``column`` 限制在业务日及之前(避免读到「未来」数据)。 live 模式下返回空片段,不影响任何逻辑。 cast 用于把 timestamp/timestamptz 列裁剪成日期再比较,例如 ``cast='date'``。 """ ctx = get_runtime_context(site_id, conn=conn) if not ctx.is_sandbox: return ("", []) prefix = f"{alias}." if alias else "" expr = f"{prefix}{column}" if cast: expr = f"({expr})::{cast}" return (f" AND {expr} <= %s", [ctx.business_date]) def apply_runtime_session_vars(conn: Any, ctx: RuntimeContext | None = None, *, site_id: int | None = None) -> None: """在已有数据库连接上设置 ``app.current_business_date`` 等 GUC 变量。 供 RLS 视图层(C 方案)使用:视图通过 ``current_setting('app.current_business_date', true)`` 读取业务日,再对事实/维度表做日期上界裁剪。 无论 live / sandbox 都设置该变量;live 下视图仍按真实 ``CURRENT_DATE`` 行为。 """ if ctx is None: if site_id is None: raise ValueError("apply_runtime_session_vars 需要 ctx 或 site_id 之一") ctx = get_runtime_context(site_id, conn=conn) bd = ctx.business_date.isoformat() mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE with conn.cursor() as cur: cur.execute( "SELECT set_config('app.current_business_date', %s, true), " "set_config('app.current_runtime_mode', %s, true)", (bd, mode), )