Files
Neo-ZQYY/apps/backend/app/routers/tenant_excel.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

997 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
租户管理后台 — Excel 上传/校验/冲突/写入路由。
端点清单:
- POST /api/tenant/excel/upload — 上传解析 + 格式校验 + 人员匹配 + 冲突检测
- POST /api/tenant/excel/confirm — 确认写入(单事务)
- GET /api/tenant/excel/logs — 上传记录列表(分页)
- GET /api/tenant/excel/template/{type} — 下载空白 Excel 模板
需求: 5.1-5.5, 6.1-6.5, 7.1-7.5, 8.1-8.5
AI_CHANGELOG
- 2026-03-23 21:00:00 | Prompt: P20260323-210000根治 tenant_admin managed_site_ids 限制)| Direct causeJWT managed_site_ids 静态签发,新建店铺后所有端点受限 | Summary两个 verify_site_access 改用 admin=adminlist_upload_logs 的 site_filter_clause 改用 admin=admin | VerifyExcel 上传/确认/日志覆盖新建店铺
"""
from __future__ import annotations
import io
import json
import logging
import re
from datetime import date, datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Any, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from fastapi.responses import StreamingResponse
from app.auth.tenant_admins import (
CurrentTenantAdmin,
require_tenant_admin,
site_filter_clause,
verify_site_access,
)
from app.database import get_connection, get_etl_readonly_connection
from app.schemas.tenant_excel import (
ConfirmRequest,
ConflictDiff,
FieldDiff,
UploadLogItem,
ValidationError as VError,
ValidationResult,
ValidationWarning as VWarning,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/tenant/excel", tags=["租户Excel上传"])
# ── 常量 ──────────────────────────────────────────────────
VALID_UPLOAD_TYPES = {"expense", "platform_income", "salary_adj", "recharge_commission"}
EXPENSE_CATEGORIES = [
"房租", "水电", "物业", "食品饮料进货", "耗材", "报销", "固定人员工资", "其他费用",
]
SALARY_ADJ_TYPES = {"扣款": "deduction", "奖金": "bonus"}
# 模板列定义(中文表头 → 内部字段名)
TEMPLATE_COLUMNS: dict[str, list[tuple[str, str]]] = {
"expense": [
("月份", "expense_month"),
("支出类别", "category"),
("金额", "amount"),
("备注", "remark"),
],
"platform_income": [
("月份", "income_month"),
("平台名称", "platform_name"),
("金额", "amount"),
("备注", "remark"),
],
"salary_adj": [
("月份", "salary_month"),
("助教姓名", "assistant_name"),
("助教编号", "assistant_number"),
("类型", "adjustment_type"),
("金额", "amount"),
("原因", "reason"),
],
"recharge_commission": [
("充值日期", "recharge_date"),
("会员名称", "member_name"),
("充值金额", "recharge_amount"),
("归属助教", "assigned_assistant"),
("奖励金额", "reward_amount"),
],
}
# 冲突检测主键规则(不含 site_idsite_id 在查询时自动附加)
CONFLICT_KEYS: dict[str, list[str]] = {
"expense": ["expense_month", "category"],
"platform_income": ["income_month", "platform_name"],
"salary_adj": ["salary_month", "assistant_name", "assistant_number", "adjustment_type", "reason"],
"recharge_commission": ["recharge_date", "member_name", "assigned_assistant"],
}
# 目标表映射
TARGET_TABLES: dict[str, str] = {
"expense": "biz.stg_finance_expense",
"platform_income": "biz.stg_platform_income",
"salary_adj": "biz.salary_adjustments",
"recharge_commission": "biz.stg_recharge_commission",
}
# 各表写入字段(不含 id, upload_batch_id, created_at, synced_at 等自动字段)
TABLE_WRITE_FIELDS: dict[str, list[str]] = {
"expense": ["site_id", "expense_month", "category", "amount", "remark", "upload_batch_id", "created_at"],
"platform_income": ["site_id", "income_month", "platform_name", "amount", "remark", "upload_batch_id", "created_at"],
"salary_adj": ["site_id", "assistant_id", "assistant_name", "assistant_number", "salary_month", "adjustment_type", "amount", "reason", "upload_batch_id", "created_at", "created_by"],
"recharge_commission": ["site_id", "recharge_date", "member_name", "recharge_amount", "assigned_assistant", "reward_amount", "upload_batch_id", "created_at"],
}
# ── 校验工具函数 ──────────────────────────────────────────
_MONTH_RE = re.compile(r"^\d{4}-(0[1-9]|1[0-2])$")
_DATE_RE = re.compile(r"^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])$")
def _validate_month(value: str, current_month: str | None = None) -> str | None:
"""校验月份格式 YYYY-MM返回错误描述或 None。"""
if not value or not _MONTH_RE.match(str(value).strip()):
return "月份格式应为 YYYY-MM"
return None
def _validate_date(value: str) -> str | None:
"""校验日期格式 YYYY-MM-DD。"""
if not value or not _DATE_RE.match(str(value).strip()):
return "日期格式应为 YYYY-MM-DD"
# 额外验证日期合法性
try:
datetime.strptime(str(value).strip(), "%Y-%m-%d")
except ValueError:
return "无效的日期"
return None
def _validate_positive_amount(value: Any) -> str | None:
"""校验金额 > 0精度 2 位小数。"""
try:
d = Decimal(str(value))
except (InvalidOperation, TypeError, ValueError):
return "金额必须为有效数字"
if d <= 0:
return "金额必须大于 0"
if d.as_tuple().exponent is not None and abs(d.as_tuple().exponent) > 2:
return "金额精度不能超过 2 位小数"
return None
def _validate_non_negative_amount(value: Any) -> str | None:
"""校验金额 ≥ 0精度 2 位小数。"""
try:
d = Decimal(str(value))
except (InvalidOperation, TypeError, ValueError):
return "金额必须为有效数字"
if d < 0:
return "金额不能为负数"
if d.as_tuple().exponent is not None and abs(d.as_tuple().exponent) > 2:
return "金额精度不能超过 2 位小数"
return None
def _validate_not_empty(value: Any, max_len: int | None = None) -> str | None:
"""校验非空字符串,可选最大长度。"""
s = str(value).strip() if value is not None else ""
if not s:
return "不能为空"
if max_len and len(s) > max_len:
return f"长度不能超过 {max_len} 字符"
return None
def validate_rows(upload_type: str, rows: list[dict]) -> tuple[list[VError], list[dict]]:
"""
按模板类型校验数据行。
返回 (errors, passed_rows)。
passed_rows 中的字段值已做类型转换(如金额转 float
"""
errors: list[VError] = []
passed: list[dict] = []
for row in rows:
row_idx = row.get("row_index", 0)
row_errors: list[VError] = []
if upload_type == "expense":
_validate_expense_row(row, row_idx, row_errors)
elif upload_type == "platform_income":
_validate_platform_income_row(row, row_idx, row_errors)
elif upload_type == "salary_adj":
_validate_salary_adj_row(row, row_idx, row_errors)
elif upload_type == "recharge_commission":
_validate_recharge_commission_row(row, row_idx, row_errors)
if row_errors:
errors.extend(row_errors)
else:
passed.append(row)
return errors, passed
def _validate_expense_row(row: dict, row_idx: int, errors: list[VError]):
"""校验财务支出行。"""
# 月份
err = _validate_month(row.get("expense_month", ""))
if err:
errors.append(VError(row_index=row_idx, column="月份", message=err))
# 支出类别
cat = str(row.get("category", "")).strip()
if cat not in EXPENSE_CATEGORIES:
errors.append(VError(
row_index=row_idx, column="支出类别",
message=f"无效的支出类别,可选值:{''.join(EXPENSE_CATEGORIES)}",
))
# 金额
err = _validate_positive_amount(row.get("amount"))
if err:
errors.append(VError(row_index=row_idx, column="金额", message=err))
# 备注(可选,最长 500
remark = row.get("remark")
if remark is not None and str(remark).strip():
if len(str(remark).strip()) > 500:
errors.append(VError(row_index=row_idx, column="备注", message="备注长度不能超过 500 字符"))
def _validate_platform_income_row(row: dict, row_idx: int, errors: list[VError]):
"""校验团购收入行。"""
err = _validate_month(row.get("income_month", ""))
if err:
errors.append(VError(row_index=row_idx, column="月份", message=err))
err = _validate_not_empty(row.get("platform_name"))
if err:
errors.append(VError(row_index=row_idx, column="平台名称", message=err))
err = _validate_positive_amount(row.get("amount"))
if err:
errors.append(VError(row_index=row_idx, column="金额", message=err))
remark = row.get("remark")
if remark is not None and str(remark).strip():
if len(str(remark).strip()) > 500:
errors.append(VError(row_index=row_idx, column="备注", message="备注长度不能超过 500 字符"))
def _validate_salary_adj_row(row: dict, row_idx: int, errors: list[VError]):
"""校验助教奖罚行。"""
err = _validate_month(row.get("salary_month", ""))
if err:
errors.append(VError(row_index=row_idx, column="月份", message=err))
err = _validate_not_empty(row.get("assistant_name"))
if err:
errors.append(VError(row_index=row_idx, column="助教姓名", message=err))
err = _validate_not_empty(row.get("assistant_number"))
if err:
errors.append(VError(row_index=row_idx, column="助教编号", message=err))
adj_type = str(row.get("adjustment_type", "")).strip()
if adj_type not in SALARY_ADJ_TYPES:
errors.append(VError(
row_index=row_idx, column="类型",
message=f"无效的类型,可选值:{''.join(SALARY_ADJ_TYPES.keys())}",
))
err = _validate_positive_amount(row.get("amount"))
if err:
errors.append(VError(row_index=row_idx, column="金额", message=err))
err = _validate_not_empty(row.get("reason"), max_len=200)
if err:
errors.append(VError(row_index=row_idx, column="原因", message=err))
def _validate_recharge_commission_row(row: dict, row_idx: int, errors: list[VError]):
"""校验充值业绩归属行。"""
err = _validate_date(row.get("recharge_date", ""))
if err:
errors.append(VError(row_index=row_idx, column="充值日期", message=err))
err = _validate_not_empty(row.get("member_name"))
if err:
errors.append(VError(row_index=row_idx, column="会员名称", message=err))
err = _validate_positive_amount(row.get("recharge_amount"))
if err:
errors.append(VError(row_index=row_idx, column="充值金额", message=err))
err = _validate_not_empty(row.get("assigned_assistant"))
if err:
errors.append(VError(row_index=row_idx, column="归属助教", message=err))
err = _validate_non_negative_amount(row.get("reward_amount"))
if err:
errors.append(VError(row_index=row_idx, column="奖励金额", message=err))
# ── Excel 解析 ────────────────────────────────────────────
def parse_excel(file_bytes: bytes, upload_type: str) -> list[dict]:
"""
解析 Excel 文件,返回行数据列表。
每行为 dict包含 row_index从 1 开始)和各字段值。
"""
import openpyxl
wb = openpyxl.load_workbook(io.BytesIO(file_bytes), read_only=True, data_only=True)
ws = wb.active
if ws is None:
return []
columns = TEMPLATE_COLUMNS.get(upload_type, [])
if not columns:
return []
rows_data: list[dict] = []
header_row = True
for row in ws.iter_rows(values_only=True):
if header_row:
header_row = False
continue # 跳过表头行
# 跳过全空行
if all(cell is None or str(cell).strip() == "" for cell in row):
continue
row_dict: dict[str, Any] = {"row_index": len(rows_data) + 1}
for i, (_, field_name) in enumerate(columns):
val = row[i] if i < len(row) else None
# 将值转为字符串(保留 None
if val is not None:
row_dict[field_name] = str(val).strip()
else:
row_dict[field_name] = ""
rows_data.append(row_dict)
wb.close()
return rows_data
# ── 人员匹配 ─────────────────────────────────────────────
def match_personnel(
rows: list[dict],
site_id: int,
upload_type: str,
) -> list[VWarning]:
"""
对 salary_adj / recharge_commission 模板执行人员匹配校验。
优先 v_dim_assistantnickname + assistant_number
未匹配再查 v_dim_staff + v_dim_staff_exname + staff_number
匹配成功填充 assistant_id失败标记 warning 不阻断。
"""
if upload_type not in ("salary_adj", "recharge_commission"):
return []
warnings: list[VWarning] = []
# 提取需要匹配的姓名+编号对
if upload_type == "salary_adj":
name_field = "assistant_name"
number_field = "assistant_number"
else:
name_field = "assigned_assistant"
number_field = None # recharge_commission 没有编号字段
# 批量查询 v_dim_assistant
assistant_map: dict[str, int] = {}
staff_map: dict[str, int] = {}
try:
etl_conn = get_etl_readonly_connection(site_id)
try:
with etl_conn.cursor() as cur:
cur.execute(
"SELECT assistant_id, nickname, number FROM fdw_etl.v_dim_assistant WHERE scd2_is_current = 1",
)
for aid, nickname, number in cur.fetchall():
if nickname and number:
assistant_map[f"{nickname}|{number}"] = aid
if nickname:
assistant_map[f"{nickname}|"] = aid
finally:
etl_conn.close()
except Exception:
logger.warning("v_dim_assistant 查询失败site_id=%s", site_id, exc_info=True)
try:
etl_conn = get_etl_readonly_connection(site_id)
try:
with etl_conn.cursor() as cur:
cur.execute(
"SELECT staff_id, name, number FROM fdw_etl.v_dim_staff",
)
for sid, name, number in cur.fetchall():
if name and number:
staff_map[f"{name}|{number}"] = sid
if name:
staff_map[f"{name}|"] = sid
finally:
etl_conn.close()
except Exception:
logger.warning("v_dim_staff 查询失败site_id=%s", site_id, exc_info=True)
for row in rows:
name = str(row.get(name_field, "")).strip()
number = str(row.get(number_field, "")).strip() if number_field else ""
row_idx = row.get("row_index", 0)
# 优先 v_dim_assistant 匹配
key_full = f"{name}|{number}"
key_name = f"{name}|"
matched_id = assistant_map.get(key_full) or assistant_map.get(key_name)
if not matched_id:
matched_id = staff_map.get(key_full) or staff_map.get(key_name)
if matched_id:
row["assistant_id"] = matched_id
else:
row["assistant_id"] = None
warnings.append(VWarning(
row_index=row_idx,
column="助教姓名",
message=f"未匹配到助教/员工:{name}" + (f"(编号 {number}" if number else ""),
))
return warnings
# ── 冲突检测 ──────────────────────────────────────────────
def detect_conflicts(
upload_type: str,
rows: list[dict],
site_id: int,
) -> tuple[list[ConflictDiff], list[dict], list[dict]]:
"""
按模板主键规则检测冲突。
返回 (conflicts, new_rows, conflict_rows_with_existing)。
- conflicts: 冲突 diff 列表
- new_rows: 无冲突的新增行
- conflict_rows_with_existing: 冲突行(附带已有数据用于 confirm 时 UPDATE
"""
keys = CONFLICT_KEYS.get(upload_type, [])
table = TARGET_TABLES.get(upload_type, "")
if not keys or not table:
return [], rows, []
# 查询已有数据
existing_map: dict[tuple, dict] = {}
conn = get_connection()
try:
with conn.cursor() as cur:
key_cols = ", ".join(keys)
# 获取所有字段用于 diff
cur.execute(f"SELECT * FROM {table} WHERE site_id = %s LIMIT 0", (site_id,))
col_names = [desc[0] for desc in cur.description] if cur.description else []
cur.execute(
f"SELECT * FROM {table} WHERE site_id = %s",
(site_id,),
)
for row_data in cur.fetchall():
row_dict = dict(zip(col_names, row_data))
pk = tuple(str(row_dict.get(k, "")).strip() for k in keys)
existing_map[pk] = row_dict
finally:
conn.close()
conflicts: list[ConflictDiff] = []
new_rows: list[dict] = []
conflict_rows: list[dict] = []
# 对 salary_adj 类型,需要将中文类型映射为英文
for row in rows:
pk_values = []
for k in keys:
val = str(row.get(k, "")).strip()
# salary_adj 的 adjustment_type 需要映射
if upload_type == "salary_adj" and k == "adjustment_type":
val = SALARY_ADJ_TYPES.get(val, val)
pk_values.append(val)
pk = tuple(pk_values)
if pk in existing_map:
existing = existing_map[pk]
# 生成逐字段 diff
field_diffs: list[FieldDiff] = []
# 比较可变字段(排除主键和系统字段)
compare_fields = _get_compare_fields(upload_type)
for field_name, display_name in compare_fields:
old_val = str(existing.get(field_name, "")) if existing.get(field_name) is not None else ""
new_val = str(row.get(field_name, "")).strip()
# salary_adj 的 adjustment_type 需要映射
if upload_type == "salary_adj" and field_name == "adjustment_type":
new_val = SALARY_ADJ_TYPES.get(new_val, new_val)
if old_val != new_val:
field_diffs.append(FieldDiff(
field=display_name, old_value=old_val, new_value=new_val,
))
if field_diffs:
conflicts.append(ConflictDiff(
row_index=row.get("row_index", 0),
field_diffs=field_diffs,
))
row["_existing_id"] = existing.get("id")
conflict_rows.append(row)
else:
# 主键匹配但所有字段相同,视为无变化,跳过
conflict_rows.append(row)
row["_existing_id"] = existing.get("id")
else:
new_rows.append(row)
return conflicts, new_rows, conflict_rows
def _get_compare_fields(upload_type: str) -> list[tuple[str, str]]:
"""获取用于 diff 比较的字段列表 [(db_field, display_name)]。"""
if upload_type == "expense":
return [("amount", "金额"), ("remark", "备注")]
elif upload_type == "platform_income":
return [("amount", "金额"), ("remark", "备注")]
elif upload_type == "salary_adj":
return [("amount", "金额"), ("reason", "原因")]
elif upload_type == "recharge_commission":
return [("recharge_amount", "充值金额"), ("reward_amount", "奖励金额")]
return []
# ── POST /api/tenant/excel/upload ─────────────────────────
@router.post("/upload")
async def upload_excel(
file: UploadFile = File(...),
upload_type: str = Form(...),
site_id: int = Form(...),
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""
上传 Excel 文件:解析 → 格式校验 → 人员匹配 → 冲突检测。
返回 upload_id + 校验结果 + 冲突 diff。
"""
# 校验 upload_type
if upload_type not in VALID_UPLOAD_TYPES:
raise HTTPException(status_code=400, detail=f"无效的模板类型,可选值:{', '.join(VALID_UPLOAD_TYPES)}")
# 校验门店权限
verify_site_access(site_id, admin=admin)
# 校验文件格式
filename = file.filename or ""
if not filename.lower().endswith((".xlsx", ".xls")):
raise HTTPException(status_code=400, detail="请上传有效的 Excel 文件(.xlsx/.xls")
# 读取文件内容
file_bytes = await file.read()
if not file_bytes:
raise HTTPException(status_code=400, detail="文件内容为空")
# 解析 Excel
try:
rows = parse_excel(file_bytes, upload_type)
except Exception as e:
logger.warning("Excel 解析失败:%s", e, exc_info=True)
raise HTTPException(status_code=400, detail="Excel 文件解析失败,请检查文件格式")
if not rows:
raise HTTPException(status_code=400, detail="Excel 文件中没有数据行")
# 格式校验
errors, passed_rows = validate_rows(upload_type, rows)
# 如果有格式错误,直接返回(不创建 upload_log
if errors:
return ValidationResult(
errors=errors,
warnings=[],
passed_rows=[],
upload_id=None,
).model_dump(by_alias=True)
# 人员匹配校验(仅 salary_adj / recharge_commission
warnings = match_personnel(passed_rows, site_id, upload_type)
# 冲突检测
conflicts, new_rows, conflict_rows = detect_conflicts(upload_type, passed_rows, site_id)
# 创建 excel_upload_log 记录
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO biz.excel_upload_log
(site_id, upload_type, file_name, uploaded_by, row_count, conflict_count, status)
VALUES (%s, %s, %s, %s, %s, %s, 'pending')
RETURNING id
""",
(site_id, upload_type, filename, admin.admin_id, len(passed_rows), len(conflicts)),
)
upload_id = cur.fetchone()[0]
conn.commit()
except Exception:
conn.rollback()
logger.error("创建 upload_log 失败", exc_info=True)
raise HTTPException(status_code=500, detail="创建上传记录失败")
finally:
conn.close()
# 将通过的行数据临时存储到 upload_log 的 error_detail 字段JSON
# 用于 confirm 时读取(避免二次上传)
_cache_upload_data(upload_id, {
"upload_type": upload_type,
"site_id": site_id,
"new_rows": new_rows,
"conflict_rows": conflict_rows,
})
return {
**ValidationResult(
errors=[],
warnings=warnings,
passed_rows=passed_rows,
upload_id=upload_id,
).model_dump(by_alias=True),
"conflicts": [c.model_dump(by_alias=True) for c in conflicts],
}
def _cache_upload_data(upload_id: int, data: dict):
"""将上传数据缓存到 upload_log.error_detailJSON供 confirm 时使用。"""
conn = get_connection()
try:
with conn.cursor() as cur:
# 序列化时处理 Decimal 等特殊类型
json_str = json.dumps(data, ensure_ascii=False, default=str)
cur.execute(
"UPDATE biz.excel_upload_log SET error_detail = %s::jsonb WHERE id = %s",
(json_str, upload_id),
)
conn.commit()
except Exception:
conn.rollback()
logger.warning("缓存上传数据失败upload_id=%s", upload_id, exc_info=True)
finally:
conn.close()
# ── POST /api/tenant/excel/confirm ────────────────────────
@router.post("/confirm")
async def confirm_upload(
body: ConfirmRequest,
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""
确认写入:单事务写入目标表。
替换行执行 UPDATE新增行执行 INSERT。
写入失败回滚整批log status=failed。
"""
conn = get_connection()
try:
with conn.cursor() as cur:
# 获取 upload_log 记录
cur.execute(
"SELECT site_id, upload_type, status, error_detail FROM biz.excel_upload_log WHERE id = %s",
(body.upload_id,),
)
log_row = cur.fetchone()
if log_row is None:
raise HTTPException(status_code=404, detail="上传记录不存在")
site_id, upload_type, log_status, cached_data = log_row
if log_status != "pending":
raise HTTPException(status_code=409, detail="该上传批次已被处理")
verify_site_access(site_id, admin=admin)
# 从缓存中读取数据
if not cached_data:
raise HTTPException(status_code=400, detail="上传数据已过期,请重新上传")
if isinstance(cached_data, str):
cached_data = json.loads(cached_data)
new_rows = cached_data.get("new_rows", [])
conflict_rows = cached_data.get("conflict_rows", [])
# 构建 resolution 映射
resolution_map: dict[int, str] = {}
for r in body.resolutions:
resolution_map[r.row_index] = r.action
table = TARGET_TABLES[upload_type]
write_fields = TABLE_WRITE_FIELDS[upload_type]
inserted_count = 0
updated_count = 0
resolved_count = 0
# 写入新增行
for row in new_rows:
_insert_row(cur, table, write_fields, row, upload_type, site_id, body.upload_id, admin.admin_id)
inserted_count += 1
# 处理冲突行
for row in conflict_rows:
row_idx = row.get("row_index", 0)
action = resolution_map.get(row_idx, "keep")
existing_id = row.get("_existing_id")
if action == "replace" and existing_id:
_update_row(cur, table, write_fields, row, upload_type, existing_id, site_id, body.upload_id, admin.admin_id)
updated_count += 1
resolved_count += 1
elif action == "keep":
resolved_count += 1
else:
# 无 existing_id 的冲突行按新增处理
_insert_row(cur, table, write_fields, row, upload_type, site_id, body.upload_id, admin.admin_id)
inserted_count += 1
# 更新 upload_log
cur.execute(
"""
UPDATE biz.excel_upload_log
SET status = 'confirmed',
row_count = %s,
resolved_count = %s,
confirmed_at = NOW(),
error_detail = NULL
WHERE id = %s
""",
(inserted_count + updated_count, resolved_count, body.upload_id),
)
conn.commit()
except HTTPException:
conn.rollback()
raise
except Exception as e:
conn.rollback()
# 记录失败状态
_mark_upload_failed(body.upload_id, str(e))
logger.error("写入失败upload_id=%s", body.upload_id, exc_info=True)
raise HTTPException(status_code=500, detail="数据写入失败,已回滚整批")
finally:
conn.close()
return {
"message": "写入成功",
"inserted": inserted_count,
"updated": updated_count,
"resolved": resolved_count,
}
def _insert_row(cur, table: str, fields: list[str], row: dict, upload_type: str, site_id: int, upload_id: int, admin_id: int):
"""插入一行数据到目标表。"""
values = _build_row_values(fields, row, upload_type, site_id, upload_id, admin_id)
placeholders = ", ".join(["%s"] * len(fields))
cols = ", ".join(fields)
cur.execute(f"INSERT INTO {table} ({cols}) VALUES ({placeholders})", tuple(values))
def _update_row(cur, table: str, fields: list[str], row: dict, upload_type: str, existing_id: int, site_id: int, upload_id: int, admin_id: int):
"""更新已有行。"""
values = _build_row_values(fields, row, upload_type, site_id, upload_id, admin_id)
set_parts = [f"{f} = %s" for f in fields]
cur.execute(
f"UPDATE {table} SET {', '.join(set_parts)} WHERE id = %s",
(*values, existing_id),
)
def _build_row_values(fields: list[str], row: dict, upload_type: str, site_id: int, upload_id: int, admin_id: int) -> list:
"""根据字段列表构建值列表。"""
values = []
for f in fields:
if f == "site_id":
values.append(site_id)
elif f == "upload_batch_id":
values.append(upload_id)
elif f == "created_at":
values.append(datetime.now(timezone.utc))
elif f == "created_by":
values.append(admin_id)
elif f == "adjustment_type":
# 中文 → 英文映射
raw = str(row.get(f, "")).strip()
values.append(SALARY_ADJ_TYPES.get(raw, raw))
elif f in ("amount", "recharge_amount", "reward_amount"):
try:
values.append(float(row.get(f, 0)))
except (ValueError, TypeError):
values.append(0.0)
elif f == "assistant_id":
values.append(row.get("assistant_id"))
else:
values.append(str(row.get(f, "")).strip() if row.get(f) is not None else None)
return values
def _mark_upload_failed(upload_id: int, error_msg: str):
"""标记上传批次为失败状态。"""
try:
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE biz.excel_upload_log
SET status = 'failed',
error_detail = %s::jsonb
WHERE id = %s
""",
(json.dumps({"error": error_msg}, ensure_ascii=False), upload_id),
)
conn.commit()
finally:
conn.close()
except Exception:
logger.warning("标记上传失败状态失败upload_id=%s", upload_id, exc_info=True)
# ── GET /api/tenant/excel/logs ────────────────────────────
@router.get("/logs")
async def list_upload_logs(
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(20, ge=1, le=100, description="每页条数"),
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""上传记录列表,分页,附加 site_id IN 条件。"""
site_sql, site_params = site_filter_clause(admin=admin)
offset = (page - 1) * page_size
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
f"SELECT COUNT(*) FROM biz.excel_upload_log WHERE {site_sql}",
site_params,
)
total = cur.fetchone()[0]
cur.execute(
f"""
SELECT id, site_id, upload_type, file_name, uploaded_by,
row_count, conflict_count, resolved_count, status,
created_at::text, confirmed_at::text
FROM biz.excel_upload_log
WHERE {site_sql}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(*site_params, page_size, offset),
)
rows = cur.fetchall()
finally:
conn.close()
items = [
UploadLogItem(
id=r[0], site_id=r[1], upload_type=r[2], file_name=r[3],
uploaded_by=r[4], row_count=r[5], conflict_count=r[6],
resolved_count=r[7], status=r[8], created_at=r[9], confirmed_at=r[10],
).model_dump(by_alias=True)
for r in rows
]
return {"items": items, "total": total, "page": page, "pageSize": page_size}
# ── GET /api/tenant/excel/template/{type} ─────────────────
@router.get("/template/{template_type}")
async def download_template(template_type: str):
"""返回空白 Excel 模板文件(含表头和格式说明)。"""
if template_type not in VALID_UPLOAD_TYPES:
raise HTTPException(status_code=400, detail=f"无效的模板类型,可选值:{', '.join(VALID_UPLOAD_TYPES)}")
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "数据模板"
columns = TEMPLATE_COLUMNS[template_type]
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
# 写入表头
for col_idx, (header_name, _) in enumerate(columns, 1):
cell = ws.cell(row=1, column=col_idx, value=header_name)
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center")
ws.column_dimensions[cell.column_letter].width = 18
# 写入格式说明行(第 2 行,灰色字体)
hint_font = Font(color="808080", italic=True)
hints = _get_template_hints(template_type)
for col_idx, hint in enumerate(hints, 1):
cell = ws.cell(row=2, column=col_idx, value=hint)
cell.font = hint_font
# 输出为字节流
output = io.BytesIO()
wb.save(output)
output.seek(0)
from urllib.parse import quote as _url_quote
filename_map = {
"expense": "财务支出模板.xlsx",
"platform_income": "团购收入模板.xlsx",
"salary_adj": "助教奖罚模板.xlsx",
"recharge_commission": "充值业绩归属模板.xlsx",
}
raw_name = filename_map.get(template_type, "template.xlsx")
encoded_name = _url_quote(raw_name)
return StreamingResponse(
output,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_name}",
},
)
def _get_template_hints(template_type: str) -> list[str]:
"""获取模板格式说明。"""
if template_type == "expense":
return [
"格式YYYY-MM",
f"可选值:{''.join(EXPENSE_CATEGORIES)}",
"大于0保留2位小数",
"可选最长500字符",
]
elif template_type == "platform_income":
return [
"格式YYYY-MM",
"必填",
"大于0保留2位小数",
"可选最长500字符",
]
elif template_type == "salary_adj":
return [
"格式YYYY-MM",
"必填",
"必填",
"可选值:扣款、奖金",
"大于0保留2位小数",
"必填最长200字符",
]
elif template_type == "recharge_commission":
return [
"格式YYYY-MM-DD",
"必填",
"大于0保留2位小数",
"必填",
"≥0保留2位小数",
]
return []