Files
Neo-ZQYY/apps/backend/app/routers/internal_events.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

166 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# AI_CHANGELOG
# - 2026-03-29 | Prompt: DWS_TASK_ENGINE ETL 任务 | 新建文件。
# 提供 POST /api/internal/run-job 端点,供 ETL 按 job_name 执行
# biz.trigger_jobs 中的任务。Internal-Token 认证。
# -*- coding: utf-8 -*-
"""
内部任务执行 API — ETL/内部服务调用入口。
端点:
- POST /api/internal/run-job — 按 job_name 执行 biz.trigger_jobs 中的任务
认证方式Authorization: Internal-Token {token}
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from app.auth.internal_token import verify_internal_token
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Every route declared on this router is served under the /api/internal prefix
# and grouped under the "internal-events" tag.
router = APIRouter(prefix="/api/internal", tags=["internal-events"])
class RunJobByNameRequest(BaseModel):
    """Request body for running a task selected by its ``job_name``."""

    # Required. Matched against biz.trigger_jobs.job_name by the /run-job endpoint.
    job_name: str = Field(
        ...,
        description="任务名称,如 recall_completion_check",
    )
class RunJobByNameResponse(BaseModel):
    """Outcome of a /run-job call."""

    # Whether the executed job reported success.
    success: bool
    # Message reported by the executed job.
    message: str
    # Echo of the job name that was requested.
    job_name: str
class EtlCompletedRequest(BaseModel):
    """Request body for the ETL-completed notification."""

    # Name of the pipeline that finished. The /etl-completed handler in this
    # module does not read this field.
    pipeline: str = Field(
        "api_full",
        description="完成的 pipeline 名称",
    )
    # Optional site (store) id. When omitted, the /etl-completed handler looks
    # up the site ids itself.
    site_id: int | None = Field(
        None,
        description="门店 ID可选",
    )
class EtlCompletedResponse(BaseModel):
    """Result of the ETL-completed orchestration."""

    # True when neither the recall-detector step nor the task-generator step failed.
    success: bool
    # Return value of recall_detector.run(), or None if that step raised.
    recall_result: dict | None = None
    # Return value of task_generator.run(), or None if that step raised.
    task_gen_result: dict | None = None
    # "ok" on success, otherwise the failed step names joined with "; ".
    message: str = ""
@router.post("/etl-completed", response_model=EtlCompletedResponse)
async def etl_completed_endpoint(
    body: EtlCompletedRequest,
    _token: str = Depends(verify_internal_token),
) -> EtlCompletedResponse:
    """Single orchestration entry point invoked after an ETL pipeline finishes.

    CHANGE 2026-04-07 | Fix-12: triggered automatically once ETL completes.

    Steps run in this order, each isolated from the others' failures:

    1. ``recall_detector.run()``
    2. ``task_generator.run()``
    3. ``fire_event("ai_dws_completed", ...)`` once per site id.

    A failure in step 1 or 2 is logged and reported through ``success`` and
    ``message``. A failure in step 3 is logged only and does not affect the
    response.

    NOTE(review): the service functions are called without ``await`` from an
    ``async def`` handler, so they execute on the event-loop thread. If they
    perform blocking work (TODO confirm), consider off-loading them.

    Args:
        body: Notification payload; only ``site_id`` is read here.
        _token: Value produced by the ``verify_internal_token`` dependency;
            unused beyond enforcing the dependency.

    Returns:
        The step results plus an overall ``success`` flag and ``message``.
    """
    from app.services import recall_detector, task_generator

    failures = []

    # Step 1: recall-completion check. The original author's note says this
    # includes back-tracking; that is not visible from this module.
    recall_outcome = None
    try:
        recall_outcome = recall_detector.run()
        logger.info("ETL 编排 Step1 recall_detector 完成: %s", recall_outcome)
    except Exception:
        logger.exception("ETL 编排 Step1 recall_detector 失败")
        failures.append("recall_detector failed")

    # Step 2: generate new tasks, regardless of how step 1 went.
    generation_outcome = None
    try:
        generation_outcome = task_generator.run()
        logger.info("ETL 编排 Step2 task_generator 完成: %s", generation_outcome)
    except Exception:
        logger.exception("ETL 编排 Step2 task_generator 失败")
        failures.append("task_generator failed")

    # Step 3: fire the ai_dws_completed event for each target site.
    # Per the original author's note this pre-generates the App2 AI finance
    # insights; what the event actually does is not visible from here.
    try:
        from app.services.trigger_scheduler import fire_event

        if body.site_id is None:
            # No site given: use every distinct non-NULL site_id found in
            # biz.trigger_jobs.
            from app.database import get_connection as open_connection

            db = open_connection()
            try:
                with db.cursor() as cursor:
                    cursor.execute(
                        "SELECT DISTINCT site_id FROM biz.trigger_jobs "
                        "WHERE site_id IS NOT NULL"
                    )
                    target_sites = [record[0] for record in cursor.fetchall()]
                db.commit()
            finally:
                db.close()
        else:
            target_sites = [body.site_id]

        for site in target_sites:
            # One failing site must not prevent the remaining sites from firing.
            try:
                fire_event("ai_dws_completed", {"site_id": site})
            except Exception:
                logger.exception("触发 ai_dws_completed 失败: site_id=%s", site)
    except Exception:
        # Covers the import and the site lookup; deliberately not added to
        # `failures`, so the response still reflects steps 1 and 2 only.
        logger.exception("ai_dws_completed 事件批量触发失败")

    return EtlCompletedResponse(
        success=not failures,
        recall_result=recall_outcome,
        task_gen_result=generation_outcome,
        message="; ".join(failures) or "ok",
    )
@router.post("/run-job", response_model=RunJobByNameResponse)
async def run_job_by_name_endpoint(
    body: RunJobByNameRequest,
    _token: str = Depends(verify_internal_token),
) -> RunJobByNameResponse:
    """Look up a task in ``biz.trigger_jobs`` by ``job_name`` and execute it.

    According to the original author's note, the ETL DWS_TASK_ENGINE task calls
    this endpoint to run the backend task-engine steps in sequence.

    Args:
        body: Carries the ``job_name`` to look up.
        _token: Value produced by the ``verify_internal_token`` dependency;
            unused beyond enforcing the dependency.

    Returns:
        The ``success`` and ``message`` values reported by ``run_job_by_id``
        (defaulting to ``False`` and ``""`` when absent) plus the job name.

    Raises:
        HTTPException: 404 when no row matches ``job_name``.
    """
    from app.database import get_connection
    from app.services.trigger_scheduler import run_job_by_id

    requested_name = body.job_name

    db = get_connection()
    try:
        with db.cursor() as cursor:
            # NOTE(review): only the first matching row is used; if job_name is
            # not unique in biz.trigger_jobs the choice is unordered — confirm.
            cursor.execute(
                "SELECT id FROM biz.trigger_jobs WHERE job_name = %s",
                (requested_name,),
            )
            matched_row = cursor.fetchone()
        db.commit()
    finally:
        db.close()

    if not matched_row:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"任务 '{requested_name}' 不存在",
        )

    # The connection is already closed here; the job runs outside of it.
    outcome = run_job_by_id(matched_row[0])
    logger.info(
        "内部任务执行: job_name=%s, success=%s",
        requested_name,
        outcome.get("success"),
    )

    return RunJobByNameResponse(
        success=outcome.get("success", False),
        message=outcome.get("message", ""),
        job_name=requested_name,
    )