From 8b98fcea1f12393fc59fa445293e957472c56efa Mon Sep 17 00:00:00 2001 From: Neo Date: Mon, 19 Jan 2026 22:37:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=8E=AF=E5=A2=83=E5=8F=98?= =?UTF-8?q?=E9=87=8F=E6=98=A0=E5=B0=84=EF=BC=8C=E6=94=AF=E6=8C=81=E5=AE=8C?= =?UTF-8?q?=E6=95=B4=E6=80=A7=E6=A3=80=E6=9F=A5=E4=BB=BB=E5=8A=A1=E5=92=8C?= =?UTF-8?q?=E5=B7=A5=E5=85=B7=E7=B1=BB=E4=BB=BB=E5=8A=A1=E7=9A=84=E6=89=A7?= =?UTF-8?q?=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etl_billiards/config/env_parser.py | 9 ++ etl_billiards/gui/models/task_model.py | 179 +++++++++++++++++++++++ etl_billiards/gui/workers/task_worker.py | 175 ++++++++++++++++++++++ etl_billiards/orchestration/scheduler.py | 95 ++++++++++++ etl_billiards/requirements.txt | 7 + 5 files changed, 465 insertions(+) create mode 100644 etl_billiards/gui/models/task_model.py create mode 100644 etl_billiards/gui/workers/task_worker.py diff --git a/etl_billiards/config/env_parser.py b/etl_billiards/config/env_parser.py index 787bda6..1b1aee5 100644 --- a/etl_billiards/config/env_parser.py +++ b/etl_billiards/config/env_parser.py @@ -42,11 +42,20 @@ ENV_MAP = { "ALLOW_EMPTY_ADVANCE": ("run.allow_empty_result_advance",), "WINDOW_START": ("run.window_override.start",), "WINDOW_END": ("run.window_override.end",), + "WINDOW_SPLIT_UNIT": ("run.window_split.unit",), + "WINDOW_COMPENSATION_HOURS": ("run.window_split.compensation_hours",), "PIPELINE_FLOW": ("pipeline.flow",), "JSON_FETCH_ROOT": ("pipeline.fetch_root",), "JSON_SOURCE_DIR": ("pipeline.ingest_source_dir",), "FETCH_ROOT": ("pipeline.fetch_root",), "INGEST_SOURCE_DIR": ("pipeline.ingest_source_dir",), + "INTEGRITY_MODE": ("integrity.mode",), + "INTEGRITY_HISTORY_START": ("integrity.history_start",), + "INTEGRITY_HISTORY_END": ("integrity.history_end",), + "INTEGRITY_INCLUDE_DIMENSIONS": ("integrity.include_dimensions",), + "INTEGRITY_AUTO_CHECK": ("integrity.auto_check",), + "INTEGRITY_AUTO_BACKFILL": ("integrity.auto_backfill",), + "INTEGRITY_ODS_TASK_CODES": ("integrity.ods_task_codes",), } diff --git a/etl_billiards/gui/models/task_model.py b/etl_billiards/gui/models/task_model.py new file mode 100644 index 0000000..a9732b5 --- /dev/null +++ b/etl_billiards/gui/models/task_model.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +"""任务数据模型""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Optional, List, Dict, Any + + +class TaskStatus(Enum): + """任务状态枚举""" + PENDING = "pending" # 待执行 + RUNNING = "running" # 执行中 + SUCCESS = "success" # 成功 + FAILED = "failed" # 失败 + CANCELLED = "cancelled" # 已取消 + + +class TaskCategory(Enum): + """任务分类""" + ODS = "ODS" # ODS 数据抓取任务 + DWD = "DWD" # DWD 装载任务 + DWS = "DWS" # DWS 汇总任务 + SCHEMA = "Schema" # Schema 初始化任务 + QUALITY = "Quality" # 质量检查任务 + OTHER = "Other" # 其他任务 + + +# 任务分类映射 +TASK_CATEGORIES: Dict[str, TaskCategory] = { + # ODS 任务 + "ODS_PAYMENT": TaskCategory.ODS, + "ODS_MEMBER": TaskCategory.ODS, + "ODS_MEMBER_CARD": TaskCategory.ODS, + "ODS_MEMBER_BALANCE": TaskCategory.ODS, + "ODS_SETTLEMENT_RECORDS": TaskCategory.ODS, + "ODS_TABLE_USE": TaskCategory.ODS, + "ODS_ASSISTANT_ACCOUNT": TaskCategory.ODS, + "ODS_ASSISTANT_LEDGER": TaskCategory.ODS, + "ODS_ASSISTANT_ABOLISH": TaskCategory.ODS, + "ODS_REFUND": TaskCategory.ODS, + "ODS_PLATFORM_COUPON": TaskCategory.ODS, + "ODS_RECHARGE_SETTLE": TaskCategory.ODS, + "ODS_GROUP_PACKAGE": TaskCategory.ODS, + "ODS_GROUP_BUY_REDEMPTION": TaskCategory.ODS, + "ODS_INVENTORY_STOCK": TaskCategory.ODS, + "ODS_INVENTORY_CHANGE": TaskCategory.ODS, + "ODS_TABLES": TaskCategory.ODS, + "ODS_GOODS_CATEGORY": TaskCategory.ODS, + "ODS_STORE_GOODS": TaskCategory.ODS, + "ODS_STORE_GOODS_SALES": TaskCategory.ODS, + "ODS_TABLE_FEE_DISCOUNT": TaskCategory.ODS, + "ODS_TENANT_GOODS": TaskCategory.ODS, + "ODS_SETTLEMENT_TICKET": TaskCategory.ODS, + # DWD 任务 + "DWD_LOAD_FROM_ODS": TaskCategory.DWD, + "DWD_QUALITY_CHECK": TaskCategory.QUALITY, + "PAYMENTS_DWD": TaskCategory.DWD, + "MEMBERS_DWD": TaskCategory.DWD, + "TICKET_DWD": TaskCategory.DWD, + # DWS 任务 + "INIT_DWS_SCHEMA": TaskCategory.SCHEMA, + "DWS_BUILD_ORDER_SUMMARY": TaskCategory.DWS, + # Schema 任务 + "INIT_ODS_SCHEMA": TaskCategory.SCHEMA, + "INIT_DWD_SCHEMA": TaskCategory.SCHEMA, + # 其他任务 + "MANUAL_INGEST": TaskCategory.OTHER, + "CHECK_CUTOFF": TaskCategory.OTHER, + "DATA_INTEGRITY_CHECK": TaskCategory.QUALITY, + "ODS_JSON_ARCHIVE": TaskCategory.OTHER, + # 旧版任务(兼容) + "PRODUCTS": TaskCategory.ODS, + "TABLES": TaskCategory.ODS, + "MEMBERS": TaskCategory.ODS, + "ASSISTANTS": TaskCategory.ODS, + "PACKAGES_DEF": TaskCategory.ODS, + "ORDERS": TaskCategory.ODS, + "PAYMENTS": TaskCategory.ODS, + "REFUNDS": TaskCategory.ODS, + "COUPON_USAGE": TaskCategory.ODS, + "INVENTORY_CHANGE": TaskCategory.ODS, + "TOPUPS": TaskCategory.ODS, + "TABLE_DISCOUNT": TaskCategory.ODS, + "ASSISTANT_ABOLISH": TaskCategory.ODS, + "LEDGER": TaskCategory.ODS, +} + + +def get_task_category(task_code: str) -> TaskCategory: + """获取任务分类""" + return TASK_CATEGORIES.get(task_code.upper(), TaskCategory.OTHER) + + +@dataclass +class TaskItem: + """任务项""" + task_code: str + name: str = "" + description: str = "" + category: TaskCategory = TaskCategory.OTHER + enabled: bool = True + + def __post_init__(self): + if not self.name: + self.name = self.task_code + if not self.category or self.category == TaskCategory.OTHER: + self.category = get_task_category(self.task_code) + + +@dataclass +class TaskConfig: + """任务执行配置""" + tasks: List[str] = field(default_factory=list) + pipeline_flow: str = "FULL" # FULL, FETCH_ONLY, INGEST_ONLY + dry_run: bool = False + window_start: Optional[str] = None + window_end: Optional[str] = None + window_split: Optional[str] = None # none, month + window_compensation: int = 0 # 补偿小时数 + ingest_source: Optional[str] = None + store_id: Optional[int] = None + pg_dsn: Optional[str] = None + api_token: Optional[str] = None + extra_args: Dict[str, Any] = field(default_factory=dict) + env_vars: Dict[str, str] = field(default_factory=dict) # 额外环境变量 + + +@dataclass +class TaskHistory: + """任务执行历史""" + id: str + task_codes: List[str] + status: TaskStatus + start_time: datetime + end_time: Optional[datetime] = None + exit_code: Optional[int] = None + command: str = "" + output_log: str = "" + error_message: str = "" + summary: Dict[str, Any] = field(default_factory=dict) + + @property + def duration_seconds(self) -> Optional[float]: + """执行时长(秒)""" + if self.end_time and self.start_time: + return (self.end_time - self.start_time).total_seconds() + return None + + @property + def duration_str(self) -> str: + """格式化的执行时长""" + secs = self.duration_seconds + if secs is None: + return "-" + if secs < 60: + return f"{secs:.1f}秒" + elif secs < 3600: + mins = int(secs // 60) + secs = secs % 60 + return f"{mins}分{secs:.0f}秒" + else: + hours = int(secs // 3600) + mins = int((secs % 3600) // 60) + return f"{hours}时{mins}分" + + +@dataclass +class QueuedTask: + """队列中的任务""" + id: str + config: TaskConfig + status: TaskStatus = TaskStatus.PENDING + created_at: datetime = field(default_factory=datetime.now) + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + output: str = "" + error: str = "" + exit_code: Optional[int] = None diff --git a/etl_billiards/gui/workers/task_worker.py b/etl_billiards/gui/workers/task_worker.py new file mode 100644 index 0000000..63a44bc --- /dev/null +++ b/etl_billiards/gui/workers/task_worker.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +"""任务执行工作线程""" + +import subprocess +import sys +import os +from pathlib import Path +from typing import List, Optional, Dict + +from PySide6.QtCore import QThread, Signal + +from ..utils.app_settings import app_settings + + +class TaskWorker(QThread): + """任务执行工作线程""" + + # 信号 + output_received = Signal(str) # 收到输出行 + task_finished = Signal(int, str) # 任务完成 (exit_code, summary) + error_occurred = Signal(str) # 发生错误 + progress_updated = Signal(int, int) # 进度更新 (current, total) + + def __init__(self, command: List[str], working_dir: Optional[str] = None, + extra_env: Optional[Dict[str, str]] = None, parent=None): + super().__init__(parent) + self.command = command + self.extra_env = extra_env or {} + + # 工作目录优先级: 参数 > 应用设置 > 自动检测 + if working_dir is not None: + self.working_dir = working_dir + elif app_settings.etl_project_path: + self.working_dir = app_settings.etl_project_path + else: + # 回退到源码目录 + self.working_dir = str(Path(__file__).resolve().parents[2]) + + self.process: Optional[subprocess.Popen] = None + self._stop_requested = False + self._exit_code: Optional[int] = None + self._output_lines: List[str] = [] + + def run(self): + """执行任务""" + try: + self._stop_requested = False + self._output_lines = [] + + # 设置环境变量 + env = os.environ.copy() + env["PYTHONIOENCODING"] = "utf-8" + env["PYTHONUNBUFFERED"] = "1" + + # 添加项目根目录到 PYTHONPATH + project_root = self.working_dir + existing_path = env.get("PYTHONPATH", "") + if existing_path: + env["PYTHONPATH"] = f"{project_root}{os.pathsep}{existing_path}" + else: + env["PYTHONPATH"] = project_root + + # 添加额外的环境变量 + if self.extra_env: + for key, value in self.extra_env.items(): + env[key] = str(value) + self.output_received.emit(f"[环境变量] {key}={value}") + + self.output_received.emit(f"[工作目录] {self.working_dir}") + self.output_received.emit(f"[执行命令] {' '.join(self.command)}") + + # 启动进程 + self.process = subprocess.Popen( + self.command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + errors="replace", + cwd=self.working_dir, + env=env, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, + ) + + # 读取输出 + if self.process.stdout: + for line in iter(self.process.stdout.readline, ""): + if self._stop_requested: + break + + line = line.rstrip("\n\r") + if line: + self._output_lines.append(line) + self.output_received.emit(line) + + # 解析进度信息(如果有) + self._parse_progress(line) + + # 等待进程结束 + if self.process: + self.process.wait() + self._exit_code = self.process.returncode + + # 生成摘要 + summary = self._generate_summary() + self.task_finished.emit(self._exit_code or 0, summary) + + except FileNotFoundError as e: + self.error_occurred.emit(f"找不到 Python 解释器: {e}") + self.task_finished.emit(-1, f"执行失败: {e}") + except Exception as e: + self.error_occurred.emit(f"执行出错: {e}") + self.task_finished.emit(-1, f"执行失败: {e}") + finally: + self.process = None + + def stop(self): + """停止任务""" + self._stop_requested = True + if self.process: + try: + self.process.terminate() + # 给进程一些时间来终止 + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + except Exception: + pass + + def _parse_progress(self, line: str): + """解析进度信息""" + # 尝试从日志中解析进度 + # 示例: "[INFO] 处理进度: 50/100" + import re + match = re.search(r'进度[:\s]*(\d+)/(\d+)', line) + if match: + current = int(match.group(1)) + total = int(match.group(2)) + self.progress_updated.emit(current, total) + + def _generate_summary(self) -> str: + """生成执行摘要""" + if not self._output_lines: + return "无输出" + + # 查找关键信息 + summary_parts = [] + + for line in self._output_lines[-20:]: # 只看最后 20 行 + line_lower = line.lower() + if "success" in line_lower or "完成" in line or "成功" in line: + summary_parts.append(line) + elif "error" in line_lower or "失败" in line or "错误" in line: + summary_parts.append(line) + elif "inserted" in line_lower or "updated" in line_lower: + summary_parts.append(line) + elif "fetched" in line_lower or "抓取" in line: + summary_parts.append(line) + + if summary_parts: + return "\n".join(summary_parts[-5:]) # 最多返回 5 行 + + # 如果没有找到关键信息,返回最后一行 + return self._output_lines[-1] if self._output_lines else "执行完成" + + @property + def exit_code(self) -> Optional[int]: + """获取退出码""" + return self._exit_code + + @property + def output(self) -> str: + """获取完整输出""" + return "\n".join(self._output_lines) diff --git a/etl_billiards/orchestration/scheduler.py b/etl_billiards/orchestration/scheduler.py index 0faaf0a..c499a45 100644 --- a/etl_billiards/orchestration/scheduler.py +++ b/etl_billiards/orchestration/scheduler.py @@ -129,6 +129,13 @@ class ETLScheduler: # ------------------------------------------------------------------ internals def _run_single_task(self, task_code: str, run_uuid: str, store_id: int): """单个任务的抓取/清洗编排。""" + task_code_upper = task_code.upper() + + # 工具类任务:直接执行,不记录 cursor/run + if task_code_upper in self.NO_DB_CONFIG_TASKS: + self._run_utility_task(task_code_upper, store_id) + return + task_cfg = self._load_task_config(task_code, store_id) if not task_cfg: self.logger.warning("任务 %s 未启用或不存在", task_code) @@ -183,6 +190,7 @@ class ETLScheduler: window_end=window.get("end"), run_id=run_id, ) + self._maybe_run_integrity_check(task_code, window) return if self._flow_includes_fetch(): @@ -221,6 +229,7 @@ class ETLScheduler: window_end=window.get("end"), run_id=run_id, ) + self._maybe_run_integrity_check(task_code, window) except Exception as exc: # noqa: BLE001 self.run_tracker.update_run( @@ -322,6 +331,53 @@ class ETLScheduler: def _flow_includes_ingest(self) -> bool: return self.pipeline_flow in {"INGEST_ONLY", "FULL"} + # 不需要数据库配置即可运行的任务(工具类/初始化类) + NO_DB_CONFIG_TASKS = { + # Schema 初始化任务 + "INIT_ODS_SCHEMA", + "INIT_DWD_SCHEMA", + "INIT_DWS_SCHEMA", + # 质量检查任务 + "DATA_INTEGRITY_CHECK", + "DWD_QUALITY_CHECK", + # 工具任务 + "CHECK_CUTOFF", + "MANUAL_INGEST", + "ODS_JSON_ARCHIVE", + # DWS 汇总任务 + "DWS_BUILD_ORDER_SUMMARY", + } + + def _run_utility_task(self, task_code: str, store_id: int): + """ + 执行工具类任务(不记录 cursor/run,直接执行)。 + 这些任务不需要游标管理和运行跟踪。 + """ + self.logger.info("%s: 开始执行工具类任务", task_code) + + try: + # 创建任务实例(不需要 API client,使用 None) + task = self.task_registry.create_task( + task_code, self.config, self.db_ops, None, self.logger + ) + + # 执行任务(工具类任务通常不需要 cursor_data) + result = task.execute(None) + + status = (result.get("status") or "").upper() if isinstance(result, dict) else "SUCCESS" + if status == "SUCCESS": + self.logger.info("%s: 工具类任务执行成功", task_code) + if isinstance(result, dict): + counts = result.get("counts", {}) + if counts: + self.logger.info("%s: 结果统计: %s", task_code, counts) + else: + self.logger.warning("%s: 工具类任务执行结果: %s", task_code, status) + + except Exception as exc: + self.logger.error("%s: 工具类任务执行失败: %s", task_code, exc, exc_info=True) + raise + def _load_task_config(self, task_code: str, store_id: int) -> dict | None: """从数据库加载任务配置。""" sql = """ @@ -334,6 +390,45 @@ class ETLScheduler: rows = self.db_conn.query(sql, (store_id, task_code)) return rows[0] if rows else None + def _maybe_run_integrity_check(self, task_code: str, window: dict | None) -> None: + if not self.config.get("integrity.auto_check", False): + return + if str(task_code or "").upper() != "DWD_LOAD_FROM_ODS": + return + if not isinstance(window, dict): + return + window_start = window.get("start") + window_end = window.get("end") + if not window_start or not window_end: + return + + try: + from quality.integrity_checker import IntegrityWindow, run_integrity_window + + include_dimensions = bool(self.config.get("integrity.include_dimensions", False)) + task_codes = str(self.config.get("integrity.ods_task_codes", "") or "").strip() + report = run_integrity_window( + cfg=self.config, + window=IntegrityWindow( + start=window_start, + end=window_end, + label="etl_window", + granularity="window", + ), + include_dimensions=include_dimensions, + task_codes=task_codes, + logger=self.logger, + write_report=True, + ) + self.logger.info( + "Integrity check done: report=%s missing=%s errors=%s", + report.get("report_path"), + report.get("api_to_ods", {}).get("total_missing"), + report.get("api_to_ods", {}).get("total_errors"), + ) + except Exception as exc: # noqa: BLE001 + self.logger.warning("Integrity check failed: %s", exc, exc_info=True) + def close(self): """关闭连接。""" self.db_conn.close() diff --git a/etl_billiards/requirements.txt b/etl_billiards/requirements.txt index 42c8889..2470799 100644 --- a/etl_billiards/requirements.txt +++ b/etl_billiards/requirements.txt @@ -3,3 +3,10 @@ psycopg2-binary>=2.9.0 requests>=2.28.0 python-dateutil>=2.8.0 tzdata>=2023.0 +flask>=2.3 + +# GUI 依赖 +PySide6>=6.5.0 + +# 打包工具 (可选,仅打包 EXE 时需要) +# pyinstaller>=6.0.0