diff --git a/apps/README.md b/apps/README.md index 1267ba7..041f0ef 100644 --- a/apps/README.md +++ b/apps/README.md @@ -1,17 +1,18 @@ -# apps/ - -## 作用说明 - -应用项目顶层目录,存放所有可独立部署/运行的子项目。当前包含 ETL Connector、FastAPI 后端、微信小程序前端,以及预留的管理后台。 - -## 内部结构 - -- `etl/pipelines/feiqiu/` — 飞球 Connector(数据源连接器,抽取→清洗→汇总全流程) -- `backend/` — FastAPI 后端(小程序 API、权限、审批) -- `miniprogram/` — 微信小程序前端(Donut + TDesign) -- `admin-web/` — 管理后台(预留,暂未实施) - -## Roadmap - -- 新增更多 Connector 时,在 `etl/pipelines/` 下按平台名创建子目录 -- `admin-web/` 待产品需求确认后启动 +# apps/ + +## 作用说明 + +应用项目顶层目录,存放所有可独立部署/运行的子项目。当前包含 ETL Connector、FastAPI 后端、微信小程序前端,以及预留的管理后台。 + +## 内部结构 + +- `etl/pipelines/feiqiu/` — 飞球 Connector(数据源连接器,抽取→清洗→汇总全流程) +- `backend/` — FastAPI 后端(小程序 API、权限、审批) +- `miniprogram/` — 微信小程序前端(Donut + TDesign) +- `admin-web/` — 管理后台(预留,暂未实施) +- `mcp-server/` — MCP Server(为百炼 AI 应用提供 PostgreSQL 只读查询) + +## Roadmap + +- 新增更多 Connector 时,在 `etl/pipelines/` 下按平台名创建子目录 +- `admin-web/` 待产品需求确认后启动 diff --git a/apps/admin-web/src/App.tsx b/apps/admin-web/src/App.tsx index 9cba910..e373aca 100644 --- a/apps/admin-web/src/App.tsx +++ b/apps/admin-web/src/App.tsx @@ -175,8 +175,15 @@ const AppLayout: React.FC = () => { const App: React.FC = () => { const hydrate = useAuthStore((s) => s.hydrate); + const [hydrated, setHydrated] = useState(false); - useEffect(() => { hydrate(); }, [hydrate]); + useEffect(() => { + hydrate(); + setHydrated(true); + }, [hydrate]); + + /* hydrate 完成前不渲染路由,避免 PrivateRoute 误判跳转到 /login */ + if (!hydrated) return ; return ( diff --git a/apps/admin-web/src/api/tasks.ts b/apps/admin-web/src/api/tasks.ts index 50f4486..59d7583 100644 --- a/apps/admin-web/src/api/tasks.ts +++ b/apps/admin-web/src/api/tasks.ts @@ -1,32 +1,77 @@ -/** - * 任务相关 API 调用。 - * - * - fetchTaskRegistry:获取按业务域分组的任务注册表 - */ - -import { apiClient } from './client'; -import type { TaskConfig, TaskDefinition } from '../types'; - -/** 获取按业务域分组的任务注册表 */ -export async function fetchTaskRegistry(): Promise> { - // 后端返回 { groups: { 域名: [TaskItem] } },需要解包 - const { data } = await apiClient.get<{ groups: Record }>('/tasks/registry'); - return data.groups; -} - -/** 获取按业务域分组的 DWD 表定义 */ -export async function fetchDwdTables(): Promise> { - // 后端返回 { groups: { 域名: [DwdTableItem] } },需要解包并提取 table_name - const { data } = await apiClient.get<{ groups: Record }>('/tasks/dwd-tables'); - const result: Record = {}; - for (const [domain, items] of Object.entries(data.groups)) { - result[domain] = items.map((item) => item.table_name); - } - return result; -} - -/** 验证任务配置并返回生成的 CLI 命令预览 */ -export async function validateTaskConfig(config: TaskConfig): Promise<{ command: string }> { - const { data } = await apiClient.post<{ command: string }>('/tasks/validate', { config }); - return data; -} +/** + * 任务相关 API 调用。 + */ + +import { apiClient } from './client'; +import type { TaskConfig, TaskDefinition } from '../types'; + +/** DWD 表项(后端返回的原始结构) */ +export interface DwdTableItem { + table_name: string; + display_name: string; + domain: string; + ods_source: string; + is_dimension: boolean; +} + +/** Flow 定义 */ +export interface FlowDef { + id: string; + name: string; + layers: string[]; +} + +/** 处理模式定义 */ +export interface ProcessingModeDef { + id: string; + name: string; + description: string; +} + +/** 同步检查结果 */ +export interface SyncCheckResult { + in_sync: boolean; + backend_only: string[]; + etl_only: string[]; + error: string | null; +} + +/** 获取按业务域分组的任务注册表 */ +export async function fetchTaskRegistry(): Promise> { + const { data } = await apiClient.get<{ groups: Record }>('/tasks/registry'); + return data.groups; +} + +/** 获取按业务域分组的 DWD 表定义(保留完整结构) */ +export async function fetchDwdTablesRich(): Promise> { + const { data } = await apiClient.get<{ groups: Record }>('/tasks/dwd-tables'); + return data.groups; +} + +/** 获取按业务域分组的 DWD 表定义(仅表名,兼容旧调用) */ +export async function fetchDwdTables(): Promise> { + const groups = await fetchDwdTablesRich(); + const result: Record = {}; + for (const [domain, items] of Object.entries(groups)) { + result[domain] = items.map((item) => item.table_name); + } + return result; +} + +/** 获取 Flow 定义和处理模式定义 */ +export async function fetchFlows(): Promise<{ flows: FlowDef[]; processing_modes: ProcessingModeDef[] }> { + const { data } = await apiClient.get<{ flows: FlowDef[]; processing_modes: ProcessingModeDef[] }>('/tasks/flows'); + return data; +} + +/** 验证任务配置并返回生成的 CLI 命令预览 */ +export async function validateTaskConfig(config: TaskConfig): Promise<{ command: string }> { + const { data } = await apiClient.post<{ command: string }>('/tasks/validate', { config }); + return data; +} + +/** 对比后端与 ETL 真实注册表的任务列表差异 */ +export async function checkTaskSync(): Promise { + const { data } = await apiClient.get('/tasks/sync-check'); + return data; +} diff --git a/apps/admin-web/src/components/TaskSelector.tsx b/apps/admin-web/src/components/TaskSelector.tsx index 924d2a6..c92cb6f 100644 --- a/apps/admin-web/src/components/TaskSelector.tsx +++ b/apps/admin-web/src/components/TaskSelector.tsx @@ -1,307 +1,445 @@ /** - * 按业务域分组的任务选择器。 + * 按业务域全链路展示的任务选择器(v2)。 * - * 从 /api/tasks/registry 获取任务注册表,按业务域折叠展示, - * 支持全选/反选和按 Flow 层级过滤。 - * 当 Flow 包含 DWD 层时,在 DWD 任务下方内嵌表过滤子选项。 + * 每个业务域一个折叠面板,内部按层分组展示完整链路: + * ODS 任务 → DWD 表(该域的) → DWS/INDEX 任务 + * + * 功能: + * - 同步检查:工具栏右侧 Badge 指示,点击展示差异 Modal + * - 全选常用 / 全选 / 反选 / 清空 按钮 + * - DWD 表选中 = 过滤 DWD_LOAD_FROM_ODS 的装载范围 */ import React, { useEffect, useState, useMemo, useCallback } from "react"; import { - Collapse, - Checkbox, - Spin, - Alert, - Button, - Space, - Typography, - Tag, - Divider, + Collapse, Checkbox, Spin, Alert, Button, Space, Typography, + Tag, Badge, Modal, Tooltip, Divider, } from "antd"; +import { + CheckCircleOutlined, WarningOutlined, SyncOutlined, TableOutlined, +} from "@ant-design/icons"; import type { CheckboxChangeEvent } from "antd/es/checkbox"; -import { fetchTaskRegistry, fetchDwdTables } from "../api/tasks"; -import type { TaskDefinition } from "../types"; +import { fetchTaskRegistry, fetchDwdTablesRich, checkTaskSync } from "../api/tasks"; +import type { DwdTableItem as ApiDwdTableItem, SyncCheckResult } from "../api/tasks"; +import type { TaskDefinition, DwdTableItem } from "../types"; const { Text } = Typography; -/* ------------------------------------------------------------------ */ -/* Props */ -/* ------------------------------------------------------------------ */ +/* 层排序 / 标签 / 颜色 */ +const LAYER_ORDER: Record = { ODS: 0, DWD: 1, DWS: 2, INDEX: 3, UTILITY: 4 }; +const LAYER_LABELS: Record = { + ODS: "ODS 抽取", DWD: "DWD 装载", DWS: "DWS 汇总", INDEX: "DWS 指数", UTILITY: "工具", +}; +const LAYER_COLORS: Record = { + ODS: "blue", DWD: "green", DWS: "orange", INDEX: "purple", UTILITY: "default", +}; +/* 域排序 */ +const DOMAIN_ORDER: Record = { + 助教: 0, 结算: 1, 台桌: 2, 会员: 3, 商品: 4, 团购: 5, 库存: 6, 财务: 7, 指数: 8, 通用: 9, 工具: 10, +}; export interface TaskSelectorProps { - /** 当前 Flow 包含的层(如 ["ODS", "DWD"]) */ layers: string[]; - /** 已选中的任务编码列表 */ selectedTasks: string[]; - /** 选中任务变化回调 */ onTasksChange: (tasks: string[]) => void; - /** DWD 表过滤:已选中的表名列表 */ selectedDwdTables?: string[]; - /** DWD 表过滤变化回调 */ onDwdTablesChange?: (tables: string[]) => void; } -/* ------------------------------------------------------------------ */ -/* 过滤逻辑 */ -/* ------------------------------------------------------------------ */ - -export function filterTasksByLayers( - tasks: TaskDefinition[], - layers: string[], -): TaskDefinition[] { - if (layers.length === 0) return []; - return tasks; +interface DomainGroup { + domain: string; + layerTasks: { layer: string; tasks: TaskDefinition[] }[]; + dwdTables: DwdTableItem[]; } -/* ------------------------------------------------------------------ */ -/* 组件 */ -/* ------------------------------------------------------------------ */ +/** 当 layers 包含 DWD 时,DWD_LOAD_FROM_ODS 由 DWD 表过滤区块隐含,不单独显示 */ +const HIDDEN_WHEN_DWD_VISIBLE = new Set(["DWD_LOAD_FROM_ODS"]); + +/** 按域 + 层构建分组 */ +function buildDomainGroups( + registry: Record, + dwdTableGroups: Record, + layers: string[], +): DomainGroup[] { + const hideDwdTasks = layers.includes("DWD"); + const domainSet = new Set(); + const tasksByDomainLayer = new Map>(); + + for (const tasks of Object.values(registry)) { + for (const t of tasks) { + if (!layers.includes(t.layer)) continue; + if (hideDwdTasks && HIDDEN_WHEN_DWD_VISIBLE.has(t.code)) continue; + domainSet.add(t.domain); + if (!tasksByDomainLayer.has(t.domain)) tasksByDomainLayer.set(t.domain, new Map()); + const layerMap = tasksByDomainLayer.get(t.domain)!; + if (!layerMap.has(t.layer)) layerMap.set(t.layer, []); + layerMap.get(t.layer)!.push(t); + } + } + + if (layers.includes("DWD")) { + for (const domain of Object.keys(dwdTableGroups)) domainSet.add(domain); + } + + const groups: DomainGroup[] = []; + for (const domain of domainSet) { + const layerMap = tasksByDomainLayer.get(domain) ?? new Map(); + const layerTasks: { layer: string; tasks: TaskDefinition[] }[] = []; + const sortedLayers = [...layerMap.keys()].sort( + (a, b) => (LAYER_ORDER[a] ?? 99) - (LAYER_ORDER[b] ?? 99), + ); + for (const layer of sortedLayers) { + const tasks = layerMap.get(layer)!; + tasks.sort((a, b) => (a.is_common === b.is_common ? 0 : a.is_common ? -1 : 1)); + layerTasks.push({ layer, tasks }); + } + const dwdTables = layers.includes("DWD") ? (dwdTableGroups[domain] ?? []) : []; + if (layerTasks.length > 0 || dwdTables.length > 0) { + groups.push({ domain, layerTasks, dwdTables }); + } + } + groups.sort((a, b) => (DOMAIN_ORDER[a.domain] ?? 99) - (DOMAIN_ORDER[b.domain] ?? 99)); + return groups; +} const TaskSelector: React.FC = ({ - layers, - selectedTasks, - onTasksChange, - selectedDwdTables = [], - onDwdTablesChange, + layers, selectedTasks, onTasksChange, + selectedDwdTables = [], onDwdTablesChange, }) => { const [registry, setRegistry] = useState>({}); + const [dwdTableGroups, setDwdTableGroups] = useState>({}); const [loading, setLoading] = useState(false); const [error, setError] = useState(null); + const [syncResult, setSyncResult] = useState(null); + const [syncLoading, setSyncLoading] = useState(false); + const [syncModalOpen, setSyncModalOpen] = useState(false); - // DWD 表定义(按域分组) - const [dwdTableGroups, setDwdTableGroups] = useState>({}); - const showDwdFilter = layers.includes("DWD") && !!onDwdTablesChange; - - /* ---------- 加载任务注册表 ---------- */ + /* 加载数据 */ useEffect(() => { let cancelled = false; setLoading(true); setError(null); - const promises: Promise[] = [ fetchTaskRegistry() .then((data) => { if (!cancelled) setRegistry(data); }) .catch((err) => { if (!cancelled) setError(err?.message ?? "获取任务列表失败"); }), ]; - // 如果包含 DWD 层,同时加载 DWD 表定义 if (layers.includes("DWD")) { promises.push( - fetchDwdTables() - .then((data) => { if (!cancelled) setDwdTableGroups(data); }) - .catch(() => { /* DWD 表加载失败不阻塞任务列表 */ }), + fetchDwdTablesRich() + .then((data) => { + if (cancelled) return; + const converted: Record = {}; + for (const [domain, items] of Object.entries(data)) { + converted[domain] = items.map((item: ApiDwdTableItem) => ({ + table_name: item.table_name, display_name: item.display_name, + domain: item.domain, ods_source: item.ods_source, is_dimension: item.is_dimension, + })); + } + setDwdTableGroups(converted); + }) + .catch(() => {}), ); } - Promise.all(promises).finally(() => { if (!cancelled) setLoading(false); }); return () => { cancelled = true; }; }, [layers]); - /* ---------- 按 layers 过滤后的分组 ---------- */ - const filteredGroups = useMemo(() => { - const result: Record = {}; - for (const [domain, tasks] of Object.entries(registry)) { - const visible = filterTasksByLayers(tasks, layers); - if (visible.length > 0) { - result[domain] = [...visible].sort((a, b) => { - if (a.is_common === b.is_common) return 0; - return a.is_common ? -1 : 1; - }); - } - } - return result; - }, [registry, layers]); + /* 首次加载后自动同步检查 */ + useEffect(() => { + if (Object.keys(registry).length > 0) handleSyncCheck(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [registry]); + + const domainGroups = useMemo( + () => buildDomainGroups(registry, dwdTableGroups, layers), + [registry, dwdTableGroups, layers], + ); const allVisibleCodes = useMemo( - () => Object.values(filteredGroups).flatMap((t) => t.map((d) => d.code)), - [filteredGroups], + () => domainGroups.flatMap((g) => g.layerTasks.flatMap((lt) => lt.tasks.map((t) => t.code))), + [domainGroups], + ); + const allCommonCodes = useMemo( + () => domainGroups.flatMap((g) => + g.layerTasks.flatMap((lt) => lt.tasks.filter((t) => t.is_common).map((t) => t.code)), + ), + [domainGroups], ); - // DWD 表扁平列表 - const allDwdTableNames = useMemo( - () => Object.values(dwdTableGroups).flat(), - [dwdTableGroups], - ); + /* 同步检查 */ + const handleSyncCheck = useCallback(async () => { + setSyncLoading(true); + try { setSyncResult(await checkTaskSync()); } + catch { setSyncResult({ in_sync: false, backend_only: [], etl_only: [], error: "检查失败" }); } + finally { setSyncLoading(false); } + }, []); - /* ---------- 事件处理 ---------- */ - - const handleDomainChange = useCallback( - (domain: string, checkedCodes: string[]) => { - const otherDomainCodes = selectedTasks.filter( - (code) => !filteredGroups[domain]?.some((t) => t.code === code), - ); - onTasksChange([...otherDomainCodes, ...checkedCodes]); - }, - [selectedTasks, filteredGroups, onTasksChange], - ); - - const handleSelectAll = useCallback(() => { - onTasksChange(allVisibleCodes); - }, [allVisibleCodes, onTasksChange]); - - const handleInvertSelection = useCallback(() => { - const currentSet = new Set(selectedTasks); - const inverted = allVisibleCodes.filter((code) => !currentSet.has(code)); - onTasksChange(inverted); + /* 任务选择 */ + const handleSelectAll = useCallback(() => onTasksChange(allVisibleCodes), [allVisibleCodes, onTasksChange]); + const handleSelectCommon = useCallback(() => onTasksChange(allCommonCodes), [allCommonCodes, onTasksChange]); + const handleInvert = useCallback(() => { + const set = new Set(selectedTasks); + onTasksChange(allVisibleCodes.filter((c) => !set.has(c))); }, [allVisibleCodes, selectedTasks, onTasksChange]); + const handleClear = useCallback(() => onTasksChange([]), [onTasksChange]); - /* ---------- DWD 表过滤事件 ---------- */ - - const handleDwdDomainTableChange = useCallback( - (domain: string, checked: string[]) => { - if (!onDwdTablesChange) return; - const domainTables = new Set(dwdTableGroups[domain] ?? []); - const otherSelected = selectedDwdTables.filter((t) => !domainTables.has(t)); - onDwdTablesChange([...otherSelected, ...checked]); + const handleDomainToggle = useCallback( + (group: DomainGroup, checked: boolean) => { + const codes = new Set(group.layerTasks.flatMap((lt) => lt.tasks.map((t) => t.code))); + if (checked) { + const merged = new Set(selectedTasks); + codes.forEach((c) => merged.add(c)); + onTasksChange([...merged]); + } else { + onTasksChange(selectedTasks.filter((c) => !codes.has(c))); + } }, - [selectedDwdTables, dwdTableGroups, onDwdTablesChange], + [selectedTasks, onTasksChange], ); - const handleDwdSelectAll = useCallback(() => { - onDwdTablesChange?.(allDwdTableNames); - }, [allDwdTableNames, onDwdTablesChange]); + const handleTaskToggle = useCallback( + (code: string, checked: boolean) => { + onTasksChange(checked ? [...selectedTasks, code] : selectedTasks.filter((c) => c !== code)); + }, + [selectedTasks, onTasksChange], + ); - const handleDwdClearAll = useCallback(() => { - onDwdTablesChange?.([]); - }, [onDwdTablesChange]); + /* DWD 表选择 */ + const handleDwdTableToggle = useCallback( + (tableName: string, checked: boolean) => { + if (!onDwdTablesChange) return; + onDwdTablesChange(checked + ? [...selectedDwdTables, tableName] + : selectedDwdTables.filter((t) => t !== tableName)); + }, + [selectedDwdTables, onDwdTablesChange], + ); - /* ---------- 渲染 ---------- */ + const handleDwdDomainToggle = useCallback( + (tables: DwdTableItem[], checked: boolean) => { + if (!onDwdTablesChange) return; + const names = new Set(tables.map((t) => t.table_name)); + if (checked) { + const merged = new Set(selectedDwdTables); + names.forEach((n) => merged.add(n)); + onDwdTablesChange([...merged]); + } else { + onDwdTablesChange(selectedDwdTables.filter((t) => !names.has(t))); + } + }, + [selectedDwdTables, onDwdTablesChange], + ); + /* 渲染 */ if (loading) return ; if (error) return ; - - const domainEntries = Object.entries(filteredGroups); - if (domainEntries.length === 0) return 当前 Flow 无可选任务; + if (domainGroups.length === 0) return 当前 Flow 无可选任务; const selectedCount = selectedTasks.filter((c) => allVisibleCodes.includes(c)).length; - // DWD 装载任务是否被选中 - const dwdLoadSelected = selectedTasks.includes("DWD_LOAD_FROM_ODS"); + const showDwdFilter = layers.includes("DWD") && !!onDwdTablesChange; + + /** 渲染某个域下的 DWD 表过滤区块 */ + const renderDwdTableFilter = (dwdTables: DwdTableItem[]) => { + if (!showDwdFilter || dwdTables.length === 0) return null; + const domainDwdSelected = selectedDwdTables.filter((t) => dwdTables.some((d) => d.table_name === t)); + return ( +
+
+ + + DWD 表过滤 + + {domainDwdSelected.length === 0 ? "(未选 = 全部装载)" : `${domainDwdSelected.length}/${dwdTables.length}`} + + + + + + +
+ {dwdTables.map((dt) => ( +
+ handleDwdTableToggle(dt.table_name, e.target.checked)} + > + {dt.table_name} + {dt.display_name} + {dt.is_dimension && 维度} + +
+ ))} +
+ ); + }; return (
- - - - 已选 {selectedCount} / {allVisibleCodes.length} - + {/* 工具栏 */} +
+ + + + + + 已选 {selectedCount} / {allVisibleCodes.length} + + + {syncLoading ? ( + + ) : syncResult === null ? ( + + ) : syncResult.in_sync ? ( + + ) : ( + + + + )} + +
+ {/* 域折叠面板 */} d)} - items={domainEntries.map(([domain, tasks]) => { - const domainCodes = tasks.map((t) => t.code); + defaultActiveKey={domainGroups.filter((g) => g.domain !== "工具" && g.domain !== "通用").map((g) => g.domain)} + items={domainGroups.map((group) => { + const domainCodes = group.layerTasks.flatMap((lt) => lt.tasks.map((t) => t.code)); const domainSelected = selectedTasks.filter((c) => domainCodes.includes(c)); - const allChecked = domainSelected.length === domainCodes.length; + const allChecked = domainCodes.length > 0 && domainSelected.length === domainCodes.length; const indeterminate = domainSelected.length > 0 && !allChecked; - const handleDomainCheckAll = (e: CheckboxChangeEvent) => { - handleDomainChange(domain, e.target.checked ? domainCodes : []); - }; - return { - key: domain, + key: group.domain, label: ( e.stopPropagation()}> handleDomainToggle(group, e.target.checked)} style={{ marginRight: 8 }} /> - {domain} - - ({domainSelected.length}/{domainCodes.length}) - + {group.domain} + ({domainSelected.length}/{domainCodes.length}) ), children: ( - handleDomainChange(domain, checked as string[])} - > - - {tasks.map((t) => ( - - {t.code} - {t.name} - {t.is_common === false && ( - 不常用 - )} - - ))} - - +
+ {(() => { + /* 找到 DWD 表过滤应插入的位置:ODS 之后、DWS/INDEX 之前 */ + const hasDwdLayer = group.layerTasks.some((lt) => lt.layer === "DWD"); + const shouldInsertDwd = !hasDwdLayer && group.dwdTables.length > 0 && showDwdFilter; + /* 插入点:第一个 DWS/INDEX/UTILITY 层之前,若全是 ODS 则在末尾 */ + const insertIdx = shouldInsertDwd + ? group.layerTasks.findIndex((lt) => (LAYER_ORDER[lt.layer] ?? 99) >= (LAYER_ORDER["DWS"] ?? 2)) + : -1; + const effectiveInsertIdx = shouldInsertDwd && insertIdx === -1 ? group.layerTasks.length : insertIdx; + + const elements: React.ReactNode[] = []; + group.layerTasks.forEach((lt, idx) => { + /* 在此位置插入 DWD 表过滤 */ + if (shouldInsertDwd && idx === effectiveInsertIdx) { + elements.push( +
+ {elements.length > 0 && } +
+ DWD 装载 +
+ {renderDwdTableFilter(group.dwdTables)} +
, + ); + } + elements.push( +
+ {elements.length > 0 && } +
+ + {LAYER_LABELS[lt.layer] ?? lt.layer} + +
+
+ {lt.tasks.map((t) => ( +
+ handleTaskToggle(t.code, e.target.checked)} + > + {t.code} + {t.name} + {!t.is_common && 不常用} + +
+ ))} +
+ {/* DWD 表过滤紧跟 DWD 层任务 */} + {lt.layer === "DWD" && renderDwdTableFilter(group.dwdTables)} +
, + ); + }); + /* 所有层遍历完后,若插入点在末尾 */ + if (shouldInsertDwd && effectiveInsertIdx >= group.layerTasks.length) { + elements.push( +
+ {elements.length > 0 && } +
+ DWD 装载 +
+ {renderDwdTableFilter(group.dwdTables)} +
, + ); + } + return elements; + })()} +
), }; })} /> - {/* DWD 表过滤:仅在 DWD 层且 DWD_LOAD_FROM_ODS 被选中时显示 */} - {showDwdFilter && dwdLoadSelected && allDwdTableNames.length > 0 && ( - <> - -
- - DWD 表过滤 - - {selectedDwdTables.length === 0 - ? "(未选择 = 全部装载)" - : `已选 ${selectedDwdTables.length} / ${allDwdTableNames.length}`} - - -
- - - - -
- { - const domainSelected = selectedDwdTables.filter((t) => tables.includes(t)); - const allDomainChecked = domainSelected.length === tables.length; - const domainIndeterminate = domainSelected.length > 0 && !allDomainChecked; - - return { - key: domain, - label: ( - e.stopPropagation()}> - - handleDwdDomainTableChange(domain, e.target.checked ? tables : []) - } - style={{ marginRight: 8 }} - /> - {domain} - - ({domainSelected.length}/{tables.length}) - - - ), - children: ( - handleDwdDomainTableChange(domain, checked as string[])} - > - - {tables.map((table) => ( - - {table} - - ))} - - - ), - }; - })} - /> + {/* 同步差异 Modal */} + setSyncModalOpen(false)} + footer={[ + , + , + ]} + > + {syncResult?.error ? ( + + ) : ( +
+ {syncResult?.backend_only && syncResult.backend_only.length > 0 && ( +
+ 后端有但 ETL 无({syncResult.backend_only.length}): +
+ {syncResult.backend_only.map((code) => ( + {code} + ))} +
+
+ )} + {syncResult?.etl_only && syncResult.etl_only.length > 0 && ( +
+ ETL 有但后端无({syncResult.etl_only.length}): +
+ {syncResult.etl_only.map((code) => ( + {code} + ))} +
+
+ )} + {syncResult?.in_sync && ( + + )}
- - )} + )} +
); }; diff --git a/apps/admin-web/src/pages/TaskConfig.tsx b/apps/admin-web/src/pages/TaskConfig.tsx index cbb164e..0c209b9 100644 --- a/apps/admin-web/src/pages/TaskConfig.tsx +++ b/apps/admin-web/src/pages/TaskConfig.tsx @@ -24,6 +24,7 @@ import { TreeSelect, Tooltip, Segmented, + Spin, } from "antd"; import { SendOutlined, @@ -37,7 +38,8 @@ import { } from "@ant-design/icons"; import { useNavigate } from "react-router-dom"; import TaskSelector from "../components/TaskSelector"; -import { validateTaskConfig } from "../api/tasks"; +import { validateTaskConfig, fetchFlows } from "../api/tasks"; +import type { FlowDef, ProcessingModeDef } from "../api/tasks"; import { submitToQueue, executeDirectly } from "../api/execution"; import { useAuthStore } from "../store/authStore"; import type { RadioChangeEvent } from "antd"; @@ -48,32 +50,45 @@ const { Title, Text } = Typography; const { TextArea } = Input; /* ------------------------------------------------------------------ */ -/* Flow 定义 */ +/* Flow / 处理模式 — 本地 fallback(API 不可用时兜底) */ /* ------------------------------------------------------------------ */ -const FLOW_DEFINITIONS: Record = { - api_ods: { name: "API → ODS", layers: ["ODS"], desc: "仅抓取原始数据" }, - api_ods_dwd: { name: "API → ODS → DWD", layers: ["ODS", "DWD"], desc: "抓取并清洗装载" }, - api_full: { name: "API → ODS → DWD → DWS → INDEX", layers: ["ODS", "DWD", "DWS", "INDEX"], desc: "全链路执行" }, - ods_dwd: { name: "ODS → DWD", layers: ["DWD"], desc: "仅清洗装载" }, - dwd_dws: { name: "DWD → DWS汇总", layers: ["DWS"], desc: "仅汇总计算" }, - dwd_dws_index: { name: "DWD → DWS → INDEX", layers: ["DWS", "INDEX"], desc: "汇总+指数" }, - dwd_index: { name: "DWD → INDEX", layers: ["INDEX"], desc: "仅指数计算" }, +interface FlowEntry { name: string; layers: string[] } + +const FALLBACK_FLOWS: Record = { + api_ods: { name: "API → ODS", layers: ["ODS"] }, + api_ods_dwd: { name: "API → ODS → DWD", layers: ["ODS", "DWD"] }, + api_full: { name: "API → ODS → DWD → DWS → INDEX", layers: ["ODS", "DWD", "DWS", "INDEX"] }, + ods_dwd: { name: "ODS → DWD", layers: ["DWD"] }, + dwd_dws: { name: "DWD → DWS汇总", layers: ["DWS"] }, + dwd_dws_index: { name: "DWD → DWS → INDEX", layers: ["DWS", "INDEX"] }, + dwd_index: { name: "DWD → INDEX", layers: ["INDEX"] }, }; -export function getFlowLayers(flowId: string): string[] { - return FLOW_DEFINITIONS[flowId]?.layers ?? []; -} +interface ProcModeEntry { value: string; label: string; desc: string } -/* ------------------------------------------------------------------ */ -/* 处理模式 */ -/* ------------------------------------------------------------------ */ - -const PROCESSING_MODES = [ +const FALLBACK_PROCESSING_MODES: ProcModeEntry[] = [ { value: "increment_only", label: "仅增量", desc: "按游标增量抓取和装载" }, { value: "verify_only", label: "校验并修复", desc: "对比源和目标,修复差异" }, { value: "increment_verify", label: "增量+校验", desc: "先增量再校验" }, -] as const; +]; + +/** 将 API 返回的 FlowDef[] 转为 Record */ +function apiFlowsToRecord(flows: FlowDef[]): Record { + const result: Record = {}; + for (const f of flows) result[f.id] = { name: f.name, layers: f.layers }; + return result; +} + +/** 将 API 返回的 ProcessingModeDef[] 转为 ProcModeEntry[] */ +function apiModesToEntries(modes: ProcessingModeDef[]): ProcModeEntry[] { + return modes.map((m) => ({ value: m.id, label: m.name, desc: m.description })); +} + +/** 外部可用的 getFlowLayers(使用 fallback,组件内部用动态数据) */ +export function getFlowLayers(flowId: string): string[] { + return FALLBACK_FLOWS[flowId]?.layers ?? []; +} /* ------------------------------------------------------------------ */ /* 时间窗口 */ @@ -147,6 +162,24 @@ const TaskConfig: React.FC = () => { const navigate = useNavigate(); const user = useAuthStore((s) => s.user); + /* ---------- Flow / 处理模式 动态加载 ---------- */ + const [flowDefs, setFlowDefs] = useState>(FALLBACK_FLOWS); + const [procModes, setProcModes] = useState(FALLBACK_PROCESSING_MODES); + const [flowsLoading, setFlowsLoading] = useState(true); + + useEffect(() => { + let cancelled = false; + fetchFlows() + .then(({ flows, processing_modes }) => { + if (cancelled) return; + if (flows.length > 0) setFlowDefs(apiFlowsToRecord(flows)); + if (processing_modes.length > 0) setProcModes(apiModesToEntries(processing_modes)); + }) + .catch(() => { /* API 不可用,使用 fallback */ }) + .finally(() => { if (!cancelled) setFlowsLoading(false); }); + return () => { cancelled = true; }; + }, []); + /* ---------- 连接器 & Store 树形选择 ---------- */ const { treeData: connectorTreeData, allValues: allConnectorStoreValues } = useMemo( () => buildConnectorStoreTree(CONNECTOR_DEFS, user?.site_id ?? null), @@ -199,12 +232,17 @@ const TaskConfig: React.FC = () => { const [submitting, setSubmitting] = useState(false); /* ---------- 派生状态 ---------- */ - const layers = getFlowLayers(flow); + const layers = flowDefs[flow]?.layers ?? []; const showVerifyOption = processingMode === "verify_only"; /* ---------- 构建 TaskConfig 对象 ---------- */ - const buildTaskConfig = (): TaskConfigType => ({ - tasks: selectedTasks, + const buildTaskConfig = (): TaskConfigType => { + /* layers 包含 DWD 时自动注入 DWD_LOAD_FROM_ODS(UI 上由 DWD 表过滤区块隐含) */ + const tasks = layers.includes("DWD") && !selectedTasks.includes("DWD_LOAD_FROM_ODS") + ? [...selectedTasks, "DWD_LOAD_FROM_ODS"] + : selectedTasks; + return { + tasks, pipeline: flow, processing_mode: processingMode, pipeline_flow: "FULL", @@ -223,7 +261,8 @@ const TaskConfig: React.FC = () => { dwd_only_tables: selectedDwdTables.length > 0 ? selectedDwdTables : null, force_full: forceFull, extra_args: {}, - }); + }; + }; /* ---------- 自动刷新 CLI 预览 ---------- */ const refreshCli = async () => { @@ -326,12 +365,12 @@ const TaskConfig: React.FC = () => { - + 执行流程 (Flow) : "执行流程 (Flow)"} style={cardStyle}> - {Object.entries(FLOW_DEFINITIONS).map(([id, def]) => ( + {Object.entries(flowDefs).map(([id, def]) => ( - + {id} @@ -361,7 +400,7 @@ const TaskConfig: React.FC = () => { }} > - {PROCESSING_MODES.map((m) => ( + {procModes.map((m) => ( {m.label}
diff --git a/apps/admin-web/src/types/index.ts b/apps/admin-web/src/types/index.ts index 82ed551..7896dda 100644 --- a/apps/admin-web/src/types/index.ts +++ b/apps/admin-web/src/types/index.ts @@ -1,133 +1,144 @@ -/** - * 前后端共享的 TypeScript 类型定义。 - * 与设计文档中的 Pydantic 模型和数据库表结构对应。 - */ - -/** ETL 任务执行配置 */ -export interface TaskConfig { - tasks: string[]; - /** 执行流程 Flow ID(对应 CLI --pipeline) */ - pipeline: string; - /** 处理模式 */ - processing_mode: string; - /** 传统模式兼容(已弃用) */ - pipeline_flow: string; - dry_run: boolean; - /** lookback / custom */ - window_mode: string; - window_start: string | null; - window_end: string | null; - /** none / day */ - window_split: string | null; - /** 1 / 10 / 30 */ - window_split_days: number | null; - lookback_hours: number; - overlap_seconds: number; - fetch_before_verify: boolean; - skip_ods_when_fetch_before_verify: boolean; - ods_use_local_json: boolean; - /** 门店 ID(由后端从 JWT 注入) */ - store_id: number | null; - /** DWD 表级选择 */ - dwd_only_tables: string[] | null; - /** 强制全量处理(跳过 hash 去重和变更对比) */ - force_full: boolean; - extra_args: Record; -} - -/** 执行流程(Flow)定义 */ -export interface PipelineDefinition { - id: string; - name: string; - /** 包含的层:ODS / DWD / DWS / INDEX */ - layers: string[]; -} - -/** 处理模式定义 */ -export interface ProcessingModeDefinition { - id: string; - name: string; - description: string; -} - -/** 任务注册表中的任务定义 */ -export interface TaskDefinition { - code: string; - name: string; - description: string; - /** 业务域(会员、结算、助教等) */ - domain: string; - requires_window: boolean; - is_ods: boolean; - is_dimension: boolean; - default_enabled: boolean; - /** 常用任务标记,false 表示工具类/手动类任务 */ - is_common: boolean; -} - -/** 调度配置 */ -export interface ScheduleConfig { - schedule_type: "once" | "interval" | "daily" | "weekly" | "cron"; - interval_value: number; - interval_unit: "minutes" | "hours" | "days"; - daily_time: string; - weekly_days: number[]; - weekly_time: string; - cron_expression: string; - enabled: boolean; - start_date: string | null; - end_date: string | null; -} - -/** 队列中的任务 */ -export interface QueuedTask { - id: string; - site_id: number; - config: TaskConfig; - status: "pending" | "running" | "success" | "failed" | "cancelled"; - position: number; - created_at: string; - started_at: string | null; - finished_at: string | null; - exit_code: number | null; - error_message: string | null; -} - -/** 执行历史记录 */ -export interface ExecutionLog { - id: string; - site_id: number; - task_codes: string[]; - status: string; - started_at: string; - finished_at: string | null; - exit_code: number | null; - duration_ms: number | null; - command: string; - summary: Record | null; -} - -/** 调度任务 */ -export interface ScheduledTask { - id: string; - site_id: number; - name: string; - task_codes: string[]; - task_config: TaskConfig; - schedule_config: ScheduleConfig; - enabled: boolean; - last_run_at: string | null; - next_run_at: string | null; - run_count: number; - last_status: string | null; - created_at: string; - updated_at: string; -} - -/** 环境配置项 */ -export interface EnvConfigItem { - key: string; - value: string; - is_sensitive: boolean; -} - +/** + * 前后端共享的 TypeScript 类型定义。 + * 与设计文档中的 Pydantic 模型和数据库表结构对应。 + */ + +/** ETL 任务执行配置 */ +export interface TaskConfig { + tasks: string[]; + /** 执行流程 Flow ID(对应 CLI --pipeline) */ + pipeline: string; + /** 处理模式 */ + processing_mode: string; + /** 传统模式兼容(已弃用) */ + pipeline_flow: string; + dry_run: boolean; + /** lookback / custom */ + window_mode: string; + window_start: string | null; + window_end: string | null; + /** none / day */ + window_split: string | null; + /** 1 / 10 / 30 */ + window_split_days: number | null; + lookback_hours: number; + overlap_seconds: number; + fetch_before_verify: boolean; + skip_ods_when_fetch_before_verify: boolean; + ods_use_local_json: boolean; + /** 门店 ID(由后端从 JWT 注入) */ + store_id: number | null; + /** DWD 表级选择 */ + dwd_only_tables: string[] | null; + /** 强制全量处理(跳过 hash 去重和变更对比) */ + force_full: boolean; + extra_args: Record; +} + +/** 执行流程(Flow)定义 */ +export interface PipelineDefinition { + id: string; + name: string; + /** 包含的层:ODS / DWD / DWS / INDEX */ + layers: string[]; +} + +/** 处理模式定义 */ +export interface ProcessingModeDefinition { + id: string; + name: string; + description: string; +} + +/** 任务注册表中的任务定义 */ +export interface TaskDefinition { + code: string; + name: string; + description: string; + /** 业务域(会员、结算、助教等) */ + domain: string; + /** 所属层:ODS / DWD / DWS / INDEX / UTILITY */ + layer: string; + requires_window: boolean; + is_ods: boolean; + is_dimension: boolean; + default_enabled: boolean; + /** 常用任务标记,false 表示工具类/手动类任务 */ + is_common: boolean; +} + +/** DWD 表定义(后端返回的完整结构) */ +export interface DwdTableItem { + table_name: string; + display_name: string; + domain: string; + ods_source: string; + is_dimension: boolean; +} + +/** 调度配置 */ +export interface ScheduleConfig { + schedule_type: "once" | "interval" | "daily" | "weekly" | "cron"; + interval_value: number; + interval_unit: "minutes" | "hours" | "days"; + daily_time: string; + weekly_days: number[]; + weekly_time: string; + cron_expression: string; + enabled: boolean; + start_date: string | null; + end_date: string | null; +} + +/** 队列中的任务 */ +export interface QueuedTask { + id: string; + site_id: number; + config: TaskConfig; + status: "pending" | "running" | "success" | "failed" | "cancelled"; + position: number; + created_at: string; + started_at: string | null; + finished_at: string | null; + exit_code: number | null; + error_message: string | null; +} + +/** 执行历史记录 */ +export interface ExecutionLog { + id: string; + site_id: number; + task_codes: string[]; + status: string; + started_at: string; + finished_at: string | null; + exit_code: number | null; + duration_ms: number | null; + command: string; + summary: Record | null; +} + +/** 调度任务 */ +export interface ScheduledTask { + id: string; + site_id: number; + name: string; + task_codes: string[]; + task_config: TaskConfig; + schedule_config: ScheduleConfig; + enabled: boolean; + last_run_at: string | null; + next_run_at: string | null; + run_count: number; + last_status: string | null; + created_at: string; + updated_at: string; +} + +/** 环境配置项 */ +export interface EnvConfigItem { + key: string; + value: string; + is_sensitive: boolean; +} + diff --git a/apps/backend/app/routers/tasks.py b/apps/backend/app/routers/tasks.py index ffe6e54..2a214e1 100644 --- a/apps/backend/app/routers/tasks.py +++ b/apps/backend/app/routers/tasks.py @@ -1,209 +1,264 @@ -# -*- coding: utf-8 -*- -"""任务注册表 & 配置 API - -提供 4 个端点: -- GET /api/tasks/registry — 按业务域分组的任务列表 -- GET /api/tasks/dwd-tables — 按业务域分组的 DWD 表定义 -- GET /api/tasks/flows — 7 种 Flow + 3 种处理模式 -- POST /api/tasks/validate — 验证 TaskConfig 并返回 CLI 命令预览 - -所有端点需要 JWT 认证。validate 端点从 JWT 注入 store_id。 -""" - -from __future__ import annotations - -from typing import Any - -from fastapi import APIRouter, Depends -from pydantic import BaseModel - -from app.auth.dependencies import CurrentUser, get_current_user -from app.config import ETL_PROJECT_PATH -from app.schemas.tasks import ( - FlowDefinition, - ProcessingModeDefinition, - TaskConfigSchema, -) -from app.services.cli_builder import cli_builder -from app.services.task_registry import ( - DWD_TABLES, - FLOW_LAYER_MAP, - get_dwd_tables_grouped_by_domain, - get_tasks_grouped_by_domain, -) - -router = APIRouter(prefix="/api/tasks", tags=["任务配置"]) - - -# ── 响应模型 ────────────────────────────────────────────────── - -class TaskItem(BaseModel): - code: str - name: str - description: str - domain: str - layer: str - requires_window: bool - is_ods: bool - is_dimension: bool - default_enabled: bool - is_common: bool - - -class DwdTableItem(BaseModel): - table_name: str - display_name: str - domain: str - ods_source: str - is_dimension: bool - - -class TaskRegistryResponse(BaseModel): - """按业务域分组的任务列表""" - groups: dict[str, list[TaskItem]] - - -class DwdTablesResponse(BaseModel): - """按业务域分组的 DWD 表定义""" - groups: dict[str, list[DwdTableItem]] - - -class FlowsResponse(BaseModel): - """Flow 定义 + 处理模式定义""" - flows: list[FlowDefinition] - processing_modes: list[ProcessingModeDefinition] - - -class ValidateRequest(BaseModel): - """验证请求体 — 复用 TaskConfigSchema,但 store_id 由后端注入""" - config: TaskConfigSchema - - -class ValidateResponse(BaseModel): - """验证结果 + CLI 命令预览""" - valid: bool - command: str - command_args: list[str] - errors: list[str] - - -# ── Flow 定义(静态) ──────────────────────────────────────── - -FLOW_DEFINITIONS: list[FlowDefinition] = [ - FlowDefinition(id="api_ods", name="API → ODS", layers=["ODS"]), - FlowDefinition(id="api_ods_dwd", name="API → ODS → DWD", layers=["ODS", "DWD"]), - FlowDefinition(id="api_full", name="API → ODS → DWD → DWS汇总 → DWS指数", layers=["ODS", "DWD", "DWS", "INDEX"]), - FlowDefinition(id="ods_dwd", name="ODS → DWD", layers=["DWD"]), - FlowDefinition(id="dwd_dws", name="DWD → DWS汇总", layers=["DWS"]), - FlowDefinition(id="dwd_dws_index", name="DWD → DWS汇总 → DWS指数", layers=["DWS", "INDEX"]), - FlowDefinition(id="dwd_index", name="DWD → DWS指数", layers=["INDEX"]), -] - -PROCESSING_MODE_DEFINITIONS: list[ProcessingModeDefinition] = [ - ProcessingModeDefinition(id="increment_only", name="仅增量处理", description="只处理新增和变更的数据"), - ProcessingModeDefinition(id="verify_only", name="仅校验修复", description="校验现有数据并修复不一致(可选'校验前从 API 获取')"), - ProcessingModeDefinition(id="increment_verify", name="增量 + 校验修复", description="先增量处理,再校验并修复"), -] - - -# ── 端点 ────────────────────────────────────────────────────── - -@router.get("/registry", response_model=TaskRegistryResponse) -async def get_task_registry( - user: CurrentUser = Depends(get_current_user), -) -> TaskRegistryResponse: - """返回按业务域分组的任务列表""" - grouped = get_tasks_grouped_by_domain() - return TaskRegistryResponse( - groups={ - domain: [ - TaskItem( - code=t.code, - name=t.name, - description=t.description, - domain=t.domain, - layer=t.layer, - requires_window=t.requires_window, - is_ods=t.is_ods, - is_dimension=t.is_dimension, - default_enabled=t.default_enabled, - is_common=t.is_common, - ) - for t in tasks - ] - for domain, tasks in grouped.items() - } - ) - - -@router.get("/dwd-tables", response_model=DwdTablesResponse) -async def get_dwd_tables( - user: CurrentUser = Depends(get_current_user), -) -> DwdTablesResponse: - """返回按业务域分组的 DWD 表定义""" - grouped = get_dwd_tables_grouped_by_domain() - return DwdTablesResponse( - groups={ - domain: [ - DwdTableItem( - table_name=t.table_name, - display_name=t.display_name, - domain=t.domain, - ods_source=t.ods_source, - is_dimension=t.is_dimension, - ) - for t in tables - ] - for domain, tables in grouped.items() - } - ) - - -@router.get("/flows", response_model=FlowsResponse) -async def get_flows( - user: CurrentUser = Depends(get_current_user), -) -> FlowsResponse: - """返回 7 种 Flow 定义和 3 种处理模式定义""" - return FlowsResponse( - flows=FLOW_DEFINITIONS, - processing_modes=PROCESSING_MODE_DEFINITIONS, - ) - - -@router.post("/validate", response_model=ValidateResponse) -async def validate_task_config( - body: ValidateRequest, - user: CurrentUser = Depends(get_current_user), -) -> ValidateResponse: - """验证 TaskConfig 并返回生成的 CLI 命令预览 - - 从 JWT 注入 store_id,前端无需传递。 - """ - config = body.config.model_copy(update={"store_id": user.site_id}) - errors: list[str] = [] - - # 验证 Flow ID - if config.pipeline not in FLOW_LAYER_MAP: - errors.append(f"无效的执行流程: {config.pipeline}") - - # 验证任务列表非空 - if not config.tasks: - errors.append("任务列表不能为空") - - if errors: - return ValidateResponse( - valid=False, - command="", - command_args=[], - errors=errors, - ) - - cmd_args = cli_builder.build_command(config, ETL_PROJECT_PATH) - cmd_str = cli_builder.build_command_string(config, ETL_PROJECT_PATH) - - return ValidateResponse( - valid=True, - command=cmd_str, - command_args=cmd_args, - errors=[], - ) +# -*- coding: utf-8 -*- +"""任务注册表 & 配置 API + +提供 4 个端点: +- GET /api/tasks/registry — 按业务域分组的任务列表 +- GET /api/tasks/dwd-tables — 按业务域分组的 DWD 表定义 +- GET /api/tasks/flows — 7 种 Flow + 3 种处理模式 +- POST /api/tasks/validate — 验证 TaskConfig 并返回 CLI 命令预览 + +所有端点需要 JWT 认证。validate 端点从 JWT 注入 store_id。 +""" + +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, Depends +from pydantic import BaseModel + +from app.auth.dependencies import CurrentUser, get_current_user +from app.config import ETL_PROJECT_PATH +from app.schemas.tasks import ( + FlowDefinition, + ProcessingModeDefinition, + TaskConfigSchema, +) +from app.services.cli_builder import cli_builder +from app.services.task_registry import ( + DWD_TABLES, + FLOW_LAYER_MAP, + get_dwd_tables_grouped_by_domain, + get_tasks_grouped_by_domain, +) + +router = APIRouter(prefix="/api/tasks", tags=["任务配置"]) + + +# ── 响应模型 ────────────────────────────────────────────────── + +class TaskItem(BaseModel): + code: str + name: str + description: str + domain: str + layer: str + requires_window: bool + is_ods: bool + is_dimension: bool + default_enabled: bool + is_common: bool + + +class DwdTableItem(BaseModel): + table_name: str + display_name: str + domain: str + ods_source: str + is_dimension: bool + + +class TaskRegistryResponse(BaseModel): + """按业务域分组的任务列表""" + groups: dict[str, list[TaskItem]] + + +class DwdTablesResponse(BaseModel): + """按业务域分组的 DWD 表定义""" + groups: dict[str, list[DwdTableItem]] + + +class FlowsResponse(BaseModel): + """Flow 定义 + 处理模式定义""" + flows: list[FlowDefinition] + processing_modes: list[ProcessingModeDefinition] + + +class ValidateRequest(BaseModel): + """验证请求体 — 复用 TaskConfigSchema,但 store_id 由后端注入""" + config: TaskConfigSchema + + +class ValidateResponse(BaseModel): + """验证结果 + CLI 命令预览""" + valid: bool + command: str + command_args: list[str] + errors: list[str] + + +# ── Flow 定义(静态) ──────────────────────────────────────── + +FLOW_DEFINITIONS: list[FlowDefinition] = [ + FlowDefinition(id="api_ods", name="API → ODS", layers=["ODS"]), + FlowDefinition(id="api_ods_dwd", name="API → ODS → DWD", layers=["ODS", "DWD"]), + FlowDefinition(id="api_full", name="API → ODS → DWD → DWS汇总 → DWS指数", layers=["ODS", "DWD", "DWS", "INDEX"]), + FlowDefinition(id="ods_dwd", name="ODS → DWD", layers=["DWD"]), + FlowDefinition(id="dwd_dws", name="DWD → DWS汇总", layers=["DWS"]), + FlowDefinition(id="dwd_dws_index", name="DWD → DWS汇总 → DWS指数", layers=["DWS", "INDEX"]), + FlowDefinition(id="dwd_index", name="DWD → DWS指数", layers=["INDEX"]), +] + +PROCESSING_MODE_DEFINITIONS: list[ProcessingModeDefinition] = [ + ProcessingModeDefinition(id="increment_only", name="仅增量处理", description="只处理新增和变更的数据"), + ProcessingModeDefinition(id="verify_only", name="仅校验修复", description="校验现有数据并修复不一致"), + ProcessingModeDefinition(id="increment_verify", name="增量 + 校验修复", description="先增量处理,再校验并修复"), +] + + +# ── 端点 ────────────────────────────────────────────────────── + +@router.get("/registry", response_model=TaskRegistryResponse) +async def get_task_registry( + user: CurrentUser = Depends(get_current_user), +) -> TaskRegistryResponse: + """返回按业务域分组的任务列表""" + grouped = get_tasks_grouped_by_domain() + return TaskRegistryResponse( + groups={ + domain: [ + TaskItem( + code=t.code, + name=t.name, + description=t.description, + domain=t.domain, + layer=t.layer, + requires_window=t.requires_window, + is_ods=t.is_ods, + is_dimension=t.is_dimension, + default_enabled=t.default_enabled, + is_common=t.is_common, + ) + for t in tasks + ] + for domain, tasks in grouped.items() + } + ) + + +@router.get("/dwd-tables", response_model=DwdTablesResponse) +async def get_dwd_tables( + user: CurrentUser = Depends(get_current_user), +) -> DwdTablesResponse: + """返回按业务域分组的 DWD 表定义""" + grouped = get_dwd_tables_grouped_by_domain() + return DwdTablesResponse( + groups={ + domain: [ + DwdTableItem( + table_name=t.table_name, + display_name=t.display_name, + domain=t.domain, + ods_source=t.ods_source, + is_dimension=t.is_dimension, + ) + for t in tables + ] + for domain, tables in grouped.items() + } + ) + + +@router.get("/flows", response_model=FlowsResponse) +async def get_flows( + user: CurrentUser = Depends(get_current_user), +) -> FlowsResponse: + """返回 7 种 Flow 定义和 3 种处理模式定义""" + return FlowsResponse( + flows=FLOW_DEFINITIONS, + processing_modes=PROCESSING_MODE_DEFINITIONS, + ) + + +@router.post("/validate", response_model=ValidateResponse) +async def validate_task_config( + body: ValidateRequest, + user: CurrentUser = Depends(get_current_user), +) -> ValidateResponse: + """验证 TaskConfig 并返回生成的 CLI 命令预览 + + 从 JWT 注入 store_id,前端无需传递。 + """ + config = body.config.model_copy(update={"store_id": user.site_id}) + errors: list[str] = [] + + # 验证 Flow ID + if config.pipeline not in FLOW_LAYER_MAP: + errors.append(f"无效的执行流程: {config.pipeline}") + + # 验证任务列表非空 + if not config.tasks: + errors.append("任务列表不能为空") + + if errors: + return ValidateResponse( + valid=False, + command="", + command_args=[], + errors=errors, + ) + + cmd_args = cli_builder.build_command(config, ETL_PROJECT_PATH) + cmd_str = cli_builder.build_command_string(config, ETL_PROJECT_PATH) + + return ValidateResponse( + valid=True, + command=cmd_str, + command_args=cmd_args, + errors=[], + ) + + +# ── GET /api/tasks/sync-check — 对比 ETL 真实注册表 ────────── + +class SyncCheckResponse(BaseModel): + """同步检查结果""" + in_sync: bool + backend_only: list[str] + etl_only: list[str] + error: str | None = None + + +@router.get("/sync-check", response_model=SyncCheckResponse) +async def sync_check( + user: CurrentUser = Depends(get_current_user), +) -> SyncCheckResponse: + """对比后端硬编码任务列表与 ETL 真实注册表,返回差异。 + + 通过子进程调用 ETL CLI 获取真实任务列表,避免直接导入 ETL 代码。 + """ + import subprocess + import sys + + from app.services.task_registry import ALL_TASKS + + backend_codes = {t.code for t in ALL_TASKS} + + try: + result = subprocess.run( + [sys.executable, "-c", + "from orchestration.task_registry import default_registry; " + "print(','.join(sorted(default_registry.get_all_task_codes())))"], + capture_output=True, text=True, timeout=15, + cwd=ETL_PROJECT_PATH, encoding="utf-8", errors="replace", + ) + if result.returncode != 0: + return SyncCheckResponse( + in_sync=False, backend_only=[], etl_only=[], + error=f"ETL 子进程失败: {result.stderr.strip()[:200]}", + ) + etl_codes = {c.strip() for c in result.stdout.strip().split(",") if c.strip()} + except Exception as exc: + return SyncCheckResponse( + in_sync=False, backend_only=[], etl_only=[], + error=f"无法连接 ETL: {exc}", + ) + + backend_only = sorted(backend_codes - etl_codes) + etl_only = sorted(etl_codes - backend_codes) + + return SyncCheckResponse( + in_sync=len(backend_only) == 0 and len(etl_only) == 0, + backend_only=backend_only, + etl_only=etl_only, + ) diff --git a/apps/mcp-server/.gitignore b/apps/mcp-server/.gitignore new file mode 100644 index 0000000..a521f92 --- /dev/null +++ b/apps/mcp-server/.gitignore @@ -0,0 +1,4 @@ +.env +.env.local +__pycache__/ +*.pyc diff --git a/apps/mcp-server/README.md b/apps/mcp-server/README.md new file mode 100644 index 0000000..e105156 --- /dev/null +++ b/apps/mcp-server/README.md @@ -0,0 +1,58 @@ +# MCP Server + +为阿里云百炼 AI 应用提供 PostgreSQL 只读查询能力的 MCP 服务。 + +## 用途 + +小程序端调用百炼 AI 应用时,百炼通过 MCP 协议连接本服务,读取 `etl_feiqiu` 数据库中的运营数据(会员、订单、支付、助教业绩、财务汇总等)。 + +## 架构 + +``` +微信小程序 → 百炼 AI 应用 → MCP (Streamable HTTP) → 本服务 → PostgreSQL (etl_feiqiu) +``` + +## 暴露的 Tools + +| Tool | 说明 | +|------|------| +| `list_tables` | 列出指定 schema 下的表 | +| `describe_table` | 查看单表结构 | +| `describe_schemas` | 批量返回多个 schema 的表结构(含主键) | +| `query_sql` | 在指定 schema 内执行只读 SQL | + +## 可访问的 Schema + +`ods` / `dwd` / `dws` / `core` / `meta` / `app`(etl_feiqiu 六层架构) + +## 安全策略 + +- 仅允许 SELECT/WITH/SHOW/EXPLAIN,禁止 DDL/DML +- 正则 + sqlparse 双重校验 +- 跨 schema 引用白名单限制 +- 可选 Bearer Token 鉴权(`MCP_TOKEN` 环境变量) +- 生产环境建议使用只读数据库账号 + +## 配置 + +环境变量优先级:`MCP_PG_*` > `DB_*` / `ETL_DB_NAME` / `PG_NAME` > 项目根 `.env` + +| 变量 | 说明 | 默认值 | +|------|------|--------| +| `MCP_PG_HOST` | 数据库主机(优先) | 回退 `DB_HOST` | +| `MCP_PG_PORT` | 数据库端口(优先) | 回退 `DB_PORT`,默认 5432 | +| `MCP_PG_DATABASE` | 数据库名(优先) | 回退 `ETL_DB_NAME` → `PG_NAME` | +| `MCP_PG_USER` | 数据库用户(优先) | 回退 `DB_USER` | +| `MCP_PG_PASSWORD` | 数据库密码(优先) | 回退 `DB_PASSWORD` | +| `MCP_TOKEN` | 鉴权 token(空则不启用) | 空 | +| `MCP_MAX_ROWS` | query_sql 最大返回行数 | 500 | +| `PORT` | 服务监听端口 | 9000 | + +## 启动 + +```bash +cd apps/mcp-server +python server.py +``` + +百炼端 MCP 服务地址配置为:`https://mcp.langlangzhuoqiu.cn/mcp` diff --git a/apps/mcp-server/pyproject.toml b/apps/mcp-server/pyproject.toml new file mode 100644 index 0000000..04b4e13 --- /dev/null +++ b/apps/mcp-server/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "zqyy-mcp-server" +version = "0.1.0" +description = "MCP Server — 为阿里云百炼 AI 应用提供 PostgreSQL 只读查询能力" +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.9", + "psycopg[pool]>=3.1", + "python-dotenv>=1.0", + "sqlparse>=0.5", + "starlette>=0.27", + "uvicorn[standard]>=0.34", +] + +[dependency-groups] +dev = [ + "pytest>=8.0", +] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["."] diff --git a/apps/mcp-server/server.py b/apps/mcp-server/server.py new file mode 100644 index 0000000..1b7d4b8 --- /dev/null +++ b/apps/mcp-server/server.py @@ -0,0 +1,412 @@ +import os +import re +import contextlib +from collections import defaultdict +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import sqlparse +from dotenv import load_dotenv +from psycopg_pool import ConnectionPool + +from mcp.server.fastmcp import FastMCP +from mcp.server.transport_security import TransportSecuritySettings + +from starlette.applications import Starlette +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import Mount + +# 加载配置:.env.local > 同级 .env > 项目根 .env +_here = Path(__file__).resolve().parent +_root = _here.parent.parent # apps/mcp-server -> apps -> NeoZQYY +load_dotenv(_here / ".env.local", override=True) +load_dotenv(_here / ".env", override=False) +load_dotenv(_root / ".env", override=False) + + +# ---------------------------- +# 工具:环境变量解析(避免 int("") 报错) +# ---------------------------- +def env_str(name: str, default: str = "", required: bool = False) -> str: + v = os.getenv(name, default) + v = v if v is not None else default + v = v.strip() if isinstance(v, str) else v + if required and (v is None or v == ""): + raise RuntimeError(f"Missing required env var: {name}") + return v + + +def env_int(name: str, default: Optional[int] = None, required: bool = False) -> int: + raw = os.getenv(name, "") + if raw is None or raw.strip() == "": + if required and default is None: + raise RuntimeError(f"Missing required env var: {name}") + if default is None: + raise RuntimeError(f"Missing env var: {name}") + return default + try: + return int(raw.strip()) + except ValueError as e: + raise RuntimeError(f"Invalid int env var {name}={raw!r}") from e + + +# ---------------------------- +# 配置(用环境变量注入) +# MCP_PG_* 优先(独立部署),回退到项目公共 DB_* / PG_NAME +# ---------------------------- +PGHOST = env_str("MCP_PG_HOST", default="") or env_str("DB_HOST", required=True) +PGPORT = env_int("MCP_PG_PORT", default=0) or env_int("DB_PORT", default=5432) +PGDATABASE = env_str("MCP_PG_DATABASE", default="") or env_str("ETL_DB_NAME", default="") or env_str("PG_NAME", required=True) +PGUSER = env_str("MCP_PG_USER", default="") or env_str("DB_USER", required=True) +PGPASSWORD = env_str("MCP_PG_PASSWORD", default="") or env_str("DB_PASSWORD", required=True) + +MCP_TOKEN = env_str("MCP_TOKEN", default="") # 鉴权 token(可空:不启用鉴权) +MAX_ROWS = env_int("MCP_MAX_ROWS", default=500) # query_sql 默认最大行数 +PORT = env_int("PORT", default=9000) # uvicorn 端口 + +# etl_feiqiu 库的六层 schema 架构 +ALLOWED_SCHEMAS = ("ods", "dwd", "dws", "core", "meta", "app") +ALLOWED_SCHEMA_SET = set(ALLOWED_SCHEMAS) + +# psycopg DSN(如果密码包含空格等特殊字符,建议改用 URL 形式并做编码) +DSN = ( + f"host={PGHOST} port={PGPORT} dbname={PGDATABASE} " + f"user={PGUSER} password={PGPASSWORD}" +) + +# 连接池:不要 open=True(避免解释器退出时 __del__ 清理触发异常) +pool = ConnectionPool(conninfo=DSN, min_size=1, max_size=10, timeout=60, open=False) + + +# ---------------------------- +# SQL 只读门禁(最终底线仍是 DB 只读账号) +# ---------------------------- +FORBIDDEN = re.compile( + r"\b(insert|update|delete|drop|alter|truncate|create|grant|revoke|copy|call|execute|do)\b", + re.IGNORECASE, +) + +# 额外禁止显式跨 schema 访问(避免越权) +# 匹配 schema.table 模式,但排除单字母别名(如 t.id、o.amount) +SCHEMA_QUAL = re.compile(r"\b([a-zA-Z_][a-zA-Z0-9_]{1,})\s*\.\s*[a-zA-Z_]", re.IGNORECASE) + + +def _is_probably_readonly(sql: str) -> bool: + if FORBIDDEN.search(sql): + return False + parsed = sqlparse.parse(sql) + if not parsed: + return False + stmt = parsed[0] + for tok in stmt.tokens: + if tok.is_whitespace: + continue + first = str(tok).strip().lower() + return first in ("select", "with", "show", "explain") + return False + + +def _validate_schema(schema: str) -> Optional[Dict[str, Any]]: + if schema not in ALLOWED_SCHEMA_SET: + return {"error": f"schema 不允许:{schema}。仅允许:{sorted(ALLOWED_SCHEMA_SET)}"} + return None + + +def _reject_cross_schema(sql: str, allowed_schema: str) -> Optional[Dict[str, Any]]: + """ + 简单防护:如果出现显式 schema 前缀(xxx.),要求必须是白名单内的 schema 或系统 schema。 + 注:这不是 SQL parser 级别的严格策略,但能挡住绝大多数越权写法。 + """ + matches = set(m.group(1) for m in SCHEMA_QUAL.finditer(sql or "")) + # 允许所有业务 schema + 系统 schema + safe = ALLOWED_SCHEMA_SET | {"pg_catalog", "information_schema"} + bad = sorted([s for s in matches if s.lower() not in {a.lower() for a in safe}]) + if bad: + return {"error": f"SQL 被拒绝:检测到不允许的 schema 引用 {bad},仅允许 {sorted(ALLOWED_SCHEMA_SET)} / 系统 schema。"} + return None + + +# ---------------------------- +# FastMCP:Streamable HTTP + JSON 响应 +# ---------------------------- +mcp = FastMCP( + "postgres-mcp", + stateless_http=True, + json_response=True, + transport_security=TransportSecuritySettings( + enable_dns_rebinding_protection=True, + allowed_hosts=[ + # 关键:既允许不带端口,也允许带端口 + "mcp.langlangzhuoqiu.cn", + "mcp.langlangzhuoqiu.cn:*", + "localhost", + "localhost:*", + "127.0.0.1", + "127.0.0.1:*", + "100.64.0.4", + "100.64.0.4:*", + "100.64.0.1", + "100.64.0.1:*", + "106.52.16.235", + "106.52.16.235:*", + ], + allowed_origins=[ + "https://mcp.langlangzhuoqiu.cn", + "https://mcp.langlangzhuoqiu.cn:*", + "http://localhost", + "http://localhost:*", + "http://127.0.0.1", + "http://127.0.0.1:*", + ], + ), +) + + +# ---------------------------- +# Tools:面向 etl_feiqiu 六层 schema +# ---------------------------- +@mcp.tool() +def list_tables(schema: str = "dwd", include_views: bool = False) -> Dict[str, Any]: + """列出指定 schema(ods/dwd/dws/core/meta/app)下的表(可选包含视图)""" + err = _validate_schema(schema) + if err: + return err + + table_types = ("BASE TABLE", "VIEW") if include_views else ("BASE TABLE",) + sql = """ + SELECT table_name, table_type + FROM information_schema.tables + WHERE table_schema = %s AND table_type = ANY(%s) + ORDER BY table_name; + """ + with pool.connection() as conn: + with conn.cursor() as cur: + cur.execute(sql, (schema, list(table_types))) + rows = cur.fetchall() + + return { + "schema": schema, + "include_views": include_views, + "tables": [{"name": r[0], "type": r[1]} for r in rows], + "table_count": len(rows), + } + + +@mcp.tool() +def describe_table(table: str, schema: str = "dwd") -> Dict[str, Any]: + """查看表结构(字段、类型、是否可空、默认值)""" + err = _validate_schema(schema) + if err: + return err + + sql = """ + SELECT column_name, data_type, is_nullable, column_default, ordinal_position + FROM information_schema.columns + WHERE table_schema=%s AND table_name=%s + ORDER BY ordinal_position; + """ + with pool.connection() as conn: + with conn.cursor() as cur: + cur.execute(sql, (schema, table)) + rows = cur.fetchall() + + return { + "schema": schema, + "table": table, + "columns": [ + {"name": r[0], "type": r[1], "nullable": r[2], "default": r[3], "position": r[4]} + for r in rows + ], + "column_count": len(rows), + } + + +@mcp.tool() +def describe_schemas( + schemas: Optional[List[str]] = None, + include_views: bool = False, + max_tables_per_schema: int = 500, +) -> Dict[str, Any]: + """ + 返回 ods/dwd/dws/core/meta/app schema 下的表结构(含主键)。 + 不传 schemas 则返回全部六个 schema。 + """ + schemas = schemas or list(ALLOWED_SCHEMAS) + + invalid = [s for s in schemas if s not in ALLOWED_SCHEMA_SET] + if invalid: + return {"error": f"存在不允许的 schema:{invalid}。仅允许:{sorted(ALLOWED_SCHEMA_SET)}"} + + table_types = ("BASE TABLE", "VIEW") if include_views else ("BASE TABLE",) + + with pool.connection() as conn: + with conn.cursor() as cur: + # 1) 表清单 + cur.execute( + """ + SELECT table_schema, table_name, table_type + FROM information_schema.tables + WHERE table_schema = ANY(%s) + AND table_type = ANY(%s) + ORDER BY table_schema, table_name; + """, + (schemas, list(table_types)), + ) + table_rows = cur.fetchall() + + tables_by_schema: Dict[str, List[Tuple[str, str]]] = defaultdict(list) + for s, t, tt in table_rows: + if len(tables_by_schema[s]) < max_tables_per_schema: + tables_by_schema[s].append((t, tt)) + + # 2) 所有列(一次性取;如表非常多,可考虑拆分/分页) + cur.execute( + """ + SELECT table_schema, table_name, column_name, data_type, is_nullable, column_default, ordinal_position + FROM information_schema.columns + WHERE table_schema = ANY(%s) + ORDER BY table_schema, table_name, ordinal_position; + """, + (schemas,), + ) + col_rows = cur.fetchall() + + cols_map: Dict[Tuple[str, str], List[Dict[str, Any]]] = defaultdict(list) + for s, t, c, dt, nul, dft, pos in col_rows: + cols_map[(s, t)].append( + {"name": c, "type": dt, "nullable": nul, "default": dft, "position": pos} + ) + + # 3) 主键 + cur.execute( + """ + SELECT kcu.table_schema, kcu.table_name, kcu.column_name, kcu.ordinal_position + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + AND tc.table_name = kcu.table_name + WHERE tc.constraint_type = 'PRIMARY KEY' + AND tc.table_schema = ANY(%s) + ORDER BY kcu.table_schema, kcu.table_name, kcu.ordinal_position; + """, + (schemas,), + ) + pk_rows = cur.fetchall() + + pk_map: Dict[Tuple[str, str], List[str]] = defaultdict(list) + for s, t, col, _pos in pk_rows: + pk_map[(s, t)].append(col) + + # 4) 组装 + result: Dict[str, Any] = { + "schemas": {}, + "include_views": include_views, + "limits": {"max_tables_per_schema": max_tables_per_schema}, + } + + for s in schemas: + schema_tables = tables_by_schema.get(s, []) + result["schemas"][s] = {"table_count": len(schema_tables), "tables": {}} + for t, tt in schema_tables: + key = (s, t) + result["schemas"][s]["tables"][t] = { + "type": tt, + "primary_key": pk_map.get(key, []), + "columns": cols_map.get(key, []), + "column_count": len(cols_map.get(key, [])), + } + + return result + + +@mcp.tool() +def query_sql(schema: str, sql: str, max_rows: int = MAX_ROWS) -> Dict[str, Any]: + """ + 在指定 schema 内执行只读 SQL(会 SET LOCAL search_path),并限制显式跨 schema 引用。 + """ + err = _validate_schema(schema) + if err: + return err + + sql = (sql or "").strip().rstrip(";") + if not _is_probably_readonly(sql): + return {"error": "SQL 被拒绝:仅允许只读(select/with/show/explain)并禁止危险关键字。"} + + cross = _reject_cross_schema(sql, allowed_schema=schema) + if cross: + return cross + + with pool.connection() as conn: + with conn.cursor() as cur: + # schema 已白名单校验,可安全拼接 + cur.execute(f"SET LOCAL search_path TO {schema}") + cur.execute(sql) + + cols = [d.name for d in (cur.description or [])] + rows = cur.fetchmany(max_rows + 1) + + truncated = len(rows) > max_rows + rows = rows[:max_rows] + + safe_rows: List[List[Any]] = [] + for r in rows: + safe_rows.append([v if isinstance(v, (int, float, str, bool)) or v is None else str(v) for v in r]) + + return { + "schema": schema, + "columns": cols, + "rows": safe_rows, + "row_count": len(safe_rows), + "truncated": truncated, + "max_rows": max_rows, + } + + +# ---------------------------- +# 鉴权 Middleware(支持 Bearer 或 query token) +# ---------------------------- +class AuthMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + if MCP_TOKEN and request.url.path.startswith("/mcp"): + auth = request.headers.get("authorization", "") + token_q = request.query_params.get("token", "") + if auth != f"Bearer {MCP_TOKEN}" and token_q != MCP_TOKEN: + return JSONResponse({"error": "unauthorized"}, status_code=401) + return await call_next(request) + + +# ---------------------------- +# lifespan:显式 open/close pool,并运行 session_manager +# ---------------------------- +@contextlib.asynccontextmanager +async def lifespan(app: Starlette): + pool.open(wait=True, timeout=30) + try: + async with mcp.session_manager.run(): + yield + finally: + # 避免解释器退出阶段 __del__ 清理导致异常 + pool.close(timeout=5) + + +# MCP endpoint:/mcp(默认 streamable_http_path="/mcp") +app = Starlette( + routes=[Mount("/", app=mcp.streamable_http_app())], + lifespan=lifespan, +) +app.add_middleware(AuthMiddleware) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run( + app, + host="0.0.0.0", + port=PORT, + proxy_headers=True, + forwarded_allow_ips="*", + ) diff --git a/docs/deployment/LAUNCH-CHECKLIST.md b/docs/deployment/LAUNCH-CHECKLIST.md index 600d39e..37e6ae7 100644 --- a/docs/deployment/LAUNCH-CHECKLIST.md +++ b/docs/deployment/LAUNCH-CHECKLIST.md @@ -1,648 +1,686 @@ -# 微信小程序上线清单 - -> 最后更新:2026-02-19 -> 本文档合并自 ENV-MANAGEMENT.md、MINIPROGRAM-RELEASE.md、PRE-TEST-VERIFICATION.md 及补充建议。 -> 按优先级从高到低排列,同时兼顾依赖关系(后续步骤依赖前置步骤完成)。 -> 每项完成后在状态栏标注完成日期。 - ---- - -## 阅读指南 - -- 状态标记:待办 = 空框,已完成 = 日期 -- 优先级:P0 = 不做就上不了线,P1 = 上线前必须做,P2 = 可上线后迭代 -- 依赖关系用箭头标注,如 "依赖 1.1" 表示需要先完成第 1.1 项 - ---- - -## 第一阶段:基础设施(P0 - 一切的前提) - -所有后续步骤都建立在"三个环境能跑起来"的基础上。 - -### 1.1 服务器环境初始化 - -| 状态 | 项目 | -|------|------| -| | 在 Windows Server 上创建目录结构 | -| | 克隆仓库并切换分支 | -| | 配置环境变量文件 | -| | 安装 Python 依赖 | - -在 Windows Server 上执行: - -```powershell -# 创建目录 -New-Item -ItemType Directory -Path D:\NeoZQYY\test\repo -Force -New-Item -ItemType Directory -Path D:\NeoZQYY\test\logs -Force -New-Item -ItemType Directory -Path D:\NeoZQYY\prod\repo -Force -New-Item -ItemType Directory -Path D:\NeoZQYY\prod\logs -Force -New-Item -ItemType Directory -Path D:\NeoZQYY\scripts -Force -``` - -```powershell -# 克隆仓库 -cd D:\NeoZQYY\test -git clone <你的仓库地址> repo -cd repo -git checkout test - -cd D:\NeoZQYY\prod -git clone <你的仓库地址> repo -cd repo -git checkout master -``` - -环境变量文件(不从 Git 同步,手动创建): - -测试环境 `D:\NeoZQYY\test\repo\.env`: -```env -DB_HOST=100.64.0.4 -DB_PORT=5432 -DB_USER=local-Python -DB_PASSWORD=<密码> -APP_DB_NAME=test_zqyy_app -ETL_DB_NAME=test_etl_feiqiu -PG_NAME=test_etl_feiqiu -LOG_LEVEL=DEBUG -``` - -正式环境 `D:\NeoZQYY\prod\repo\.env`: -```env -DB_HOST=100.64.0.4 -DB_PORT=5432 -DB_USER=prod-Python -DB_PASSWORD=<正式密码> -APP_DB_NAME=zqyy_app -ETL_DB_NAME=etl_feiqiu -PG_NAME=etl_feiqiu -LOG_LEVEL=INFO -``` - -> 正式环境建议使用独立的数据库用户(如 `prod-Python`),权限最小化。 - -```powershell -# 安装依赖(每个环境各自执行) -cd D:\NeoZQYY\test\repo -uv sync --all-packages - -cd D:\NeoZQYY\prod\repo -uv sync --all-packages -``` - -环境总览: - -| 环境 | 位置 | Git 分支 | 数据库 | 用途 | -|------|------|----------|--------|------| -| 开发 | 本机 `C:\NeoZQYY` | `dev` | `test_etl_feiqiu` / `test_zqyy_app` | 日常开发 | -| 测试 | 服务器 `D:\NeoZQYY\test\repo` | `test` | `test_etl_feiqiu` / `test_zqyy_app` | 集成测试 + 小程序体验版 | -| 正式 | 服务器 `D:\NeoZQYY\prod\repo` | `master` | `etl_feiqiu` / `zqyy_app` | 生产环境 + 小程序正式版 | - - -### 1.2 后端服务管理 - bat 脚本(依赖 1.1) - -| 状态 | 项目 | -|------|------| -| | 将 bat 脚本放到服务器 `D:\NeoZQYY\scripts\` | -| | 登录服务器手动运行对应脚本启动服务 | - -> 后续将由监控系统(见 7.2)统一管理所有服务的启停和状态监控。 -> 在监控系统上线之前,登录 Windows Server 手动双击 bat 脚本启动。 - -端口分配: - -| 服务 | 测试环境 | 正式环境 | -|------|----------|----------| -| FastAPI 后端 | 8001 | 8000 | - -启动脚本 `D:\NeoZQYY\scripts\start-test-api.bat`: -```bat -@echo off -title NeoZQYY Test API (port 8001) -cd /d D:\NeoZQYY\test\repo\apps\backend -D:\NeoZQYY\test\repo\.venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8001 -pause -``` - -启动脚本 `D:\NeoZQYY\scripts\start-prod-api.bat`: -```bat -@echo off -title NeoZQYY Prod API (port 8000) -cd /d D:\NeoZQYY\prod\repo\apps\backend -D:\NeoZQYY\prod\repo\.venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8000 -pause -``` - -一键全部启动 `D:\NeoZQYY\scripts\start-all.bat`: -```bat -@echo off -echo 启动测试环境后端... -start "NeoZQYY Test API" cmd /c "D:\NeoZQYY\scripts\start-test-api.bat" -echo 启动正式环境后端... -start "NeoZQYY Prod API" cmd /c "D:\NeoZQYY\scripts\start-prod-api.bat" -echo 全部已启动。 -pause -``` - -> 每个 bat 会打开一个独立的 cmd 窗口,窗口标题显示服务名称,方便识别。 -> 关闭窗口即停止服务。服务器重启后需要重新手动运行。 - -### 1.3 跳板机 Nginx 反代(依赖 1.2) - -| 状态 | 项目 | -|------|------| -| 已完成 | 跳板机已配置好(用户确认) | -| 已完成 | Tailscale 内网已配置(DB_HOST=100.64.0.4) | -| | 确认 Nginx 将 `api.langlangzhuoqiu.cn` 反代到 Tailscale IP:8000(正式) | -| | 确认 Nginx 将测试环境反代到 Tailscale IP:8001(如需区分域名) | -| | 确认 SSL 证书有效且自动续期 | - -> 跳板机本身已配好,这里只需确认反代规则指向了正确的后端端口。 -> 如果测试和正式共用 `api.langlangzhuoqiu.cn`,则体验版和正式版会打到同一个后端。 -> 建议至少在初期区分:`test-api.langlangzhuoqiu.cn` 指向 8001,`api.langlangzhuoqiu.cn` 指向 8000。 - -### 1.4 数据库备份方案(依赖 1.1) - -| 状态 | 项目 | -|------|------| -| | 编写 pg_dump 备份脚本 | -| | 配置 Windows 计划任务定时执行 | -| | 执行一次恢复演练验证备份可用 | - -> 你的 Windows Server 是单点,正式库 `etl_feiqiu` 和 `zqyy_app` 丢了不可逆。 -> 建议每天凌晨自动 pg_dump,保留最近 7 天。备份文件可以同步到跳板机或其他位置做异地冗余。 - -示例备份脚本(放 `D:\NeoZQYY\scripts\backup-db.ps1`): - -```powershell -$date = Get-Date -Format "yyyy-MM-dd" -$backupDir = "D:\NeoZQYY\backups" -New-Item -ItemType Directory -Path $backupDir -Force - -# 正式 ETL 库 -pg_dump -h 100.64.0.4 -U prod-Python -d etl_feiqiu -F c -f "$backupDir\etl_feiqiu_$date.dump" - -# 正式业务库 -pg_dump -h 100.64.0.4 -U prod-Python -d zqyy_app -F c -f "$backupDir\zqyy_app_$date.dump" - -# 清理 7 天前的备份 -Get-ChildItem $backupDir -Filter "*.dump" | Where-Object { $_.LastWriteTime -lt (Get-Date).AddDays(-7) } | Remove-Item -``` - -用 Windows 计划任务每天凌晨 3:00 执行此脚本。 - ---- - -## 第二阶段:微信侧配置(P0 - 小程序能跑的前提) - -这些是微信平台的硬性要求,缺任何一项小程序都无法在真机上正常运行或通过审核。 - -### 2.1 合法域名 + HTTPS - -| 状态 | 项目 | -|------|------| -| 已完成 | request 合法域名:`https://api.langlangzhuoqiu.cn` | -| 已完成 | socket 合法域名:`wss://socket.langlangzhuoqiu.cn` | -| 已完成 | uploadFile 合法域名:`https://file.langlangzhuoqiu.cn` | -| 已完成 | downloadFile 合法域名:`https://file.langlangzhuoqiu.cn` | - -> 已在微信公众平台后台配置完成。 - -### 2.2 消息推送配置(依赖 1.2 + 1.3) - -| 状态 | 项目 | -|------|------| -| 已完成 | 后端接口 `GET/POST /api/wx/callback` 已实现(`wx_callback.py`) | -| | 在 `apps/backend/.env.local` 中配置 `WX_CALLBACK_TOKEN` | -| | 服务器上部署最新代码并重启后端 | -| | 微信后台填写消息推送配置并提交验证 | - -> 消息推送配置必须在服务器后端已启动、跳板机反代已就绪之后才能操作。 -> 微信会向你的 URL 发 GET 请求验签,后端必须在线才能通过。 - -微信后台配置(开发 - 开发管理 - 消息推送): - -| 字段 | 值 | -|------|------| -| URL | `https://api.langlangzhuoqiu.cn/api/wx/callback` | -| Token | `LLZQwx2026push`(和 .env.local 里一致,可自定义) | -| EncodingAESKey | 点"随机生成" | -| 消息加解密方式 | 先选"明文模式"(跑通后再切安全模式) | -| 数据格式 | JSON | - -点"提交"后微信发 GET 验证。如果失败,最常见原因: -- 服务器后端未启动 -- Nginx 反代未指向正确端口 -- Token 两边不一致 - -### 2.3 隐私协议 / 用户隐私保护指引 - -| 状态 | 项目 | -|------|------| -| | 在微信后台填写用户隐私保护指引 | - -操作路径:微信后台 - 设置 - 基本设置 - 服务内容声明 - 用户隐私保护指引。 - -需要声明你收集了哪些用户信息: -- 微信昵称、头像(如果用到 `wx.getUserProfile`) -- 用户标识(openid,`wx.login` 必然涉及) -- 设备信息(如果用到 `wx.getSystemInfo`) - -> 2023 年 9 月起微信强制要求。不填写的话,调用 `wx.login` 等隐私相关 API 会直接报错。 -> 即使你当前只用了 `wx.login`,也需要声明"用户标识"这一项。 - -### 2.4 小程序基本信息 - -| 状态 | 项目 | -|------|------| -| 已完成 | AppID 已配置:`wx7c07793d82732921` | -| | 确认小程序名称、图标、简介已填写完整 | -| | 确认小程序类目已选择(建议"工具 - 企业管理"或"生活服务") | - -> 审核时会检查这些基本信息。类目选择需注意:部分类目需要上传营业执照等资质文件,提前确认。 - -### 2.5 体验成员配置 - -| 状态 | 项目 | -|------|------| -| | 在微信后台添加体验成员(成员管理 - 添加体验成员) | - -> 体验版只有体验成员能扫码访问,最多 100 人。内部测试阶段必须配置。 - - ---- - -## 第三阶段:后端核心功能(P0 - 上线的硬前提) - -没有这些功能,小程序即使能打开也无法提供真实服务。 - -### 3.1 微信登录接口(依赖 2.2) - -| 状态 | 项目 | -|------|------| -| | 后端实现 `POST /api/auth/wechat_login` | -| | 在 `.env.local` 中配置 `WX_APP_ID` 和 `WX_APP_SECRET` | -| | 小程序端 `app.ts` 改造:`wx.login` 拿 code 后调后端换 JWT | -| | 后续请求统一带 Authorization header | - -当前状态: -- 后端 JWT 框架已有(`app/auth/jwt.py`,签发/验证/刷新) -- 管理后台登录已实现(`POST /api/auth/login`) -- 小程序端 `app.ts` 里 `wx.login` 只是 `console.log(res.code)`,未发给后端 - -需要实现的完整链路: -``` -小程序 wx.login() 拿到 code - --> POST /api/auth/wechat_login { code: "xxx" } - --> 后端调微信 jscode2session 换 openid + session_key - --> 查 users 表,不存在则创建 - --> 签发 JWT(access_token + refresh_token) - --> 小程序存储 token,后续请求带上 -``` - -> `WX_APP_SECRET` 从微信后台获取(开发 - 开发管理 - 开发设置 - AppSecret)。 -> 这个值绝对不能出现在前端代码或 Git 仓库里,只放在服务器的 `.env.local` 中。 - -### 3.2 权限中间件(依赖 3.1) - -| 状态 | 项目 | -|------|------| -| | 实现权限中间件(基于 JWT 中的 site_id + role 校验) | -| | 需要鉴权的路由挂上 `Depends(get_current_user)` | - -当前状态: -- RBAC 的 DDL 已定义(roles / permissions / user_roles / role_permissions 表) -- `app/auth/dependencies.py` 有 `get_current_user` 但未实际使用 -- `app/middleware/__init__.py` 为空,无鉴权逻辑 -- 后端 API 目前全部裸奔,任何人可调用任何接口 - -> 这是安全底线。涉及多门店、多角色、财务数据的场景,没有权限校验等于公开数据库。 - -### 3.3 小程序业务页面(依赖 3.1) - -| 状态 | 项目 | -|------|------| -| 已完成 | MVP 全链路验证页面(`pages/mvp/mvp`,显示 "t91") | -| | 至少一个有实际功能的首页(审核要求) | -| | 清理调试代码(`console.log` 等) | - -当前状态: -- 只有 mvp 测试页、默认 index 和 logs 页 -- `app.ts` 里有 `console.log(res.code)` 调试输出 -- 提交审核时需要有实际功能的页面,否则大概率被拒 - -> 小程序的 API 地址已通过 `utils/config.ts` 实现环境自动切换: -> develop(开发版)指向本机,trial(体验版)和 release(正式版)指向线上域名。 - -### 3.4 密钥配置到后端 - -| 状态 | 项目 | -|------|------| -| 已完成 | `WX_CALLBACK_TOKEN` 已写入 `.env.local` | -| | `WX_APP_ID` 写入服务器 `.env.local` | -| | `WX_APP_SECRET` 写入服务器 `.env.local` | -| | 确认 `JWT_SECRET_KEY` 已配置(不要用默认值) | - -> `.env.template` 中已有这些配置项的模板,复制到 `.env.local` 后填入实际值。 -> 正式环境的 `JWT_SECRET_KEY` 必须是一个随机强密码,不能和测试环境相同。 - ---- - -## 第四阶段:部署与发布流程(P0 - 让代码流动起来) - -### 4.1 日常部署流程 - -开发到测试: - -``` -1. 本机 dev 分支开发完成,push 到远程 -2. 本机合并 dev 到 test:git checkout test && git merge dev && git push -3. 服务器测试环境拉取:cd D:\NeoZQYY\test\repo && git pull -4. 如有依赖变更:uv sync --all-packages -5. 关闭测试环境后端窗口,重新运行 start-test-api.bat -6. 微信开发者工具上传体验版(指向测试环境 API) -``` - -测试到正式: - -``` -1. 测试通过后,合并 test 到 master:git checkout master && git merge test && git push -2. 服务器正式环境拉取:cd D:\NeoZQYY\prod\repo && git pull -3. 如有依赖变更:uv sync --all-packages -4. 关闭正式环境后端窗口,重新运行 start-prod-api.bat -5. 微信小程序提交审核 --> 发布 -``` - -### 4.2 小程序版本与发布 - -版本类型: - -| 版本 | 对应环境 | 谁能访问 | 用途 | -|------|----------|----------|------| -| 开发版 | 本机 dev | 开发者自己(微信开发者工具) | 日常开发调试 | -| 体验版 | 服务器 test | 体验成员(最多 100 人) | 内部测试 | -| 正式版 | 服务器 prod | 所有用户 | 线上运营 | - -完整发布流水线: - -``` -本机开发(开发版) - | 微信开发者工具"上传" - v -体验版(指向测试环境 API) - | 测试通过 - v -微信后台"提交审核" - | 审核通过(通常 1-3 个工作日,快的几小时) - v -微信后台"发布" - v -正式版(指向正式环境 API) -``` - -开发版操作: -1. 微信开发者工具打开 `apps/miniprogram/` -2. 勾选"详情 - 本地设置 - 不校验合法域名"(仅开发阶段) -3. 编译预览,确认功能正常 - -上传体验版: -1. 微信开发者工具点右上角"上传" -2. 填写版本号(如 `0.1.0`)和项目备注 -3. 微信后台 - 开发管理 - 开发版本 - 点"选为体验版" -4. 体验版会生成二维码,扫码即可体验 - -提交审核: -1. 微信后台 - 开发管理 - 开发版本 - 点"提交审核" -2. 填写功能页面截图、类目、功能介绍 -3. 如有登录功能,提供测试账号密码 -4. 等待审核(预留至少一周缓冲期) - -发布上线: -1. 审核通过后,微信后台 - 开发管理 - 审核版本 - 点"发布" -2. 可选全量发布或灰度发布(建议先 10% 观察) - -版本号规范(语义化): -- `0.1.0` -- MVP 验证 -- `0.2.0` -- 新增登录功能 -- `0.2.1` -- 修复 bug -- `1.0.0` -- 正式上线第一版 - -### 4.3 紧急回滚 - -后端回滚: -```powershell -cd D:\NeoZQYY\prod\repo -git log --oneline -5 # 查看最近提交 -git reset --hard # 回退到指定版本 -# 关闭正式环境后端窗口,重新运行 start-prod-api.bat -``` - -小程序回滚: -1. 微信后台 - 开发管理 - 线上版本 - 点"版本回退" -2. 回退即时生效,用户下次打开加载旧版本 -3. 注意:只能回退一个版本,不要连续发布多个有问题的版本 - -### 4.4 发布检查清单 - -每次发布前确认: - -- [ ] API 地址指向正确环境 -- [ ] 服务器后端已部署最新代码并重启 -- [ ] 数据库迁移已执行(如有) -- [ ] 关键功能手动测试通过 -- [ ] 无 console.log 调试信息残留 -- [ ] 版本号已更新 -- [ ] 备注中写明本次变更内容 - - ---- - -## 第五阶段:安全与审计(P1 - 上线前必须做) - -这些不会阻止小程序"能用",但不做会导致可预期的安全事故或管理问题。 - -### 5.1 用户申请/审核流 - -| 状态 | 项目 | -|------|------| -| | 创建 `user_application` 表(DDL + 迁移脚本) | -| | 实现审核 API:`POST /api/applications`、`POST /api/applications/{id}/approve` | -| | 状态机:pending / approved / rejected / disabled | - -> `users` 表 DDL 已有(`db/zqyy_app/schemas/init.sql`),但缺少申请表和审核接口。 - -### 5.2 审计日志 - -| 状态 | 项目 | -|------|------| -| | 创建 `audit_log` 表 | -| | 实现审计中间件(关键写操作自动记录) | - -> 涉及工资/财务场景,"谁在何时改了什么"必须可追溯。 -> 至少记录:审批操作、xlsx 导入、口径裁决、工资重算。 - -### 5.3 后端结构化日志 - -| 状态 | 项目 | -|------|------| -| | 后端接入结构化日志(替代 uvicorn 默认日志) | - -当前状态: -- ETL 有完整日志体系(`logging_utils.py`) -- 后端仅 uvicorn 默认日志,排查问题困难 - -### 5.4 安全加固 - -| 状态 | 项目 | -|------|------| -| | 确认服务器防火墙只允许 Tailscale 网卡入站到 API 端口 | -| | 确认 PostgreSQL 只监听内网/本机,不对公网开放 | -| | 确认 pg_hba.conf 配置合理 | -| | 消息推送从明文模式切换到安全模式(AES 加解密) | - ---- - -## 第六阶段:审核准备(P1 - 提交审核前) - -### 6.1 审核材料准备 - -| 状态 | 项目 | -|------|------| -| | 准备主要页面功能截图 | -| | 准备测试账号(如有登录功能) | -| | 确认类目资质文件(营业执照等,视类目要求) | -| | 撰写功能介绍文案 | - -> 首次审核通常 1-3 个工作日,快的几小时。被拒后修改重新提交。 -> 建议预留至少一周的审核缓冲期。 - ---- - -## 第七阶段:可上线后迭代(P2) - -这些可以在小程序上线后逐步补齐。 - -### 7.1 xlsx 导入/导出 - -| 状态 | 项目 | -|------|------| -| | 后端实现上传、解析、校验、落库、错误报告接口 | - -> ETL 层已有 openpyxl 依赖,但后端无上传接口。 - -### 7.2 运维监控系统(将取代 bat 脚本手动管理) - -| 状态 | 项目 | -|------|------| -| | 需求与 spec 编写 | -| | 后端采集器 + 监控 API | -| | 前端监控面板(管理后台新页面) | -| | 告警推送(企业微信机器人 webhook) | -| | 上线后取代 bat 脚本,接管服务启停 | - -规划方案(BS 架构,集成到现有管理后台): - -技术选型: -- 后端:FastAPI 定时任务采集指标,写入 PostgreSQL `monitor` schema -- 前端:`apps/admin-web/` 新增 Monitor 页面,Ant Design Charts 可视化 -- 告警:企业微信机器人 webhook 推送 - -监控维度: - -| 维度 | 数据来源 | 复杂度 | -|------|----------|--------| -| 服务器载荷(CPU/内存/磁盘) | `psutil` 采集 | 低 | -| 服务状态(后端进程存活) | `/health` 探活 | 低 | -| 网络上下行 | `psutil.net_io_counters` | 低 | -| 数据库状态(连接数/慢查询/表大小) | `pg_stat_*` 系统视图 | 中 | -| ETL 连接器字段漂移检测 | ETL `meta` 层运行元数据 | 中 | - -代码落点: -``` -apps/backend/app/ - routers/monitor.py # 监控数据 API - services/monitor_collector.py # 采集器(定时任务) -apps/admin-web/src/pages/ - Monitor.tsx # 监控面板页面 -db/zqyy_app/migrations/ - YYYYMMDD_create_monitor_tables.sql -``` - -功能目标: -- 数据可视化:实时仪表盘 + 历史趋势图 -- 告警分级:按信息紧急程度分 INFO / WARNING / CRITICAL -- 服务管理:上线后可通过面板启停服务,取代手动 bat 脚本 - -> 优先级 P2,建议在小程序上线稳定后再启动开发。 -> 到时候用一个 spec 来拆解需求和实现计划。 - -### 7.3 租户模型 - -| 状态 | 项目 | -|------|------| -| | tenant 层实现(当前单租户场景可暂缓) | -| | RLS Policy DDL 落库(当前 ETL 只读连接已设置 `app.current_site_id`) | - -### 7.4 ETL SDK 抽象 - -| 状态 | 项目 | -|------|------| -| | 将飞球 Connector 抽象为通用连接器基类 | - -> 当前飞球 Connector 结构清晰,但未抽象为通用 SDK。多球房扩展时需要。 - -### 7.5 自动化测试 - -| 状态 | 项目 | -|------|------| -| | 后端 API 集成测试 | -| | 小程序端自动化测试 | - ---- - -## 已完成项摘要 - -| 完成日期 | 项目 | 说明 | -|----------|------|------| -| 2026-02-19 | MVP 全链路验证 | 小程序 - FastAPI - PostgreSQL 通路已跑通,`GET /api/xcx-test` 返回 `{"ti": "t91"}` | -| 2026-02-19 | 后端 MVP 页面 | `pages/mvp/mvp` 已创建并注册为首页,从 API 读取数据并显示 | -| 2026-02-19 | 环境管理方案 | 服务器目录结构、Git 流程、bat 脚本服务管理方案已输出 | -| 2026-02-19 | PRE-TEST 验证报告 | 逐项对照 8 大板块标注完成度,已合并到本文档 | -| 2026-02-19 | 合法域名 + HTTPS | 微信后台已配置 request/socket/upload/download 合法域名 | -| 2026-02-19 | 消息推送后端接口 | `GET/POST /api/wx/callback` 已实现(`wx_callback.py`),支持验签和消息接收 | -| 2026-02-19 | 小程序环境配置 | `utils/config.ts` 已创建,根据运行环境自动切换 API 地址 | -| 2026-02-19 | 版本与发布流程文档 | 已合并到本文档第四阶段 | -| 已有 | JWT 认证框架 | `app/auth/jwt.py` 已实现签发/验证/刷新 | -| 已有 | 管理后台登录 | `POST /api/auth/login` 已实现 | -| 已有 | 健康检查 | `GET /health` 已实现 | -| 已有 | CORS 配置 | 已配置,支持环境变量覆盖 | -| 已有 | OpenAPI 文档 | `/docs`(Swagger)和 `/redoc` 已启用 | -| 已有 | 数据分层架构 | ODS/DWD/DWS/Core/Meta/App 六层 schema 已建立 | -| 已有 | Monorepo + Steering | 单仓库统一管理,Steering 规则已配置 | -| 已有 | Git 三分支 | dev/test/master 已建立 | -| 已有 | RBAC DDL | roles/permissions/user_roles/role_permissions 表已定义 | -| 已有 | site_id 贯穿 | 所有业务表均有 site_id 字段 | -| 已有 | 跳板机 + Tailscale | 内外网分层已配置 | - ---- - -## 数据库环境隔离速查 - -| 数据库 | 用途 | 备注 | -|--------|------|------| -| `test_etl_feiqiu` | 开发 + 测试 ETL 数据 | 可随时重建 | -| `test_zqyy_app` | 开发 + 测试业务数据 | 可随时重建 | -| `etl_feiqiu` | 正式 ETL 数据 | 需备份 | -| `zqyy_app` | 正式业务数据 | 需备份 | - ---- - -## 风险提醒 - -| 风险 | 级别 | 当前缓解 | 是否充分 | -|------|------|----------|----------| -| 数据越权/串账 | P0 | site_id 已贯穿,ETL 只读连接有 RLS | 后端 API 无权限中间件,需尽快补齐 | -| 财务口径不可追溯 | P0 | ETL 有 meta 层记录运行元数据 | 后端无 audit_log,需补齐 | -| 单点故障 | P0 | 无 | 无备份/恢复方案,需尽快建立 | -| 密钥泄露 | P1 | .env 已 gitignore | AppSecret 需确认只在服务端 | -| ETL 扩展维护 | P1 | 飞球 Connector 结构清晰 | 未抽象为通用 SDK | -| 联调回归成本 | P2 | Steering + Hooks 已配置 | 缺自动化集成测试 | +# 微信小程序上线清单 + +> 最后更新:2026-02-19 +> 本文档合并自 ENV-MANAGEMENT.md、MINIPROGRAM-RELEASE.md、PRE-TEST-VERIFICATION.md 及补充建议。 +> 按优先级从高到低排列,同时兼顾依赖关系(后续步骤依赖前置步骤完成)。 +> 每项完成后在状态栏标注完成日期。 + +--- + +## 阅读指南 + +- 状态标记:待办 = 空框,已完成 = 日期 +- 优先级:P0 = 不做就上不了线,P1 = 上线前必须做,P2 = 可上线后迭代 +- 依赖关系用箭头标注,如 "依赖 1.1" 表示需要先完成第 1.1 项 + +--- + +## 第一阶段:基础设施(P0 - 一切的前提) + +所有后续步骤都建立在"三个环境能跑起来"的基础上。 + +### 1.1 服务器环境初始化 + +| 状态 | 项目 | +|------|------| +| | 在 Windows Server 上创建目录结构 | +| | 克隆仓库并切换分支 | +| | 配置环境变量文件 | +| | 安装 Python 依赖 | +| | 运行 `setup-server-git.py` 配置 Git 排除规则 | + +在 Windows Server 上执行: + +```powershell +# 创建目录 +New-Item -ItemType Directory -Path D:\NeoZQYY\test\repo -Force +New-Item -ItemType Directory -Path D:\NeoZQYY\test\logs -Force +New-Item -ItemType Directory -Path D:\NeoZQYY\prod\repo -Force +New-Item -ItemType Directory -Path D:\NeoZQYY\prod\logs -Force +New-Item -ItemType Directory -Path D:\NeoZQYY\scripts -Force +``` + +```powershell +# 克隆仓库 +cd D:\NeoZQYY\test +git clone <你的仓库地址> repo +cd repo +git checkout test + +cd D:\NeoZQYY\prod +git clone <你的仓库地址> repo +cd repo +git checkout master +``` + +环境变量文件(不从 Git 同步,手动创建): + +测试环境 `D:\NeoZQYY\test\repo\.env`: +```env +DB_HOST=100.64.0.4 +DB_PORT=5432 +DB_USER=local-Python +DB_PASSWORD=<密码> +APP_DB_NAME=test_zqyy_app +ETL_DB_NAME=test_etl_feiqiu +PG_NAME=test_etl_feiqiu +LOG_LEVEL=DEBUG +``` + +正式环境 `D:\NeoZQYY\prod\repo\.env`: +```env +DB_HOST=100.64.0.4 +DB_PORT=5432 +DB_USER=prod-Python +DB_PASSWORD=<正式密码> +APP_DB_NAME=zqyy_app +ETL_DB_NAME=etl_feiqiu +PG_NAME=etl_feiqiu +LOG_LEVEL=INFO +``` + +> 正式环境建议使用独立的数据库用户(如 `prod-Python`),权限最小化。 + +```powershell +# 安装依赖(每个环境各自执行) +cd D:\NeoZQYY\test\repo +uv sync --all-packages + +cd D:\NeoZQYY\prod\repo +uv sync --all-packages +``` + +```powershell +# 配置服务器 Git 排除规则(每个环境各执行一次) +# 跳过 export/、.env 等开发机留存文件,避免占用服务器磁盘 +cd D:\NeoZQYY\test\repo +python scripts/server/setup-server-git.py + +cd D:\NeoZQYY\prod\repo +python scripts/server/setup-server-git.py +``` + +Git 排除方案说明(统一 .gitignore + skip-worktree): + +三个分支(dev / test / master)共用同一份 `.gitignore`(宽松版,允许 `.env`、`export/` 等留存文件提交)。 +服务器上通过 `setup-server-git.py` 一次性配置,不需要每个分支维护不同的 `.gitignore`。 + +工作原理: +1. 脚本将 `scripts/server/server-exclude.txt` 复制到 `.git/info/exclude`(本地排除,不影响仓库) +2. 对已 track 但服务器不需要的文件设置 `git update-index --skip-worktree` +3. 后续 `git pull` 不会还原这些文件,可安全删除释放磁盘空间 + +被排除的内容(完整列表见 `scripts/server/server-exclude.txt`): +- `.env` / `.env.local` -- 服务器有自己的环境配置 +- `export/` -- ETL 导出数据(仅开发机留存) +- `docs/` -- 全部文档(部署、PRD、H5 原型、审计、架构等) +- `apps/miniprogram/` -- 小程序源码(服务器不编译小程序) +- `apps/admin-web/src/` -- 管理后台源码(保留 dist/) +- `tests/`、`.hypothesis/` -- 测试相关 +- `samples/`、`infra/` -- 示例数据和基础设施文档 +- `scripts/ops/`、`scripts/audit/`、`scripts/migrate/` -- 开发用脚本 +- `.kiro/` -- Kiro 配置 +- 根目录截图(`*.png`)、`.code-workspace` 等 + +优点: +- merge 零冲突(三个分支 `.gitignore` 完全一致) +- 服务器首次 clone 后运行一次脚本即可,后续 `git pull` 正常工作 +- 开发机的留存文件正常提交到 Git,不受影响 + +环境总览: + +| 环境 | 位置 | Git 分支 | 数据库 | 用途 | +|------|------|----------|--------|------| +| 开发 | 本机 `C:\NeoZQYY` | `dev` | `test_etl_feiqiu` / `test_zqyy_app` | 日常开发 | +| 测试 | 服务器 `D:\NeoZQYY\test\repo` | `test` | `test_etl_feiqiu` / `test_zqyy_app` | 集成测试 + 小程序体验版 | +| 正式 | 服务器 `D:\NeoZQYY\prod\repo` | `master` | `etl_feiqiu` / `zqyy_app` | 生产环境 + 小程序正式版 | + + +### 1.2 后端服务管理 - bat 脚本(依赖 1.1) + +| 状态 | 项目 | +|------|------| +| | 将 bat 脚本放到服务器 `D:\NeoZQYY\scripts\` | +| | 登录服务器手动运行对应脚本启动服务 | + +> 后续将由监控系统(见 7.2)统一管理所有服务的启停和状态监控。 +> 在监控系统上线之前,登录 Windows Server 手动双击 bat 脚本启动。 + +端口分配: + +| 服务 | 测试环境 | 正式环境 | +|------|----------|----------| +| FastAPI 后端 | 8001 | 8000 | + +启动脚本 `D:\NeoZQYY\scripts\start-test-api.bat`: +```bat +@echo off +title NeoZQYY Test API (port 8001) +cd /d D:\NeoZQYY\test\repo\apps\backend +D:\NeoZQYY\test\repo\.venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8001 +pause +``` + +启动脚本 `D:\NeoZQYY\scripts\start-prod-api.bat`: +```bat +@echo off +title NeoZQYY Prod API (port 8000) +cd /d D:\NeoZQYY\prod\repo\apps\backend +D:\NeoZQYY\prod\repo\.venv\Scripts\uvicorn.exe app.main:app --host 0.0.0.0 --port 8000 +pause +``` + +一键全部启动 `D:\NeoZQYY\scripts\start-all.bat`: +```bat +@echo off +echo 启动测试环境后端... +start "NeoZQYY Test API" cmd /c "D:\NeoZQYY\scripts\start-test-api.bat" +echo 启动正式环境后端... +start "NeoZQYY Prod API" cmd /c "D:\NeoZQYY\scripts\start-prod-api.bat" +echo 全部已启动。 +pause +``` + +> 每个 bat 会打开一个独立的 cmd 窗口,窗口标题显示服务名称,方便识别。 +> 关闭窗口即停止服务。服务器重启后需要重新手动运行。 + +### 1.3 跳板机 Nginx 反代(依赖 1.2) + +| 状态 | 项目 | +|------|------| +| 已完成 | 跳板机已配置好(用户确认) | +| 已完成 | Tailscale 内网已配置(DB_HOST=100.64.0.4) | +| | 确认 Nginx 将 `api.langlangzhuoqiu.cn` 反代到 Tailscale IP:8000(正式) | +| | 确认 Nginx 将测试环境反代到 Tailscale IP:8001(如需区分域名) | +| | 确认 SSL 证书有效且自动续期 | + +> 跳板机本身已配好,这里只需确认反代规则指向了正确的后端端口。 +> 如果测试和正式共用 `api.langlangzhuoqiu.cn`,则体验版和正式版会打到同一个后端。 +> 建议至少在初期区分:`test-api.langlangzhuoqiu.cn` 指向 8001,`api.langlangzhuoqiu.cn` 指向 8000。 + +### 1.4 数据库备份方案(依赖 1.1) + +| 状态 | 项目 | +|------|------| +| | 编写 pg_dump 备份脚本 | +| | 配置 Windows 计划任务定时执行 | +| | 执行一次恢复演练验证备份可用 | + +> 你的 Windows Server 是单点,正式库 `etl_feiqiu` 和 `zqyy_app` 丢了不可逆。 +> 建议每天凌晨自动 pg_dump,保留最近 7 天。备份文件可以同步到跳板机或其他位置做异地冗余。 + +示例备份脚本(放 `D:\NeoZQYY\scripts\backup-db.ps1`): + +```powershell +$date = Get-Date -Format "yyyy-MM-dd" +$backupDir = "D:\NeoZQYY\backups" +New-Item -ItemType Directory -Path $backupDir -Force + +# 正式 ETL 库 +pg_dump -h 100.64.0.4 -U prod-Python -d etl_feiqiu -F c -f "$backupDir\etl_feiqiu_$date.dump" + +# 正式业务库 +pg_dump -h 100.64.0.4 -U prod-Python -d zqyy_app -F c -f "$backupDir\zqyy_app_$date.dump" + +# 清理 7 天前的备份 +Get-ChildItem $backupDir -Filter "*.dump" | Where-Object { $_.LastWriteTime -lt (Get-Date).AddDays(-7) } | Remove-Item +``` + +用 Windows 计划任务每天凌晨 3:00 执行此脚本。 + +--- + +## 第二阶段:微信侧配置(P0 - 小程序能跑的前提) + +这些是微信平台的硬性要求,缺任何一项小程序都无法在真机上正常运行或通过审核。 + +### 2.1 合法域名 + HTTPS + +| 状态 | 项目 | +|------|------| +| 已完成 | request 合法域名:`https://api.langlangzhuoqiu.cn` | +| 已完成 | socket 合法域名:`wss://socket.langlangzhuoqiu.cn` | +| 已完成 | uploadFile 合法域名:`https://file.langlangzhuoqiu.cn` | +| 已完成 | downloadFile 合法域名:`https://file.langlangzhuoqiu.cn` | + +> 已在微信公众平台后台配置完成。 + +### 2.2 消息推送配置(依赖 1.2 + 1.3) + +| 状态 | 项目 | +|------|------| +| 已完成 | 后端接口 `GET/POST /api/wx/callback` 已实现(`wx_callback.py`) | +| | 在 `apps/backend/.env.local` 中配置 `WX_CALLBACK_TOKEN` | +| | 服务器上部署最新代码并重启后端 | +| | 微信后台填写消息推送配置并提交验证 | + +> 消息推送配置必须在服务器后端已启动、跳板机反代已就绪之后才能操作。 +> 微信会向你的 URL 发 GET 请求验签,后端必须在线才能通过。 + +微信后台配置(开发 - 开发管理 - 消息推送): + +| 字段 | 值 | +|------|------| +| URL | `https://api.langlangzhuoqiu.cn/api/wx/callback` | +| Token | `LLZQwx2026push`(和 .env.local 里一致,可自定义) | +| EncodingAESKey | 点"随机生成" | +| 消息加解密方式 | 先选"明文模式"(跑通后再切安全模式) | +| 数据格式 | JSON | + +点"提交"后微信发 GET 验证。如果失败,最常见原因: +- 服务器后端未启动 +- Nginx 反代未指向正确端口 +- Token 两边不一致 + +### 2.3 隐私协议 / 用户隐私保护指引 + +| 状态 | 项目 | +|------|------| +| | 在微信后台填写用户隐私保护指引 | + +操作路径:微信后台 - 设置 - 基本设置 - 服务内容声明 - 用户隐私保护指引。 + +需要声明你收集了哪些用户信息: +- 微信昵称、头像(如果用到 `wx.getUserProfile`) +- 用户标识(openid,`wx.login` 必然涉及) +- 设备信息(如果用到 `wx.getSystemInfo`) + +> 2023 年 9 月起微信强制要求。不填写的话,调用 `wx.login` 等隐私相关 API 会直接报错。 +> 即使你当前只用了 `wx.login`,也需要声明"用户标识"这一项。 + +### 2.4 小程序基本信息 + +| 状态 | 项目 | +|------|------| +| 已完成 | AppID 已配置:`wx7c07793d82732921` | +| | 确认小程序名称、图标、简介已填写完整 | +| | 确认小程序类目已选择(建议"工具 - 企业管理"或"生活服务") | + +> 审核时会检查这些基本信息。类目选择需注意:部分类目需要上传营业执照等资质文件,提前确认。 + +### 2.5 体验成员配置 + +| 状态 | 项目 | +|------|------| +| | 在微信后台添加体验成员(成员管理 - 添加体验成员) | + +> 体验版只有体验成员能扫码访问,最多 100 人。内部测试阶段必须配置。 + + +--- + +## 第三阶段:后端核心功能(P0 - 上线的硬前提) + +没有这些功能,小程序即使能打开也无法提供真实服务。 + +### 3.1 微信登录接口(依赖 2.2) + +| 状态 | 项目 | +|------|------| +| | 后端实现 `POST /api/auth/wechat_login` | +| | 在 `.env.local` 中配置 `WX_APP_ID` 和 `WX_APP_SECRET` | +| | 小程序端 `app.ts` 改造:`wx.login` 拿 code 后调后端换 JWT | +| | 后续请求统一带 Authorization header | + +当前状态: +- 后端 JWT 框架已有(`app/auth/jwt.py`,签发/验证/刷新) +- 管理后台登录已实现(`POST /api/auth/login`) +- 小程序端 `app.ts` 里 `wx.login` 只是 `console.log(res.code)`,未发给后端 + +需要实现的完整链路: +``` +小程序 wx.login() 拿到 code + --> POST /api/auth/wechat_login { code: "xxx" } + --> 后端调微信 jscode2session 换 openid + session_key + --> 查 users 表,不存在则创建 + --> 签发 JWT(access_token + refresh_token) + --> 小程序存储 token,后续请求带上 +``` + +> `WX_APP_SECRET` 从微信后台获取(开发 - 开发管理 - 开发设置 - AppSecret)。 +> 这个值绝对不能出现在前端代码或 Git 仓库里,只放在服务器的 `.env.local` 中。 + +### 3.2 权限中间件(依赖 3.1) + +| 状态 | 项目 | +|------|------| +| | 实现权限中间件(基于 JWT 中的 site_id + role 校验) | +| | 需要鉴权的路由挂上 `Depends(get_current_user)` | + +当前状态: +- RBAC 的 DDL 已定义(roles / permissions / user_roles / role_permissions 表) +- `app/auth/dependencies.py` 有 `get_current_user` 但未实际使用 +- `app/middleware/__init__.py` 为空,无鉴权逻辑 +- 后端 API 目前全部裸奔,任何人可调用任何接口 + +> 这是安全底线。涉及多门店、多角色、财务数据的场景,没有权限校验等于公开数据库。 + +### 3.3 小程序业务页面(依赖 3.1) + +| 状态 | 项目 | +|------|------| +| 已完成 | MVP 全链路验证页面(`pages/mvp/mvp`,显示 "t91") | +| | 至少一个有实际功能的首页(审核要求) | +| | 清理调试代码(`console.log` 等) | + +当前状态: +- 只有 mvp 测试页、默认 index 和 logs 页 +- `app.ts` 里有 `console.log(res.code)` 调试输出 +- 提交审核时需要有实际功能的页面,否则大概率被拒 + +> 小程序的 API 地址已通过 `utils/config.ts` 实现环境自动切换: +> develop(开发版)指向本机,trial(体验版)和 release(正式版)指向线上域名。 + +### 3.4 密钥配置到后端 + +| 状态 | 项目 | +|------|------| +| 已完成 | `WX_CALLBACK_TOKEN` 已写入 `.env.local` | +| | `WX_APP_ID` 写入服务器 `.env.local` | +| | `WX_APP_SECRET` 写入服务器 `.env.local` | +| | 确认 `JWT_SECRET_KEY` 已配置(不要用默认值) | + +> `.env.template` 中已有这些配置项的模板,复制到 `.env.local` 后填入实际值。 +> 正式环境的 `JWT_SECRET_KEY` 必须是一个随机强密码,不能和测试环境相同。 + +--- + +## 第四阶段:部署与发布流程(P0 - 让代码流动起来) + +### 4.1 日常部署流程 + +开发到测试: + +``` +1. 本机 dev 分支开发完成,push 到远程 +2. 本机合并 dev 到 test:git checkout test && git merge dev && git push +3. 服务器测试环境拉取:cd D:\NeoZQYY\test\repo && git pull +4. 如有依赖变更:uv sync --all-packages +5. 关闭测试环境后端窗口,重新运行 start-test-api.bat +6. 微信开发者工具上传体验版(指向测试环境 API) +``` + +测试到正式: + +``` +1. 测试通过后,合并 test 到 master:git checkout master && git merge test && git push +2. 服务器正式环境拉取:cd D:\NeoZQYY\prod\repo && git pull +3. 如有依赖变更:uv sync --all-packages +4. 关闭正式环境后端窗口,重新运行 start-prod-api.bat +5. 微信小程序提交审核 --> 发布 +``` + +### 4.2 小程序版本与发布 + +版本类型: + +| 版本 | 对应环境 | 谁能访问 | 用途 | +|------|----------|----------|------| +| 开发版 | 本机 dev | 开发者自己(微信开发者工具) | 日常开发调试 | +| 体验版 | 服务器 test | 体验成员(最多 100 人) | 内部测试 | +| 正式版 | 服务器 prod | 所有用户 | 线上运营 | + +完整发布流水线: + +``` +本机开发(开发版) + | 微信开发者工具"上传" + v +体验版(指向测试环境 API) + | 测试通过 + v +微信后台"提交审核" + | 审核通过(通常 1-3 个工作日,快的几小时) + v +微信后台"发布" + v +正式版(指向正式环境 API) +``` + +开发版操作: +1. 微信开发者工具打开 `apps/miniprogram/` +2. 勾选"详情 - 本地设置 - 不校验合法域名"(仅开发阶段) +3. 编译预览,确认功能正常 + +上传体验版: +1. 微信开发者工具点右上角"上传" +2. 填写版本号(如 `0.1.0`)和项目备注 +3. 微信后台 - 开发管理 - 开发版本 - 点"选为体验版" +4. 体验版会生成二维码,扫码即可体验 + +提交审核: +1. 微信后台 - 开发管理 - 开发版本 - 点"提交审核" +2. 填写功能页面截图、类目、功能介绍 +3. 如有登录功能,提供测试账号密码 +4. 等待审核(预留至少一周缓冲期) + +发布上线: +1. 审核通过后,微信后台 - 开发管理 - 审核版本 - 点"发布" +2. 可选全量发布或灰度发布(建议先 10% 观察) + +版本号规范(语义化): +- `0.1.0` -- MVP 验证 +- `0.2.0` -- 新增登录功能 +- `0.2.1` -- 修复 bug +- `1.0.0` -- 正式上线第一版 + +### 4.3 紧急回滚 + +后端回滚: +```powershell +cd D:\NeoZQYY\prod\repo +git log --oneline -5 # 查看最近提交 +git reset --hard # 回退到指定版本 +# 关闭正式环境后端窗口,重新运行 start-prod-api.bat +``` + +小程序回滚: +1. 微信后台 - 开发管理 - 线上版本 - 点"版本回退" +2. 回退即时生效,用户下次打开加载旧版本 +3. 注意:只能回退一个版本,不要连续发布多个有问题的版本 + +### 4.4 发布检查清单 + +每次发布前确认: + +- [ ] API 地址指向正确环境 +- [ ] 服务器后端已部署最新代码并重启 +- [ ] 数据库迁移已执行(如有) +- [ ] 关键功能手动测试通过 +- [ ] 无 console.log 调试信息残留 +- [ ] 版本号已更新 +- [ ] 备注中写明本次变更内容 + + +--- + +## 第五阶段:安全与审计(P1 - 上线前必须做) + +这些不会阻止小程序"能用",但不做会导致可预期的安全事故或管理问题。 + +### 5.1 用户申请/审核流 + +| 状态 | 项目 | +|------|------| +| | 创建 `user_application` 表(DDL + 迁移脚本) | +| | 实现审核 API:`POST /api/applications`、`POST /api/applications/{id}/approve` | +| | 状态机:pending / approved / rejected / disabled | + +> `users` 表 DDL 已有(`db/zqyy_app/schemas/init.sql`),但缺少申请表和审核接口。 + +### 5.2 审计日志 + +| 状态 | 项目 | +|------|------| +| | 创建 `audit_log` 表 | +| | 实现审计中间件(关键写操作自动记录) | + +> 涉及工资/财务场景,"谁在何时改了什么"必须可追溯。 +> 至少记录:审批操作、xlsx 导入、口径裁决、工资重算。 + +### 5.3 后端结构化日志 + +| 状态 | 项目 | +|------|------| +| | 后端接入结构化日志(替代 uvicorn 默认日志) | + +当前状态: +- ETL 有完整日志体系(`logging_utils.py`) +- 后端仅 uvicorn 默认日志,排查问题困难 + +### 5.4 安全加固 + +| 状态 | 项目 | +|------|------| +| | 确认服务器防火墙只允许 Tailscale 网卡入站到 API 端口 | +| | 确认 PostgreSQL 只监听内网/本机,不对公网开放 | +| | 确认 pg_hba.conf 配置合理 | +| | 消息推送从明文模式切换到安全模式(AES 加解密) | + +--- + +## 第六阶段:审核准备(P1 - 提交审核前) + +### 6.1 审核材料准备 + +| 状态 | 项目 | +|------|------| +| | 准备主要页面功能截图 | +| | 准备测试账号(如有登录功能) | +| | 确认类目资质文件(营业执照等,视类目要求) | +| | 撰写功能介绍文案 | + +> 首次审核通常 1-3 个工作日,快的几小时。被拒后修改重新提交。 +> 建议预留至少一周的审核缓冲期。 + +--- + +## 第七阶段:可上线后迭代(P2) + +这些可以在小程序上线后逐步补齐。 + +### 7.1 xlsx 导入/导出 + +| 状态 | 项目 | +|------|------| +| | 后端实现上传、解析、校验、落库、错误报告接口 | + +> ETL 层已有 openpyxl 依赖,但后端无上传接口。 + +### 7.2 运维监控系统(将取代 bat 脚本手动管理) + +| 状态 | 项目 | +|------|------| +| | 需求与 spec 编写 | +| | 后端采集器 + 监控 API | +| | 前端监控面板(管理后台新页面) | +| | 告警推送(企业微信机器人 webhook) | +| | 上线后取代 bat 脚本,接管服务启停 | + +规划方案(BS 架构,集成到现有管理后台): + +技术选型: +- 后端:FastAPI 定时任务采集指标,写入 PostgreSQL `monitor` schema +- 前端:`apps/admin-web/` 新增 Monitor 页面,Ant Design Charts 可视化 +- 告警:企业微信机器人 webhook 推送 + +监控维度: + +| 维度 | 数据来源 | 复杂度 | +|------|----------|--------| +| 服务器载荷(CPU/内存/磁盘) | `psutil` 采集 | 低 | +| 服务状态(后端进程存活) | `/health` 探活 | 低 | +| 网络上下行 | `psutil.net_io_counters` | 低 | +| 数据库状态(连接数/慢查询/表大小) | `pg_stat_*` 系统视图 | 中 | +| ETL 连接器字段漂移检测 | ETL `meta` 层运行元数据 | 中 | + +代码落点: +``` +apps/backend/app/ + routers/monitor.py # 监控数据 API + services/monitor_collector.py # 采集器(定时任务) +apps/admin-web/src/pages/ + Monitor.tsx # 监控面板页面 +db/zqyy_app/migrations/ + YYYYMMDD_create_monitor_tables.sql +``` + +功能目标: +- 数据可视化:实时仪表盘 + 历史趋势图 +- 告警分级:按信息紧急程度分 INFO / WARNING / CRITICAL +- 服务管理:上线后可通过面板启停服务,取代手动 bat 脚本 + +> 优先级 P2,建议在小程序上线稳定后再启动开发。 +> 到时候用一个 spec 来拆解需求和实现计划。 + +### 7.3 租户模型 + +| 状态 | 项目 | +|------|------| +| | tenant 层实现(当前单租户场景可暂缓) | +| | RLS Policy DDL 落库(当前 ETL 只读连接已设置 `app.current_site_id`) | + +### 7.4 ETL SDK 抽象 + +| 状态 | 项目 | +|------|------| +| | 将飞球 Connector 抽象为通用连接器基类 | + +> 当前飞球 Connector 结构清晰,但未抽象为通用 SDK。多球房扩展时需要。 + +### 7.5 自动化测试 + +| 状态 | 项目 | +|------|------| +| | 后端 API 集成测试 | +| | 小程序端自动化测试 | + +--- + +## 已完成项摘要 + +| 完成日期 | 项目 | 说明 | +|----------|------|------| +| 2026-02-19 | MVP 全链路验证 | 小程序 - FastAPI - PostgreSQL 通路已跑通,`GET /api/xcx-test` 返回 `{"ti": "t91"}` | +| 2026-02-19 | 后端 MVP 页面 | `pages/mvp/mvp` 已创建并注册为首页,从 API 读取数据并显示 | +| 2026-02-19 | 环境管理方案 | 服务器目录结构、Git 流程、bat 脚本服务管理方案已输出 | +| 2026-02-19 | PRE-TEST 验证报告 | 逐项对照 8 大板块标注完成度,已合并到本文档 | +| 2026-02-19 | 合法域名 + HTTPS | 微信后台已配置 request/socket/upload/download 合法域名 | +| 2026-02-19 | 消息推送后端接口 | `GET/POST /api/wx/callback` 已实现(`wx_callback.py`),支持验签和消息接收 | +| 2026-02-19 | 小程序环境配置 | `utils/config.ts` 已创建,根据运行环境自动切换 API 地址 | +| 2026-02-19 | 版本与发布流程文档 | 已合并到本文档第四阶段 | +| 已有 | JWT 认证框架 | `app/auth/jwt.py` 已实现签发/验证/刷新 | +| 已有 | 管理后台登录 | `POST /api/auth/login` 已实现 | +| 已有 | 健康检查 | `GET /health` 已实现 | +| 已有 | CORS 配置 | 已配置,支持环境变量覆盖 | +| 已有 | OpenAPI 文档 | `/docs`(Swagger)和 `/redoc` 已启用 | +| 已有 | 数据分层架构 | ODS/DWD/DWS/Core/Meta/App 六层 schema 已建立 | +| 已有 | Monorepo + Steering | 单仓库统一管理,Steering 规则已配置 | +| 已有 | Git 三分支 | dev/test/master 已建立 | +| 已有 | RBAC DDL | roles/permissions/user_roles/role_permissions 表已定义 | +| 已有 | site_id 贯穿 | 所有业务表均有 site_id 字段 | +| 已有 | 跳板机 + Tailscale | 内外网分层已配置 | + +--- + +## 数据库环境隔离速查 + +| 数据库 | 用途 | 备注 | +|--------|------|------| +| `test_etl_feiqiu` | 开发 + 测试 ETL 数据 | 可随时重建 | +| `test_zqyy_app` | 开发 + 测试业务数据 | 可随时重建 | +| `etl_feiqiu` | 正式 ETL 数据 | 需备份 | +| `zqyy_app` | 正式业务数据 | 需备份 | + +--- + +## 风险提醒 + +| 风险 | 级别 | 当前缓解 | 是否充分 | +|------|------|----------|----------| +| 数据越权/串账 | P0 | site_id 已贯穿,ETL 只读连接有 RLS | 后端 API 无权限中间件,需尽快补齐 | +| 财务口径不可追溯 | P0 | ETL 有 meta 层记录运行元数据 | 后端无 audit_log,需补齐 | +| 单点故障 | P0 | 无 | 无备份/恢复方案,需尽快建立 | +| 密钥泄露 | P1 | .env 已 gitignore | AppSecret 需确认只在服务端 | +| ETL 扩展维护 | P1 | 飞球 Connector 结构清晰 | 未抽象为通用 SDK | +| 联调回归成本 | P2 | Steering + Hooks 已配置 | 缺自动化集成测试 | diff --git a/pyproject.toml b/pyproject.toml index f8ea37b..9d3cfa5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,5 +7,6 @@ requires-python = ">=3.10" members = [ "apps/etl/connectors/feiqiu", "apps/backend", + "apps/mcp-server", "packages/shared", ] \ No newline at end of file diff --git a/scripts/server/init-clean-branch.py b/scripts/server/init-clean-branch.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/server/server-exclude.txt b/scripts/server/server-exclude.txt new file mode 100644 index 0000000..a9dd6d8 --- /dev/null +++ b/scripts/server/server-exclude.txt @@ -0,0 +1,122 @@ +# ============================================================================== +# 服务器端 Git 排除规则 +# ============================================================================== +# 用途:服务器上不需要的文件,避免占用磁盘空间和干扰运行环境。 +# 使用方式:运行 scripts/server/setup-server-git.py 自动配置。 +# +# 原则:服务器只跑后端 API(+ 可选 ETL),其余全部排除。 +# 注意:此文件影响未 track 的新文件。 +# 对于已 track 但服务器不需要的文件, +# 由 setup-server-git.py 配合 skip-worktree 处理。 + +# ===== 环境配置(服务器有自己的 .env,不用 Git 里的) ===== +.env +.env.local +# 模板保留,方便参考 +# !.env.template + +# ===== ETL 导出数据(仅开发机留存) ===== +export/ + +# ===== 文档(开发参考用,服务器不需要) ===== +docs/ +# 如果需要部署文档可单独拉取,但运行时不依赖 + +# ===== H5 原型设计稿 ===== +# 已在 docs/h5_ui/ 下,被 docs/ 规则覆盖 + +# ===== 小程序源码(服务器不编译小程序) ===== +apps/miniprogram/ + +# ===== 管理后台源码(服务器只需要 dist/,不需要源码和 node_modules) ===== +apps/admin-web/src/ +apps/admin-web/node_modules/ +apps/admin-web/pnpm-lock.yaml + +# ===== 测试(服务器不跑测试) ===== +tests/ +.hypothesis/ +.pytest_cache/ +pytest-cache-files-*/ + +# ===== 示例数据 ===== +samples/ + +# ===== 临时目录 ===== +tmp/ + +# ===== 运维脚本中的一次性脚本(服务器不需要开发用的 ops 脚本) ===== +scripts/ops/ +scripts/audit/ +scripts/migrate/ + +# ===== 根目录散文件(开发用,服务器不需要) ===== +*.png +*.code-workspace +start-admin.bat +.kiroignore + +# ===== Kiro 配置(服务器上不用 Kiro) ===== +.kiro/ + +# ===== infra 配置文档(参考用,服务器不需要) ===== +infra/ + +# ===== 日志文件(服务器自己产生的日志不入 Git) ===== +logs/ +*.log +*.jsonl + +# ===== Python 虚拟环境(服务器自己 uv sync 生成) ===== +.venv/ +venv/ +ENV/ +env/ + +# ===== Python 缓存与构建产物 ===== +__pycache__/ +*.pyc +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +dist/ + +# ===== 测试覆盖率 ===== +.coverage +htmlcov/ + +# ===== Node ===== +node_modules/ + +# ===== infra 敏感文件 ===== +infra/**/*.key +infra/**/*.pem +infra/**/*.secret + +# ===== IDE ===== +.idea/ +.vscode/ +*.swp +*.swo +*~ +.specstory/ +.cursorindexingignore + +# ===== Windows 杂项 ===== +*.lnk +.Deleted/ diff --git a/scripts/server/setup-server-git.py b/scripts/server/setup-server-git.py new file mode 100644 index 0000000..058ca2a --- /dev/null +++ b/scripts/server/setup-server-git.py @@ -0,0 +1,137 @@ +""" +服务器 Git 环境配置脚本 + +在服务器上首次 git clone 后运行一次,完成两件事: +1. 将 server-exclude.txt 复制到 .git/info/exclude +2. 对已 track 但服务器不需要的文件/目录设置 skip-worktree, + 这样 git pull 不会覆盖本地删除,也不会在工作区还原这些文件。 + +用法: + cd D:\\NeoZQYY\\test\\repo (或 prod\\repo) + python scripts/server/setup-server-git.py + +运行后可以安全删除 export/ 等目录释放磁盘空间。 +""" + +import shutil +import subprocess +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent + +# 需要 skip-worktree 的路径前缀(已被 track 但服务器不需要) +SKIP_PREFIXES = [ + "export/", + ".env", + "docs/", + "tests/", + "samples/", + "infra/", + ".kiro/", + ".hypothesis/", + "apps/miniprogram/", + "apps/admin-web/src/", + "apps/admin-web/pnpm-lock.yaml", + "scripts/ops/", + "scripts/audit/", + "scripts/migrate/", + # 根目录散文件(截图、workspace 文件等) + "coach-detail-full.png", + "customer-detail-full.png", + "perf-records-current.png", + "white-screen-debug.png", + "NeoZQYY.code-workspace", + "start-admin.bat", + ".kiroignore", +] + +# 完全不需要出现在服务器工作区的目录(skip-worktree 后可删除释放空间) +DELETABLE_DIRS = [ + "export", + "docs", + "tests", + "samples", + "infra", + ".kiro", + ".hypothesis", + "apps/miniprogram", + "scripts/ops", + "scripts/audit", + "scripts/migrate", +] + + +def copy_exclude(): + """复制排除规则到 .git/info/exclude""" + src = REPO_ROOT / "scripts" / "server" / "server-exclude.txt" + dst = REPO_ROOT / ".git" / "info" / "exclude" + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) + print(f" 已复制 {src.name} -> {dst}") + + +def get_tracked_files(prefix: str) -> list[str]: + """获取匹配前缀的已 track 文件列表""" + result = subprocess.run( + ["git", "ls-files", "--", prefix], + capture_output=True, text=True, cwd=REPO_ROOT, + ) + return [f for f in result.stdout.strip().split("\n") if f] + + +def skip_worktree(files: list[str]): + """对文件列表设置 skip-worktree 标记""" + if not files: + return + # git update-index 一次处理的文件数有限,分批 + batch_size = 50 + for i in range(0, len(files), batch_size): + batch = files[i:i + batch_size] + subprocess.run( + ["git", "update-index", "--skip-worktree"] + batch, + cwd=REPO_ROOT, + ) + + +def main(): + print("=== 服务器 Git 环境配置 ===\n") + + # 1. 复制 exclude 规则 + print("[1/3] 配置 .git/info/exclude ...") + copy_exclude() + + # 2. 设置 skip-worktree + print("\n[2/3] 设置 skip-worktree(已 track 但服务器不需要的文件)...") + total_skipped = 0 + for prefix in SKIP_PREFIXES: + files = get_tracked_files(prefix) + if files: + skip_worktree(files) + total_skipped += len(files) + print(f" {prefix} -> {len(files)} 个文件已标记") + else: + print(f" {prefix} -> 无匹配文件") + print(f" 共标记 {total_skipped} 个文件") + + # 3. 提示可删除的目录 + print("\n[3/3] 以下目录已标记 skip-worktree,可安全删除以释放磁盘空间:") + for d in DELETABLE_DIRS: + dir_path = REPO_ROOT / d + if dir_path.exists(): + # 计算目录大小 + size = sum(f.stat().st_size for f in dir_path.rglob("*") if f.is_file()) + size_mb = size / (1024 * 1024) + print(f" {d}/ ({size_mb:.1f} MB)") + else: + print(f" {d}/ (不存在,无需处理)") + + print("\n如需删除,手动执行:") + for d in DELETABLE_DIRS: + print(f" rmdir /s /q {d}") + + print("\n配置完成。后续 git pull 不会还原这些文件。") + + +if __name__ == "__main__": + main()