318 lines
13 KiB
Python
318 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""任务执行工作线程"""
|
||
|
||
import ast
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import List, Optional, Dict

from PySide6.QtCore import QThread, Signal

from ..utils.app_settings import app_settings
|
||
|
||
|
||
class TaskWorker(QThread):
    """Worker thread that runs an external command and streams its output.

    The command is started with ``subprocess.Popen`` (stderr merged into
    stdout). Every non-empty output line is stored and forwarded through
    ``output_received``; when the process ends, a summary parsed from the
    captured lines is emitted through ``task_finished``.
    """

    # Signals
    output_received = Signal(str)        # one line of process output
    task_finished = Signal(int, str)     # task finished (exit_code, summary)
    error_occurred = Signal(str)         # an error occurred
    progress_updated = Signal(int, int)  # progress update (current, total)

    # Log patterns, compiled once instead of once per output line.
    # ``\uff1a`` / ``\uff0c`` are the full-width colon / comma, accepted
    # alongside their ASCII counterparts.
    _PROGRESS_RE = re.compile(r'进度[:\uff1a\s]*(\d+)/(\d+)')
    _ODS_DONE_RE = re.compile(r'(\w+): 抓取完成.*记录数[=:]\s*(\d+)')
    _DWD_DONE_RE = re.compile(r'DWD 装载完成[:\uff1a]\s*(\S+).*用时\s*([\d.]+)s')
    _TASK_STATS_MARK_RE = re.compile(r'完成[,\uff0c]统计=')
    _TASK_STATS_RE = re.compile(r"统计=(\{.+\})")
    _CHECK_DONE_RE = re.compile(r'CHECK_DONE task=(\w+) missing=(\d+) records=(\d+)')
    _RESULT_MARK_RE = re.compile(r'结果统计[:\uff1a]')
    _BRACED_RE = re.compile(r"\{.+\}")

    def __init__(self, command: List[str], working_dir: Optional[str] = None,
                 extra_env: Optional[Dict[str, str]] = None, parent=None):
        """Store the command and resolve the working directory.

        Args:
            command: Argument list passed to ``subprocess.Popen``.
            working_dir: Directory to run in. When ``None``, falls back to
                ``app_settings.etl_project_path`` and then to the directory
                two levels above this file.
            extra_env: Extra environment variables layered over the
                inherited environment.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.command = command
        self.extra_env = extra_env or {}

        # Working-directory precedence: argument > app settings > source tree.
        if working_dir is not None:
            self.working_dir = working_dir
        elif app_settings.etl_project_path:
            self.working_dir = app_settings.etl_project_path
        else:
            self.working_dir = str(Path(__file__).resolve().parents[2])

        self.process: Optional[subprocess.Popen] = None
        self._stop_requested = False
        self._exit_code: Optional[int] = None
        self._output_lines: List[str] = []

    def run(self):
        """Run the command, stream its output and emit the final summary.

        Emits ``task_finished(exit_code, summary)`` on completion, or
        ``error_occurred`` followed by ``task_finished(-1, ...)`` when the
        process cannot be started or an unexpected error occurs.
        """
        proc: Optional[subprocess.Popen] = None
        try:
            # Reset per-run state so the worker can be started again.
            self._stop_requested = False
            self._exit_code = None
            self._output_lines = []

            env = self._build_env()
            for key, value in self.extra_env.items():
                self.output_received.emit(f"[环境变量] {key}={value}")

            self.output_received.emit(f"[工作目录] {self.working_dir}")
            self.output_received.emit(f"[执行命令] {' '.join(self.command)}")

            # Start the process.
            proc = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                errors="replace",
                cwd=self.working_dir,
                env=env,
                creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
            )
            self.process = proc

            # Read output line by line until EOF or a stop request.
            if proc.stdout:
                for raw_line in iter(proc.stdout.readline, ""):
                    if self._stop_requested:
                        break

                    line = raw_line.rstrip("\n\r")
                    if line:
                        self._output_lines.append(line)
                        self.output_received.emit(line)
                        # Parse progress information, if any.
                        self._parse_progress(line)

            # Wait for the process to exit.
            proc.wait()
            self._exit_code = proc.returncode

            summary = self._generate_summary()
            self.task_finished.emit(self._exit_code or 0, summary)

        except FileNotFoundError as e:
            self.error_occurred.emit(f"找不到 Python 解释器: {e}")
            self.task_finished.emit(-1, f"执行失败: {e}")
        except Exception as e:
            # Thread boundary: report instead of letting the thread die silently.
            self.error_occurred.emit(f"执行出错: {e}")
            self.task_finished.emit(-1, f"执行失败: {e}")
        finally:
            self._cleanup_process(proc)
            self.process = None

    def _build_env(self) -> Dict[str, str]:
        """Return the child environment: inherited vars plus UTF-8/unbuffered
        settings, the working directory on ``PYTHONPATH`` and ``extra_env``."""
        env = os.environ.copy()
        env["PYTHONIOENCODING"] = "utf-8"
        env["PYTHONUNBUFFERED"] = "1"

        # Prepend the project root to PYTHONPATH.
        project_root = self.working_dir
        existing_path = env.get("PYTHONPATH", "")
        if existing_path:
            env["PYTHONPATH"] = f"{project_root}{os.pathsep}{existing_path}"
        else:
            env["PYTHONPATH"] = project_root

        for key, value in self.extra_env.items():
            env[key] = str(value)
        return env

    @staticmethod
    def _cleanup_process(proc: Optional[subprocess.Popen]) -> None:
        """Kill a still-running child and close its stdout pipe."""
        if proc is None:
            return
        try:
            # Only reachable with a live process when run() failed mid-way;
            # without this the child would be orphaned with nobody reading it.
            if proc.poll() is None:
                proc.kill()
                proc.wait()
        except OSError:
            pass  # process already gone
        finally:
            if proc.stdout:
                proc.stdout.close()

    def stop(self):
        """Request the task to stop and terminate the child process.

        Sends ``terminate()``, waits up to 5 seconds, then ``kill()``s the
        process if it is still alive.
        """
        self._stop_requested = True
        # Snapshot: run() resets self.process to None from the worker thread.
        proc = self.process
        if proc is None:
            return
        try:
            proc.terminate()
            # Give the process some time to terminate.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
        except OSError:
            pass  # best effort: the process may already have exited

    def _parse_progress(self, line: str):
        """Emit ``progress_updated`` when *line* carries progress information.

        Example line: "[INFO] 处理进度: 50/100".
        """
        match = self._PROGRESS_RE.search(line)
        if match:
            self.progress_updated.emit(int(match.group(1)), int(match.group(2)))

    def _generate_summary(self) -> str:
        """Return the execution summary, or "无输出" when nothing was captured."""
        if not self._output_lines:
            return "无输出"

        return self._parse_detailed_summary()

    @staticmethod
    def _to_count(value) -> int:
        """Coerce a parsed statistic to ``int``; unusable values count as 0."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _parse_stats_dict(text: str) -> Optional[dict]:
        """Parse a dict printed in a log line; return ``None`` if unparsable.

        Tries a Python literal first (handles ``True``/``None`` and quoted
        apostrophes), then falls back to JSON after swapping quote styles.
        """
        try:
            value = ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            try:
                value = json.loads(text.replace("'", '"'))
            except ValueError:
                return None
        return value if isinstance(value, dict) else None

    @classmethod
    def _summarize_dwd_tables(cls, tables) -> List[str]:
        """Build the DWD summary fragments from a ``tables`` statistics list."""
        total_processed = 0
        total_inserted = 0
        tables_with_data = []

        for tbl in tables:
            table_name = tbl.get('table', '').replace('billiards_dwd.', '')
            processed = cls._to_count(tbl.get('processed', 0))
            inserted = cls._to_count(tbl.get('inserted', 0))

            if processed > 0:
                total_processed += processed
                tables_with_data.append(f"{table_name}({processed})")
            elif inserted > 0:
                total_inserted += inserted
                tables_with_data.append(f"{table_name}(+{inserted})")

        if total_processed <= 0 and total_inserted <= 0:
            return []

        fragments = [f"处理维度: {total_processed}条, 新增事实: {total_inserted}条"]
        if len(tables_with_data) <= 5:
            fragments.append(f"涉及表: {', '.join(tables_with_data)}")
        else:
            fragments.append(f"涉及 {len(tables_with_data)} 张表")
        return fragments

    @staticmethod
    def _strip_log_prefix(line: str) -> str:
        """Return the text after the last ``]`` (the whole line if none), stripped."""
        return line.rsplit("]", 1)[-1].strip()

    def _parse_detailed_summary(self) -> str:
        """Parse the captured output lines into a multi-line summary.

        Falls back to the last few lines mentioning completion, success or
        failure, and finally to the last output line, when nothing
        structured is recognised.
        """
        summary_parts = []

        ods_stats = []        # ODS fetch statistics
        dwd_stats = []        # DWD load statistics
        integrity_stats = {}  # data-check statistics
        errors = []           # error lines
        task_results = []     # task result lines

        for line in self._output_lines:
            # 1. ODS fetch completed.
            # Format: "xxx: 抓取完成,文件=xxx,记录数=123"
            match = self._ODS_DONE_RE.search(line)
            if match:
                record_count = int(match.group(2))
                if record_count > 0:
                    ods_stats.append(f"{match.group(1)}: {record_count}条")
                continue

            # 2. DWD load completed: recognised only so the line is skipped.
            # Format: "DWD 装载完成:xxx,用时 1.02s"
            if self._DWD_DONE_RE.search(line):
                continue

            # 3. Task completion statistics (dict literal).
            # Format: "xxx: 完成,统计={'tables': [...]}"
            if self._TASK_STATS_MARK_RE.search(line):
                match = self._TASK_STATS_RE.search(line)
                stats = self._parse_stats_dict(match.group(1)) if match else None
                if stats is not None and 'tables' in stats:
                    try:
                        dwd_stats.extend(self._summarize_dwd_tables(stats['tables']))
                    except (AttributeError, TypeError, ValueError):
                        pass  # malformed statistics: summary is best effort
                continue

            # 4. Per-task data-check result.
            # Format: "CHECK_DONE task=xxx missing=1 records=136 errors=0"
            match = self._CHECK_DONE_RE.search(line)
            if match:
                task_name = match.group(1)
                missing = int(match.group(2))
                records = int(match.group(3))
                if missing > 0:
                    integrity_stats.setdefault('missing_tasks', []).append(
                        f"{task_name}: 缺失{missing}/{records}")
                integrity_stats['total_records'] = integrity_stats.get('total_records', 0) + records
                integrity_stats['total_missing'] = integrity_stats.get('total_missing', 0) + missing
                continue

            # 5. Final data-check result.
            # Format: "结果统计: {'missing': 463, 'errors': 0, 'backfilled': 0}"
            if self._RESULT_MARK_RE.search(line):
                match = self._BRACED_RE.search(line)
                stats = self._parse_stats_dict(match.group(0)) if match else None
                if stats is not None:
                    integrity_stats['final_missing'] = self._to_count(stats.get('missing', 0))
                    integrity_stats['final_errors'] = self._to_count(stats.get('errors', 0))
                    integrity_stats['backfilled'] = self._to_count(stats.get('backfilled', 0))
                continue

            # 6. Error lines (traceback frames are excluded).
            if (("错误" in line or "error" in line.lower())
                    and "Traceback" not in line and "File " not in line):
                errors.append(line.strip()[:100])

            # 7. Task result lines; the marker is always prepended.
            if "任务执行成功" in line or "ETL运行完成" in line:
                task_results.append("✓ " + self._strip_log_prefix(line))
            elif "任务执行失败" in line:
                task_results.append("✗ " + self._strip_log_prefix(line))

        # Build the summary.
        if ods_stats:
            summary_parts.append("【ODS 抓取】" + ", ".join(ods_stats[:5]))
            if len(ods_stats) > 5:
                summary_parts[-1] += f" 等{len(ods_stats)}项"

        if dwd_stats:
            summary_parts.append("【DWD 装载】" + "; ".join(dwd_stats))

        if integrity_stats:
            total_missing = integrity_stats.get('final_missing', integrity_stats.get('total_missing', 0))
            total_records = integrity_stats.get('total_records', 0)
            backfilled = integrity_stats.get('backfilled', 0)

            int_summary = f"【数据校验】检查 {total_records} 条记录"
            if total_missing > 0:
                int_summary += f", 发现 {total_missing} 条缺失"
                if backfilled > 0:
                    int_summary += f", 已补全 {backfilled} 条"
            else:
                int_summary += ", 数据完整"
            summary_parts.append(int_summary)

            # Show details for tasks with missing records.
            missing_tasks = integrity_stats.get('missing_tasks')
            if missing_tasks:
                summary_parts.append(" 缺失: " + "; ".join(missing_tasks[:3]))
                if len(missing_tasks) > 3:
                    summary_parts[-1] += f" 等{len(missing_tasks)}项"

        if errors:
            summary_parts.append("【错误】" + "; ".join(errors[:3]))

        if task_results:
            summary_parts.append("【结果】" + " | ".join(task_results))

        if summary_parts:
            return "\n".join(summary_parts)

        # Nothing recognised: fall back to key lines among the last ten.
        key_lines = [
            line.strip()[:80]
            for line in self._output_lines[-10:]
            if "完成" in line or "成功" in line or "失败" in line
        ]
        if key_lines:
            return "\n".join(key_lines[-3:])

        return self._output_lines[-1] if self._output_lines else "执行完成"

    @property
    def exit_code(self) -> Optional[int]:
        """Exit code of the last run, or ``None`` if none was recorded."""
        return self._exit_code

    @property
    def output(self) -> str:
        """All captured output lines joined with newlines."""
        return "\n".join(self._output_lines)
|