# -*- coding: utf-8 -*-
"""
DWS Excel import script.

Imports three kinds of manually maintained Excel data:
    1. Expense structure   (dws_finance_expense_summary)
    2. Platform settlement (dws_platform_settlement)
    3. Recharge commission (dws_assistant_recharge_commission)

Import conventions:
    - Columns: mapped onto the target table's columns (see each importer's
      INSERT_COLUMNS).
    - Time grain: expenses by month, platform settlement by day, recharge
      commission by month. Month values are normalised to the 1st of the month.
    - Store dimension: the configured site_id (``app.store_id``) is stamped on
      every row.
    - Batch tracking: every run gets a fresh ``import_batch_no`` so the rows of
      one run can be identified (and removed) together.
      NOTE(review): nothing here checks for previously imported rows, so
      running the same file twice inserts its rows twice.
    - Validation: amount cells must be numeric and non-negative (required
      amount columns must also be non-empty); dates must be parseable.

Usage:
    python import_dws_excel.py --type expense --file expenses.xlsx
    python import_dws_excel.py --type platform --file platform_settlement.xlsx
    python import_dws_excel.py --type commission --file recharge_commission.xlsx

Author: ETL team
Created: 2026-02-01
"""
import argparse
import numbers
import os
import sys
import uuid
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Make the project root importable so the config/database packages resolve
# when this script is run directly.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

try:
    import pandas as pd
except ImportError:
    print("请安装 pandas: pip install pandas openpyxl")
    sys.exit(1)

from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations


# =============================================================================
# Constants
# =============================================================================

# Expense type: display name in the sheet -> type code.
EXPENSE_TYPES = {
    '房租': 'RENT',
    '水电费': 'UTILITY',
    '物业费': 'PROPERTY',
    '工资': 'SALARY',
    '报销': 'REIMBURSE',
    '平台服务费': 'PLATFORM_FEE',
    '其他': 'OTHER',
}

# Expense type code -> expense category.
EXPENSE_CATEGORIES = {
    'RENT': 'FIXED_COST',
    'UTILITY': 'VARIABLE_COST',
    'PROPERTY': 'FIXED_COST',
    'SALARY': 'FIXED_COST',
    'REIMBURSE': 'VARIABLE_COST',
    'PLATFORM_FEE': 'VARIABLE_COST',
    'OTHER': 'OTHER',
}

# Platform: display name in the sheet -> platform code.
PLATFORM_TYPES = {
    '美团': 'MEITUAN',
    '抖音': 'DOUYIN',
    '大众点评': 'DIANPING',
    '其他': 'OTHER',
}

# How many validation errors are echoed to the console / returned in results.
MAX_ERRORS_SHOWN = 10


# =============================================================================
# Importer base class
# =============================================================================

class BaseImporter:
    """Shared read -> validate -> transform -> insert workflow.

    Subclasses set TARGET_TABLE, REQUIRED_COLUMNS, INSERT_COLUMNS and
    START_MESSAGE, and implement ``validate_row`` and ``transform_row``.
    """

    # Fully qualified target table name.
    TARGET_TABLE: str = ""
    # Columns that must be present in the sheet header.
    REQUIRED_COLUMNS: List[str] = []
    # Columns read when present (informational only).
    OPTIONAL_COLUMNS: List[str] = []
    # Target-table columns written by insert_records, in INSERT order.
    INSERT_COLUMNS: List[str] = []
    # Console message printed (followed by the file path) when an import starts.
    START_MESSAGE: str = "开始导入文件"

    def __init__(self, config: AppConfig, db: DatabaseOperations):
        # NOTE(review): assumes ``config.get`` accepts dotted keys and that
        # ``db`` exposes ``.conn`` (a DB-API connection) and ``.commit()`` --
        # confirm against AppConfig / DatabaseOperations.
        self.config = config
        self.db = db
        self.site_id = config.get("app.store_id")
        self.tenant_id = config.get("app.tenant_id", self.site_id)
        self.batch_no = self._generate_batch_no()
        # Audit values shared by every row of a batch; the file name and the
        # import time are filled in by import_file().
        self.file_name = ""
        self.import_time: Optional[datetime] = None
        # USERNAME is set on Windows, USER on POSIX systems.
        self.import_user = os.getenv("USERNAME") or os.getenv("USER") or "system"

    # ------------------------------------------------------------------
    # Value helpers
    # ------------------------------------------------------------------

    def _generate_batch_no(self) -> str:
        """Return a batch number of the form ``YYYYmmddHHMMSS_<8 hex chars>``."""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        return f"{timestamp}_{unique_id}"

    @staticmethod
    def _is_blank(value: Any) -> bool:
        """Return True for None, NaN/NaT and empty or whitespace-only strings."""
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Array-like values: pd.isna returns an array, not a scalar.
            return False

    def _parse_decimal(self, value: Any) -> Optional[Decimal]:
        """Parse a cell as a finite Decimal; None when blank or not a number."""
        if self._is_blank(value):
            return None
        try:
            number = Decimal(str(value).strip())
        except (ValueError, InvalidOperation):
            return None
        return number if number.is_finite() else None

    def _safe_decimal(self, value: Any, default: Decimal = Decimal('0')) -> Decimal:
        """Convert to Decimal, returning ``default`` for blank/unparseable cells."""
        number = self._parse_decimal(value)
        return default if number is None else number

    def _safe_int(self, value: Any) -> Optional[int]:
        """Convert an integral cell (``1001`` / ``1001.0`` / ``"1001"``) to int.

        Returns None for blank, non-numeric or fractional values.
        """
        number = self._parse_decimal(value)
        if number is None or number != number.to_integral_value():
            return None
        return int(number)

    def _safe_str(self, value: Any) -> Optional[str]:
        """Return the cell as a stripped string, or None for blank cells."""
        if self._is_blank(value):
            return None
        if isinstance(value, float) and value.is_integer():
            # Excel hands numeric text cells back as floats (e.g. 123.0).
            return str(int(value))
        return str(value).strip()

    def _clean_cell(self, value: Any) -> Any:
        """Normalise a raw cell for insertion without forcing a type.

        Blank -> None (instead of NaN), integral float -> int,
        str -> stripped str; anything else is returned unchanged.
        """
        if self._is_blank(value):
            return None
        if isinstance(value, float) and value.is_integer():
            return int(value)
        if isinstance(value, str):
            return value.strip()
        return value

    def _safe_date(self, value: Any) -> Optional[date]:
        """Convert a cell to a date; None when blank or unparseable."""
        if self._is_blank(value):
            return None
        if isinstance(value, datetime):  # also covers pandas.Timestamp
            return value.date()
        if isinstance(value, date):
            return value
        if isinstance(value, numbers.Real) and not isinstance(value, bool):
            # pd.to_datetime would read a bare number as nanoseconds since
            # 1970-01-01; interpret 20260115 / 202601 style cells instead.
            try:
                as_int = int(value)
            except (OverflowError, ValueError):
                return None
            if as_int != value:
                return None
            for fmt in ("%Y%m%d", "%Y%m"):
                try:
                    return datetime.strptime(str(as_int), fmt).date()
                except ValueError:
                    continue
            return None
        try:
            parsed = pd.to_datetime(value)
        except (ValueError, TypeError, OverflowError):
            return None
        if pd.isna(parsed):
            return None
        return parsed.date()

    def _safe_month(self, value: Any) -> Optional[date]:
        """Convert a cell to the first day of its month; None when invalid."""
        dt = self._safe_date(value)
        if dt:
            return dt.replace(day=1)
        return None

    def _amount_errors(self, row: Dict[str, Any], column: str, row_idx: int,
                       required: bool = True) -> List[str]:
        """Validate an amount cell: present (if required), numeric, >= 0."""
        value = row.get(column)
        if self._is_blank(value):
            return [f"行{row_idx}: {column}不能为空"] if required else []
        amount = self._parse_decimal(value)
        if amount is None:
            return [f"行{row_idx}: {column}格式错误"]
        if amount < 0:
            return [f"行{row_idx}: {column}不能为负数"]
        return []

    def _common_fields(self) -> Dict[str, Any]:
        """Return the store and audit columns shared by every target table."""
        return {
            'site_id': self.site_id,
            'tenant_id': self.tenant_id,
            'import_batch_no': self.batch_no,
            'import_file_name': self.file_name,
            'import_time': self.import_time or datetime.now(),
            'import_user': self.import_user,
        }

    # ------------------------------------------------------------------
    # Workflow
    # ------------------------------------------------------------------

    def import_file(self, file_path: str) -> Dict[str, Any]:
        """Import one Excel file into TARGET_TABLE.

        Rows failing validation are skipped and reported; all valid rows are
        inserted and committed together.

        Returns:
            A result dict with ``status`` ("SUCCESS"; "PARTIAL" when some rows
            were rejected; "ERROR" plus ``message`` when required columns are
            missing), ``batch_no``, ``total_rows``, ``inserted``, ``errors``
            (count) and ``error_messages`` (the first few messages).
        """
        print(f"{self.START_MESSAGE}: {file_path}")

        df = pd.read_excel(file_path)
        # Tolerate stray spaces in header cells ("金额 " vs "金额").
        df.columns = [str(col).strip() for col in df.columns]
        # Completely empty rows are not data; dropping keeps the original
        # index labels, so reported Excel row numbers stay correct.
        df = df.dropna(how="all")

        missing_cols = [c for c in self.REQUIRED_COLUMNS if c not in df.columns]
        if missing_cols:
            return {"status": "ERROR", "message": f"缺少必要列: {missing_cols}"}

        self.file_name = os.path.basename(file_path)
        self.import_time = datetime.now()

        records: List[Dict[str, Any]] = []
        errors: List[str] = []
        for idx, row in df.iterrows():
            row_dict = row.to_dict()
            # Excel row number: the header is row 1 and the index is 0-based.
            row_errors = self.validate_row(row_dict, idx + 2)
            if row_errors:
                errors.extend(row_errors)
                continue
            records.append(self.transform_row(row_dict))

        if errors:
            print(f"校验错误: {len(errors)} 条")
            for err in errors[:MAX_ERRORS_SHOWN]:
                print(f" - {err}")

        inserted = self.insert_records(records) if records else 0

        return {
            "status": "SUCCESS" if not errors else "PARTIAL",
            "batch_no": self.batch_no,
            "total_rows": len(df),
            "inserted": inserted,
            "errors": len(errors),
            "error_messages": errors[:MAX_ERRORS_SHOWN],
        }

    def validate_row(self, row: Dict[str, Any], row_idx: int) -> List[str]:
        """Validate one row; return a list of error messages (empty when valid)."""
        return []

    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Map one validated sheet row onto target-table columns."""
        raise NotImplementedError

    def insert_records(self, records: List[Dict[str, Any]]) -> int:
        """Insert ``records`` into TARGET_TABLE, commit, and return the row count.

        Values are passed as bound parameters; only the table and column names
        (class constants) are interpolated into the SQL text.
        """
        if not self.INSERT_COLUMNS:
            raise NotImplementedError
        if not records:
            return 0
        cols_str = ", ".join(self.INSERT_COLUMNS)
        placeholders = ", ".join(["%s"] * len(self.INSERT_COLUMNS))
        sql = f"INSERT INTO {self.TARGET_TABLE} ({cols_str}) VALUES ({placeholders})"
        inserted = 0
        with self.db.conn.cursor() as cur:
            for record in records:
                cur.execute(sql, [record.get(col) for col in self.INSERT_COLUMNS])
                inserted += cur.rowcount
        self.db.commit()
        return inserted


# =============================================================================
# Expense import
# =============================================================================

class ExpenseImporter(BaseImporter):
    """
    Expense import.

    Expected Excel columns:
    - 月份 (month): e.g. 2026-01 or 2026/01/01
    - 支出类型 (expense type): 房租/水电费/物业费/工资/报销/平台服务费/其他
    - 金额 (amount): number
    - 明细 (detail): optional
    - 备注 (remark): optional
    """

    TARGET_TABLE = "billiards_dws.dws_finance_expense_summary"
    REQUIRED_COLUMNS = ['月份', '支出类型', '金额']
    OPTIONAL_COLUMNS = ['明细', '备注']
    START_MESSAGE = "开始导入支出文件"
    INSERT_COLUMNS = [
        'site_id', 'tenant_id', 'expense_month', 'expense_type_code',
        'expense_type_name', 'expense_category', 'expense_amount',
        'expense_detail', 'import_batch_no', 'import_file_name',
        'import_time', 'import_user', 'remark'
    ]

    def validate_row(self, row: Dict[str, Any], row_idx: int) -> List[str]:
        """Check month format, expense type and amount for one row."""
        errors = []
        if not self._safe_month(row.get('月份')):
            errors.append(f"行{row_idx}: 月份格式错误")

        expense_type = self._safe_str(row.get('支出类型')) or ''
        if expense_type not in EXPENSE_TYPES:
            errors.append(f"行{row_idx}: 支出类型无效 '{expense_type}'")

        errors.extend(self._amount_errors(row, '金额', row_idx))
        return errors

    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Map one expense row onto dws_finance_expense_summary columns."""
        expense_type_name = self._safe_str(row.get('支出类型')) or ''
        expense_type_code = EXPENSE_TYPES.get(expense_type_name, 'OTHER')
        return {
            **self._common_fields(),
            'expense_month': self._safe_month(row.get('月份')),
            'expense_type_code': expense_type_code,
            'expense_type_name': expense_type_name,
            'expense_category': EXPENSE_CATEGORIES.get(expense_type_code, 'OTHER'),
            'expense_amount': self._safe_decimal(row.get('金额')),
            'expense_detail': self._safe_str(row.get('明细')),
            'remark': self._safe_str(row.get('备注')),
        }


# =============================================================================
# Platform settlement import
# =============================================================================

class PlatformSettlementImporter(BaseImporter):
    """
    Platform settlement import.

    Expected Excel columns:
    - 回款日期 (settlement date): date
    - 平台类型 (platform): 美团/抖音/大众点评/其他
    - 回款金额 (settled amount): number
    - 平台订单号 (platform order no.): optional string
    - 订单原始金额 (gross order amount): optional number
    - 佣金 (commission): optional number
    - 服务费 (service fee): optional number
    - 关联订单ID (linked order id): optional
    - 备注 (remark): optional
    """

    TARGET_TABLE = "billiards_dws.dws_platform_settlement"
    REQUIRED_COLUMNS = ['回款日期', '平台类型', '回款金额']
    OPTIONAL_COLUMNS = ['平台订单号', '订单原始金额', '佣金', '服务费', '关联订单ID', '备注']
    START_MESSAGE = "开始导入平台结算文件"
    INSERT_COLUMNS = [
        'site_id', 'tenant_id', 'settlement_date', 'platform_type',
        'platform_name', 'platform_order_no', 'order_settle_id',
        'settlement_amount', 'commission_amount', 'service_fee',
        'gross_amount', 'import_batch_no', 'import_file_name',
        'import_time', 'import_user', 'remark'
    ]

    def validate_row(self, row: Dict[str, Any], row_idx: int) -> List[str]:
        """Check settlement date, platform and amount columns for one row."""
        errors = []
        if not self._safe_date(row.get('回款日期')):
            errors.append(f"行{row_idx}: 回款日期格式错误")

        platform_type = self._safe_str(row.get('平台类型')) or ''
        if platform_type not in PLATFORM_TYPES:
            errors.append(f"行{row_idx}: 平台类型无效 '{platform_type}'")

        errors.extend(self._amount_errors(row, '回款金额', row_idx))
        # Optional amounts may be empty but must be valid when filled in.
        for column in ('订单原始金额', '佣金', '服务费'):
            errors.extend(self._amount_errors(row, column, row_idx, required=False))
        return errors

    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Map one settlement row onto dws_platform_settlement columns."""
        platform_name = self._safe_str(row.get('平台类型')) or ''
        return {
            **self._common_fields(),
            'settlement_date': self._safe_date(row.get('回款日期')),
            'platform_type': PLATFORM_TYPES.get(platform_name, 'OTHER'),
            'platform_name': platform_name,
            'platform_order_no': self._safe_str(row.get('平台订单号')),
            'order_settle_id': self._clean_cell(row.get('关联订单ID')),
            'settlement_amount': self._safe_decimal(row.get('回款金额')),
            'commission_amount': self._safe_decimal(row.get('佣金')),
            'service_fee': self._safe_decimal(row.get('服务费')),
            'gross_amount': self._safe_decimal(row.get('订单原始金额')),
            'remark': self._safe_str(row.get('备注')),
        }


# =============================================================================
# Recharge commission import
# =============================================================================

class RechargeCommissionImporter(BaseImporter):
    """
    Recharge commission import.

    Expected Excel columns:
    - 月份 (month): e.g. 2026-01
    - 助教ID (assistant id): integer
    - 提成金额 (commission amount): number
    - 助教花名 (assistant nickname): optional string
    - 充值订单金额 (recharge order amount): optional number
    - 充值订单ID / 充值订单号 (recharge order id / no.): optional
    - 备注 (remark): optional
    """

    TARGET_TABLE = "billiards_dws.dws_assistant_recharge_commission"
    REQUIRED_COLUMNS = ['月份', '助教ID', '提成金额']
    OPTIONAL_COLUMNS = ['助教花名', '充值订单金额', '充值订单ID', '充值订单号', '备注']
    START_MESSAGE = "开始导入充值提成文件"
    INSERT_COLUMNS = [
        'site_id', 'tenant_id', 'assistant_id', 'assistant_nickname',
        'commission_month', 'recharge_order_id', 'recharge_order_no',
        'recharge_amount', 'commission_amount', 'commission_ratio',
        'import_batch_no', 'import_file_name', 'import_time',
        'import_user', 'remark'
    ]

    def validate_row(self, row: Dict[str, Any], row_idx: int) -> List[str]:
        """Check month, assistant id and amount columns for one row."""
        errors = []
        if not self._safe_month(row.get('月份')):
            errors.append(f"行{row_idx}: 月份格式错误")

        assistant_id = row.get('助教ID')
        if self._is_blank(assistant_id):
            errors.append(f"行{row_idx}: 助教ID不能为空")
        elif self._safe_int(assistant_id) is None:
            # transform_row needs an integer id.
            errors.append(f"行{row_idx}: 助教ID格式错误")

        errors.extend(self._amount_errors(row, '提成金额', row_idx))
        errors.extend(self._amount_errors(row, '充值订单金额', row_idx, required=False))
        return errors

    def transform_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Map one commission row onto dws_assistant_recharge_commission columns."""
        recharge_amount = self._safe_decimal(row.get('充值订单金额'))
        commission_amount = self._safe_decimal(row.get('提成金额'))
        # The ratio is only meaningful when a positive recharge amount is known.
        commission_ratio = (
            commission_amount / recharge_amount if recharge_amount > 0 else None
        )
        return {
            **self._common_fields(),
            'assistant_id': self._safe_int(row.get('助教ID')),
            'assistant_nickname': self._safe_str(row.get('助教花名')),
            'commission_month': self._safe_month(row.get('月份')),
            'recharge_order_id': self._clean_cell(row.get('充值订单ID')),
            'recharge_order_no': self._safe_str(row.get('充值订单号')),
            'recharge_amount': recharge_amount,
            'commission_amount': commission_amount,
            'commission_ratio': commission_ratio,
            'remark': self._safe_str(row.get('备注')),
        }


# =============================================================================
# Entry point
# =============================================================================

# CLI --type value -> importer class.
IMPORTERS = {
    'expense': ExpenseImporter,
    'platform': PlatformSettlementImporter,
    'commission': RechargeCommissionImporter,
}


def main():
    """Parse CLI arguments, run the selected importer and print a summary.

    Exits with status 1 when the file is missing, required columns are
    missing, or the import raises (the connection is rolled back first).
    """
    parser = argparse.ArgumentParser(description='DWS Excel导入工具')
    parser.add_argument(
        '--type', '-t',
        choices=list(IMPORTERS),
        required=True,
        help='导入类型: expense(支出), platform(平台结算), commission(充值提成)'
    )
    parser.add_argument(
        '--file', '-f',
        required=True,
        help='Excel文件路径'
    )
    args = parser.parse_args()

    # A directory path would only fail later inside read_excel.
    if not os.path.isfile(args.file):
        print(f"文件不存在: {args.file}")
        sys.exit(1)

    # NOTE(review): config is read both as a mapping (config["db"]) and via
    # dotted .get() in BaseImporter -- confirm AppConfig supports both.
    config = AppConfig.load()
    dsn = config["db"]["dsn"]
    db_conn = DatabaseConnection(dsn=dsn)
    db = DatabaseOperations(db_conn)

    try:
        importer_cls = IMPORTERS.get(args.type)
        if importer_cls is None:  # defensive; argparse choices already restrict this
            print(f"未知的导入类型: {args.type}")
            sys.exit(1)
        importer = importer_cls(config, db)

        result = importer.import_file(args.file)

        print("\n" + "=" * 50)
        print("导入结果:")
        print(f" 状态: {result.get('status')}")
        print(f" 批次号: {result.get('batch_no')}")
        print(f" 总行数: {result.get('total_rows')}")
        print(f" 插入行数: {result.get('inserted')}")
        print(f" 错误行数: {result.get('errors')}")

        if result.get('status') == 'ERROR':
            print(f" 错误信息: {result.get('message')}")
            sys.exit(1)

    except Exception as e:
        # SystemExit is not an Exception subclass, so exits above pass through.
        print(f"导入失败: {e}")
        db_conn.rollback()
        sys.exit(1)
    finally:
        db_conn.close()


if __name__ == "__main__":
    main()