# -*- coding: utf-8 -*- """应用程序设置管理""" import json import os import sys from pathlib import Path from typing import Any, Dict, Optional class AppSettings: """应用程序设置单例""" _instance: Optional["AppSettings"] = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True # 配置文件路径 self._settings_file = self._get_settings_path() # 默认设置 self._settings = { "etl_project_path": "", # ETL 项目路径 "env_file_path": "", # .env 文件路径 # 自动更新配置 "auto_update": { "hours": 24, "overlap_seconds": 3600, "include_dwd": True, "auto_verify": False, "selected_tasks": [], }, # 数据校验配置 "integrity_check": { "mode": "history", "history_start": "", "history_end": "", "lookback_hours": 24, "include_dimensions": False, "auto_backfill": False, "ods_tasks": "", }, # 高级配置 "advanced": { "pipeline_flow": "FULL", "dry_run": False, "window_start": "", "window_end": "", "window_split": "none", "window_compensation": 0, "ingest_source": "", "store_id": "", "pg_dsn": "", "api_token": "", }, } # 加载设置 self._load() # 如果没有配置,尝试自动检测 if not self._settings["etl_project_path"]: self._auto_detect_paths() def _get_settings_path(self) -> Path: """获取设置文件路径""" # 优先使用用户目录 if sys.platform == "win32": app_data = os.environ.get("APPDATA", "") if app_data: settings_dir = Path(app_data) / "ETL管理系统" else: settings_dir = Path.home() / ".etl_gui" else: settings_dir = Path.home() / ".etl_gui" settings_dir.mkdir(parents=True, exist_ok=True) return settings_dir / "settings.json" def _auto_detect_paths(self): """自动检测 ETL 项目路径""" # 方法1: 检查是否从源码目录运行 try: source_dir = Path(__file__).resolve().parents[2] cli_main = source_dir / "cli" / "main.py" if cli_main.exists(): rel_source = Path(os.path.relpath(source_dir, Path.cwd())) self._settings["etl_project_path"] = str(rel_source) env_file = rel_source / ".env" if env_file.exists(): self._settings["env_file_path"] = str(env_file) self._save() return except Exception: pass # 方法2: 检查常见位置 common_paths = [ Path("etl_billiards"), Path("."), ] for path in common_paths: if path.exists() and (path / "cli" / "main.py").exists(): self._settings["etl_project_path"] = str(path) env_file = path / ".env" if env_file.exists(): self._settings["env_file_path"] = str(env_file) self._save() return def _load(self): """加载设置""" if self._settings_file.exists(): try: data = json.loads(self._settings_file.read_text(encoding="utf-8")) self._settings.update(data) except Exception: pass def _save(self): """保存设置""" try: self._settings_file.write_text( json.dumps(self._settings, ensure_ascii=False, indent=2), encoding="utf-8" ) except Exception: pass @property def etl_project_path(self) -> str: """获取 ETL 项目路径""" return self._settings.get("etl_project_path", "") @etl_project_path.setter def etl_project_path(self, value: str): """设置 ETL 项目路径""" self._settings["etl_project_path"] = value # 同时更新 .env 路径 if value: env_path = Path(value) / ".env" if env_path.exists(): self._settings["env_file_path"] = str(env_path) self._save() @property def env_file_path(self) -> str: """获取 .env 文件路径""" path = self._settings.get("env_file_path", "") if not path and self.etl_project_path: path = str(Path(self.etl_project_path) / ".env") return path @env_file_path.setter def env_file_path(self, value: str): """设置 .env 文件路径""" self._settings["env_file_path"] = value self._save() def is_configured(self) -> bool: """检查是否已配置""" path = self.etl_project_path if not path: return False return Path(path).exists() and (Path(path) / "cli" / "main.py").exists() def validate(self) -> tuple[bool, str]: """验证配置""" path = self.etl_project_path if not path: return False, "未配置 ETL 项目路径" project_path = Path(path) if not project_path.exists(): return False, f"ETL 项目路径不存在: {path}" cli_main = project_path / "cli" / "main.py" if not cli_main.exists(): return False, f"找不到 CLI 入口: {cli_main}" return True, "配置有效" # ==================== 自动更新配置 ==================== @property def auto_update_hours(self) -> int: return self._settings.get("auto_update", {}).get("hours", 24) @auto_update_hours.setter def auto_update_hours(self, value: int): self._settings.setdefault("auto_update", {})["hours"] = value self._save() @property def auto_update_overlap_seconds(self) -> int: return self._settings.get("auto_update", {}).get("overlap_seconds", 3600) @auto_update_overlap_seconds.setter def auto_update_overlap_seconds(self, value: int): self._settings.setdefault("auto_update", {})["overlap_seconds"] = value self._save() @property def auto_update_include_dwd(self) -> bool: return self._settings.get("auto_update", {}).get("include_dwd", True) @auto_update_include_dwd.setter def auto_update_include_dwd(self, value: bool): self._settings.setdefault("auto_update", {})["include_dwd"] = value self._save() @property def auto_update_auto_verify(self) -> bool: return self._settings.get("auto_update", {}).get("auto_verify", False) @auto_update_auto_verify.setter def auto_update_auto_verify(self, value: bool): self._settings.setdefault("auto_update", {})["auto_verify"] = value self._save() @property def auto_update_selected_tasks(self) -> list: return self._settings.get("auto_update", {}).get("selected_tasks", []) @auto_update_selected_tasks.setter def auto_update_selected_tasks(self, value: list): self._settings.setdefault("auto_update", {})["selected_tasks"] = value self._save() # ==================== 数据校验配置 ==================== @property def integrity_mode(self) -> str: return self._settings.get("integrity_check", {}).get("mode", "history") @integrity_mode.setter def integrity_mode(self, value: str): self._settings.setdefault("integrity_check", {})["mode"] = value self._save() @property def integrity_history_start(self) -> str: return self._settings.get("integrity_check", {}).get("history_start", "") @integrity_history_start.setter def integrity_history_start(self, value: str): self._settings.setdefault("integrity_check", {})["history_start"] = value self._save() @property def integrity_history_end(self) -> str: return self._settings.get("integrity_check", {}).get("history_end", "") @integrity_history_end.setter def integrity_history_end(self, value: str): self._settings.setdefault("integrity_check", {})["history_end"] = value self._save() @property def integrity_lookback_hours(self) -> int: return self._settings.get("integrity_check", {}).get("lookback_hours", 24) @integrity_lookback_hours.setter def integrity_lookback_hours(self, value: int): self._settings.setdefault("integrity_check", {})["lookback_hours"] = value self._save() @property def integrity_include_dimensions(self) -> bool: return self._settings.get("integrity_check", {}).get("include_dimensions", False) @integrity_include_dimensions.setter def integrity_include_dimensions(self, value: bool): self._settings.setdefault("integrity_check", {})["include_dimensions"] = value self._save() @property def integrity_auto_backfill(self) -> bool: return self._settings.get("integrity_check", {}).get("auto_backfill", False) @integrity_auto_backfill.setter def integrity_auto_backfill(self, value: bool): self._settings.setdefault("integrity_check", {})["auto_backfill"] = value self._save() @property def integrity_ods_tasks(self) -> str: return self._settings.get("integrity_check", {}).get("ods_tasks", "") @integrity_ods_tasks.setter def integrity_ods_tasks(self, value: str): self._settings.setdefault("integrity_check", {})["ods_tasks"] = value self._save() # ==================== 高级配置 ==================== @property def advanced_pipeline_flow(self) -> str: return self._settings.get("advanced", {}).get("pipeline_flow", "FULL") @advanced_pipeline_flow.setter def advanced_pipeline_flow(self, value: str): self._settings.setdefault("advanced", {})["pipeline_flow"] = value self._save() @property def advanced_dry_run(self) -> bool: return self._settings.get("advanced", {}).get("dry_run", False) @advanced_dry_run.setter def advanced_dry_run(self, value: bool): self._settings.setdefault("advanced", {})["dry_run"] = value self._save() @property def advanced_window_start(self) -> str: return self._settings.get("advanced", {}).get("window_start", "") @advanced_window_start.setter def advanced_window_start(self, value: str): self._settings.setdefault("advanced", {})["window_start"] = value self._save() @property def advanced_window_end(self) -> str: return self._settings.get("advanced", {}).get("window_end", "") @advanced_window_end.setter def advanced_window_end(self, value: str): self._settings.setdefault("advanced", {})["window_end"] = value self._save() @property def advanced_ingest_source(self) -> str: return self._settings.get("advanced", {}).get("ingest_source", "") @advanced_ingest_source.setter def advanced_ingest_source(self, value: str): self._settings.setdefault("advanced", {})["ingest_source"] = value self._save() @property def advanced_window_split(self) -> str: return self._settings.get("advanced", {}).get("window_split", "none") @advanced_window_split.setter def advanced_window_split(self, value: str): self._settings.setdefault("advanced", {})["window_split"] = value self._save() @property def advanced_window_compensation(self) -> int: return self._settings.get("advanced", {}).get("window_compensation", 0) @advanced_window_compensation.setter def advanced_window_compensation(self, value: int): self._settings.setdefault("advanced", {})["window_compensation"] = value self._save() def get_all_settings(self) -> Dict[str, Any]: """获取所有设置(用于调试)""" return self._settings.copy() def save_all(self): """强制保存所有设置""" self._save() # ==================== 任务历史存储 ==================== def _get_history_path(self) -> Path: """获取任务历史文件路径""" return self._settings_file.parent / "task_history.json" def save_task_history(self, history_list: list): """保存任务历史到文件""" try: history_path = self._get_history_path() # 序列化任务历史 serialized = [] for task in history_list[:100]: # 最多保存100条 try: task_data = { "id": task.id, "tasks": task.config.tasks if hasattr(task, 'config') else [], "status": task.status.value if hasattr(task.status, 'value') else str(task.status), "created_at": task.created_at.isoformat() if task.created_at else None, "started_at": task.started_at.isoformat() if task.started_at else None, "finished_at": task.finished_at.isoformat() if task.finished_at else None, "exit_code": task.exit_code, "error": task.error[:500] if task.error else "", # 限制长度 "output_preview": task.output[:1000] if task.output else "", # 输出预览 # 保存配置信息 "pipeline_flow": task.config.pipeline_flow if hasattr(task, 'config') else "FULL", "window_start": task.config.window_start if hasattr(task, 'config') else None, "window_end": task.config.window_end if hasattr(task, 'config') else None, } serialized.append(task_data) except Exception: continue history_path.write_text( json.dumps(serialized, ensure_ascii=False, indent=2), encoding="utf-8" ) except Exception as e: print(f"保存任务历史失败: {e}") def load_task_history(self) -> list: """从文件加载任务历史""" try: history_path = self._get_history_path() if not history_path.exists(): return [] data = json.loads(history_path.read_text(encoding="utf-8")) return data except Exception as e: print(f"加载任务历史失败: {e}") return [] # 全局单例 app_settings = AppSettings()