# -*- coding: utf-8 -*- """ DWS配置数据初始化任务 功能说明: 执行 seed_dws_config.sql,向配置表插入初始数据 执行前提: - billiards_dws schema 已创建(INIT_DWS_SCHEMA) - 配置表已存在 作者:ETL团队 创建日期:2026-02-01 """ from __future__ import annotations from pathlib import Path from typing import Any from tasks.base_task import BaseTask, TaskContext class SeedDwsConfigTask(BaseTask): """ DWS配置数据初始化任务 执行 seed_dws_config.sql 文件,向以下配置表插入初始数据: - cfg_performance_tier: 绩效档位配置 - cfg_assistant_level_price: 助教等级定价 - cfg_bonus_rules: 奖金规则配置 - cfg_area_category: 台区分类映射 - cfg_skill_type: 技能课程类型映射 """ def get_task_code(self) -> str: return "SEED_DWS_CONFIG" def extract(self, context: TaskContext) -> dict[str, Any]: """ 读取配置数据SQL文件 """ base_dir = Path(__file__).resolve().parents[1] / "database" seed_path = Path(self.config.get("schema.seed_dws_file", base_dir / "seed_dws_config.sql")) if not seed_path.exists(): raise FileNotFoundError(f"未找到 DWS 配置数据文件: {seed_path}") return { "seed_sql": seed_path.read_text(encoding="utf-8"), "seed_file": str(seed_path) } def load(self, extracted: dict[str, Any], context: TaskContext) -> dict: """ 执行配置数据SQL """ with self.db.conn.cursor() as cur: self.logger.info("执行 DWS 配置数据文件: %s", extracted["seed_file"]) cur.execute(extracted["seed_sql"]) self.logger.info("DWS 配置数据初始化完成") return {"executed": 1, "files": [extracted["seed_file"]]}