# -*- coding: utf-8 -*-
"""Worker thread that runs a task command in a subprocess and streams its output."""
import ast
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional

from PySide6.QtCore import QThread, Signal

from ..utils.app_settings import app_settings

# Log lines may use ASCII or full-width (U+FF0C comma, U+FF1A colon) punctuation,
# so the patterns and markers below accept both forms.
# Example: "[INFO] 处理进度: 50/100"
_PROGRESS_RE = re.compile(r"进度[:\uff1a\s]*(\d+)/(\d+)")
# Example: "xxx: 抓取完成,文件=xxx,记录数=123"
_ODS_DONE_RE = re.compile(r"(\w+)[:\uff1a]\s*抓取完成.*记录数[=:\uff1a]\s*(\d+)")
# Example: "DWD 装载完成:xxx,用时 1.02s"
_DWD_DONE_RE = re.compile(r"DWD 装载完成[:\uff1a]\s*(\S+).*用时\s*([\d.]+)s")
# Example: "xxx: 完成,统计={'tables': [...]}"
_STATS_MARKERS = ("完成,统计=", "完成\uff0c统计=")
_STATS_RE = re.compile(r"统计=(\{.+\})")
# Example: "CHECK_DONE task=xxx missing=1 records=136 errors=0"
_CHECK_DONE_RE = re.compile(r"CHECK_DONE task=(\w+) missing=(\d+) records=(\d+)")
# Example: "结果统计: {'missing': 463, 'errors': 0, 'backfilled': 0}"
_RESULT_MARKERS = ("结果统计:", "结果统计\uff1a")
_DICT_RE = re.compile(r"\{.+\}")


def _parse_dict_literal(text: str) -> Optional[dict]:
    """Parse a dict printed via Python ``repr()`` or as JSON.

    ``ast.literal_eval`` accepts Python reprs (single quotes, None/True/False);
    a quote-swapping JSON parse is kept as a fallback for JSON-only literals
    such as ``true``/``null``.

    Returns:
        The parsed dict, or None if *text* is not a dict literal.
    """
    try:
        value = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        try:
            value = json.loads(text.replace("'", '"'))
        except ValueError:  # includes json.JSONDecodeError
            return None
    return value if isinstance(value, dict) else None


def _as_number(value):
    """Return *value* if it is an int/float, otherwise 0 (e.g. None or a string)."""
    return value if isinstance(value, (int, float)) else 0


def _format_result(mark: str, line: str) -> str:
    """Return *mark* followed by the text after the last "]" in *line* (whole line if none)."""
    text = line.split("]")[-1] if "]" in line else line
    return f"{mark} {text.strip()}"


def _summarize_table_stats(stats: dict) -> List[str]:
    """Summarise a stats dict of the form ``{'tables': [{'table', 'processed', 'inserted'}, ...]}``.

    Tables with ``processed > 0`` are counted as dimensions; otherwise tables
    with ``inserted > 0`` are counted as facts. Returns an empty list when no
    table reported any rows.
    """
    tables = stats.get('tables')
    if not isinstance(tables, (list, tuple)):
        return []

    total_processed = 0
    total_inserted = 0
    tables_with_data: List[str] = []
    for tbl in tables:
        if not isinstance(tbl, dict):
            continue
        table_name = str(tbl.get('table') or '').replace('billiards_dwd.', '')
        processed = _as_number(tbl.get('processed'))
        inserted = _as_number(tbl.get('inserted'))
        if processed > 0:
            total_processed += processed
            tables_with_data.append(f"{table_name}({processed})")
        elif inserted > 0:
            total_inserted += inserted
            tables_with_data.append(f"{table_name}(+{inserted})")

    if not (total_processed > 0 or total_inserted > 0):
        return []
    lines = [f"处理维度: {total_processed}条, 新增事实: {total_inserted}条"]
    if len(tables_with_data) <= 5:
        lines.append(f"涉及表: {', '.join(tables_with_data)}")
    else:
        lines.append(f"涉及 {len(tables_with_data)} 张表")
    return lines


def _summarize_integrity(integrity_stats: dict) -> List[str]:
    """Render the data-check section of the summary from collected check stats.

    The final "结果统计" missing count, when present, takes precedence over the
    sum of per-task CHECK_DONE missing counts.
    """
    total_missing = integrity_stats.get('final_missing', integrity_stats.get('total_missing', 0))
    total_records = integrity_stats.get('total_records', 0)
    backfilled = integrity_stats.get('backfilled', 0)

    text = f"【数据校验】检查 {total_records} 条记录"
    if total_missing > 0:
        text += f", 发现 {total_missing} 条缺失"
        if backfilled > 0:
            text += f", 已补全 {backfilled} 条"
    else:
        text += ", 数据完整"
    parts = [text]

    missing_tasks = integrity_stats.get('missing_tasks')
    if missing_tasks:
        detail = " 缺失: " + "; ".join(missing_tasks[:3])
        if len(missing_tasks) > 3:
            detail += f" 等{len(missing_tasks)}项"
        parts.append(detail)
    return parts


class TaskWorker(QThread):
    """Run a command in a subprocess on a worker thread and stream its output.

    stdout and stderr are merged and forwarded line by line via
    ``output_received``; when the process ends, a summary parsed from the
    output is emitted via ``task_finished``.
    """

    # Signals
    output_received = Signal(str)  # one line of process output
    task_finished = Signal(int, str)  # task finished (exit_code, summary)
    error_occurred = Signal(str)  # an error occurred while launching/reading
    progress_updated = Signal(int, int)  # progress update (current, total)

    def __init__(self, command: List[str], working_dir: Optional[str] = None,
                 extra_env: Optional[Dict[str, str]] = None, parent=None):
        """
        Args:
            command: argv list passed to ``subprocess.Popen`` (no shell).
            working_dir: Working directory for the child. If None, falls back to
                ``app_settings.etl_project_path``, then to
                ``Path(__file__).resolve().parents[2]``.
            extra_env: Extra environment variables for the child (values are
                converted with ``str()``).
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.command = command
        self.extra_env = extra_env or {}

        # Working directory priority: argument > app settings > source-tree fallback
        if working_dir is not None:
            self.working_dir = working_dir
        elif app_settings.etl_project_path:
            self.working_dir = app_settings.etl_project_path
        else:
            self.working_dir = str(Path(__file__).resolve().parents[2])

        self.process: Optional[subprocess.Popen] = None
        self._stop_requested = False
        self._exit_code: Optional[int] = None
        self._output_lines: List[str] = []

    def run(self):
        """Execute the command and always emit ``task_finished`` once.

        On failure, ``error_occurred`` is emitted and ``task_finished`` is sent
        with exit code -1.
        """
        try:
            self._stop_requested = False
            self._exit_code = None
            self._output_lines = []

            env = self._build_env()
            self.output_received.emit(f"[工作目录] {self.working_dir}")
            self.output_received.emit(f"[执行命令] {' '.join(map(str, self.command))}")

            proc = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                errors="replace",
                cwd=self.working_dir,
                env=env,
                creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
            )
            self.process = proc
            try:
                self._pump_output(proc)
                if self._stop_requested and proc.poll() is None:
                    # stop() may have run before self.process was assigned, in
                    # which case it could not terminate the child itself.
                    self._terminate_process(proc)
                proc.wait()
            finally:
                # Release the pipe even if reading failed part-way.
                if proc.stdout:
                    proc.stdout.close()
            self._exit_code = proc.returncode

            summary = self._generate_summary()
            self.task_finished.emit(self._exit_code or 0, summary)

        except FileNotFoundError as e:
            # NOTE(review): Popen may also raise this when cwd does not exist.
            self.error_occurred.emit(f"找不到 Python 解释器: {e}")
            self.task_finished.emit(-1, f"执行失败: {e}")
        except Exception as e:
            self.error_occurred.emit(f"执行出错: {e}")
            self.task_finished.emit(-1, f"执行失败: {e}")
        finally:
            self.process = None

    def _build_env(self) -> Dict[str, str]:
        """Return a copy of ``os.environ`` prepared for the child process.

        Forces UTF-8, unbuffered Python output, prepends the working directory
        to PYTHONPATH, and applies ``extra_env`` (each applied variable is
        echoed via ``output_received``).
        """
        env = os.environ.copy()
        env["PYTHONIOENCODING"] = "utf-8"
        env["PYTHONUNBUFFERED"] = "1"

        # Make the project root importable by the child.
        project_root = str(self.working_dir)
        existing_path = env.get("PYTHONPATH", "")
        if existing_path:
            env["PYTHONPATH"] = f"{project_root}{os.pathsep}{existing_path}"
        else:
            env["PYTHONPATH"] = project_root

        for key, value in (self.extra_env or {}).items():
            env[key] = str(value)
            self.output_received.emit(f"[环境变量] {key}={value}")
        return env

    def _pump_output(self, proc: subprocess.Popen) -> None:
        """Record and forward non-empty output lines until EOF or a stop request."""
        if not proc.stdout:
            return
        for raw in iter(proc.stdout.readline, ""):
            if self._stop_requested:
                break
            line = raw.rstrip("\n\r")
            if line:
                self._output_lines.append(line)
                self.output_received.emit(line)
                self._parse_progress(line)

    @staticmethod
    def _terminate_process(proc: subprocess.Popen, timeout: float = 5) -> None:
        """Call terminate() on *proc*, wait up to *timeout* seconds, then kill() it."""
        try:
            proc.terminate()
            try:
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
        except OSError:
            # The process may already be gone; stopping is best effort.
            pass

    def stop(self):
        """Request cancellation and terminate the running process, if any.

        Blocks the caller for up to about 5 seconds while waiting for the
        process to exit before killing it.
        """
        self._stop_requested = True
        proc = self.process  # local copy: run() clears self.process concurrently
        if proc is not None:
            self._terminate_process(proc)

    def _parse_progress(self, line: str):
        """Emit ``progress_updated`` for lines like "[INFO] 处理进度: 50/100"."""
        match = _PROGRESS_RE.search(line)
        if match:
            self.progress_updated.emit(int(match.group(1)), int(match.group(2)))

    def _generate_summary(self) -> str:
        """Return the execution summary, or "无输出" if nothing was captured."""
        if not self._output_lines:
            return "无输出"
        return self._parse_detailed_summary()

    def _parse_detailed_summary(self) -> str:
        """Build a multi-line summary from the captured output lines.

        Recognised formats (ASCII or full-width comma/colon accepted):
          * ODS fetch:    "xxx: 抓取完成,文件=xxx,记录数=123"
          * DWD load:     "DWD 装载完成:xxx,用时 1.02s" (recognised, not summarised)
          * Task stats:   "xxx: 完成,统计={'tables': [...]}"
          * Data check:   "CHECK_DONE task=xxx missing=1 records=136 errors=0"
          * Check result: "结果统计: {'missing': 463, 'errors': 0, 'backfilled': 0}"
          * Errors:       lines containing "错误" or "error" (case-insensitive),
                          excluding traceback lines
          * Results:      lines containing "任务执行成功"/"ETL运行完成"/"任务执行失败"

        If nothing is recognised, returns up to 3 of the last 10 lines that
        mention 完成/成功/失败, else the last output line.
        """
        ods_stats: List[str] = []
        dwd_stats: List[str] = []
        integrity_stats: dict = {}
        errors: List[str] = []
        task_results: List[str] = []

        for line in self._output_lines:
            match = _ODS_DONE_RE.search(line)
            if match:
                record_count = int(match.group(2))
                if record_count > 0:
                    ods_stats.append(f"{match.group(1)}: {record_count}条")
                continue

            if _DWD_DONE_RE.search(line):
                # Per-table DWD timing lines are recognised but not summarised.
                continue

            if any(marker in line for marker in _STATS_MARKERS):
                match = _STATS_RE.search(line)
                stats = _parse_dict_literal(match.group(1)) if match else None
                if stats is not None:
                    dwd_stats.extend(_summarize_table_stats(stats))
                continue

            match = _CHECK_DONE_RE.search(line)
            if match:
                task_name = match.group(1)
                missing = int(match.group(2))
                records = int(match.group(3))
                if missing > 0:
                    integrity_stats.setdefault('missing_tasks', []).append(
                        f"{task_name}: 缺失{missing}/{records}")
                integrity_stats['total_records'] = integrity_stats.get('total_records', 0) + records
                integrity_stats['total_missing'] = integrity_stats.get('total_missing', 0) + missing
                continue

            if any(marker in line for marker in _RESULT_MARKERS):
                match = _DICT_RE.search(line)
                stats = _parse_dict_literal(match.group(0)) if match else None
                if stats is not None:
                    integrity_stats['final_missing'] = _as_number(stats.get('missing'))
                    integrity_stats['final_errors'] = _as_number(stats.get('errors'))
                    integrity_stats['backfilled'] = _as_number(stats.get('backfilled'))
                continue

            # Error lines ("[ERROR]" is covered by the case-insensitive "error" check).
            if "错误" in line or "error" in line.lower():
                if "Traceback" not in line and "File " not in line:
                    errors.append(line.strip()[:100])

            # Task outcome lines (checked independently of the error check above).
            if "任务执行成功" in line or "ETL运行完成" in line:
                task_results.append(_format_result("✓", line))
            elif "任务执行失败" in line:
                task_results.append(_format_result("✗", line))

        summary_parts: List[str] = []
        if ods_stats:
            ods_line = "【ODS 抓取】" + ", ".join(ods_stats[:5])
            if len(ods_stats) > 5:
                ods_line += f" 等{len(ods_stats)}项"
            summary_parts.append(ods_line)
        if dwd_stats:
            summary_parts.append("【DWD 装载】" + "; ".join(dwd_stats))
        if integrity_stats:
            summary_parts.extend(_summarize_integrity(integrity_stats))
        if errors:
            summary_parts.append("【错误】" + "; ".join(errors[:3]))
        if task_results:
            summary_parts.append("【结果】" + " | ".join(task_results))

        if summary_parts:
            return "\n".join(summary_parts)

        # Nothing recognised: fall back to the last few key lines.
        key_lines = [
            line.strip()[:80]
            for line in self._output_lines[-10:]
            if "完成" in line or "成功" in line or "失败" in line
        ]
        if key_lines:
            return "\n".join(key_lines[-3:])
        return self._output_lines[-1] if self._output_lines else "执行完成"

    @property
    def exit_code(self) -> Optional[int]:
        """Exit code of the last finished process, or None if not available."""
        return self._exit_code

    @property
    def output(self) -> str:
        """All captured non-empty output lines joined with newlines."""
        return "\n".join(self._output_lines)