# -*- coding: utf-8 -*-
"""Data models for scheduled tasks."""

import json
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List, Dict, Any
from pathlib import Path

logger = logging.getLogger(__name__)


class ScheduleType(Enum):
    """Kind of schedule."""

    ONCE = "once"          # run a single time
    INTERVAL = "interval"  # fixed interval
    DAILY = "daily"        # every day
    WEEKLY = "weekly"      # every week
    CRON = "cron"          # cron expression


class IntervalUnit(Enum):
    """Unit used by interval schedules."""

    MINUTES = "minutes"
    HOURS = "hours"
    DAYS = "days"


@dataclass
class ScheduleConfig:
    """Schedule configuration.

    Only the fields belonging to ``schedule_type`` are consulted when the
    next run time is computed; the others keep their defaults.
    """

    schedule_type: ScheduleType = ScheduleType.ONCE

    # Interval schedule
    interval_value: int = 1
    interval_unit: IntervalUnit = IntervalUnit.HOURS

    # Daily schedule
    daily_time: str = "04:00"  # HH:MM

    # Weekly schedule
    weekly_days: List[int] = field(default_factory=lambda: [1])  # 1-7, 1 = Monday
    weekly_time: str = "04:00"

    # Cron expression
    cron_expression: str = "0 4 * * *"

    # Common settings
    enabled: bool = True
    start_date: Optional[str] = None  # YYYY-MM-DD
    end_date: Optional[str] = None    # YYYY-MM-DD

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; enums are stored by value."""
        return {
            "schedule_type": self.schedule_type.value,
            "interval_value": self.interval_value,
            "interval_unit": self.interval_unit.value,
            "daily_time": self.daily_time,
            "weekly_days": self.weekly_days,
            "weekly_time": self.weekly_time,
            "cron_expression": self.cron_expression,
            "enabled": self.enabled,
            "start_date": self.start_date,
            "end_date": self.end_date,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduleConfig":
        """Build a config from a dict, using the field defaults for missing keys.

        Raises:
            ValueError: if ``schedule_type`` or ``interval_unit`` is not a
                valid enum value.
        """
        return cls(
            schedule_type=ScheduleType(data.get("schedule_type", "once")),
            interval_value=data.get("interval_value", 1),
            interval_unit=IntervalUnit(data.get("interval_unit", "hours")),
            daily_time=data.get("daily_time", "04:00"),
            weekly_days=data.get("weekly_days", [1]),
            weekly_time=data.get("weekly_time", "04:00"),
            cron_expression=data.get("cron_expression", "0 4 * * *"),
            enabled=data.get("enabled", True),
            start_date=data.get("start_date"),
            end_date=data.get("end_date"),
        )

    def get_description(self) -> str:
        """Return a short human-readable description of the schedule."""
        if self.schedule_type == ScheduleType.ONCE:
            return "一次性执行"
        elif self.schedule_type == ScheduleType.INTERVAL:
            unit_names = {"minutes": "分钟", "hours": "小时", "days": "天"}
            return f"每 {self.interval_value} {unit_names[self.interval_unit.value]}"
        elif self.schedule_type == ScheduleType.DAILY:
            return f"每天 {self.daily_time}"
        elif self.schedule_type == ScheduleType.WEEKLY:
            day_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"}
            days = "、".join(f"周{day_names[d]}" for d in sorted(self.weekly_days))
            return f"每周 {days} {self.weekly_time}"
        elif self.schedule_type == ScheduleType.CRON:
            return f"Cron: {self.cron_expression}"
        return "未知"

    # Delay, in seconds, applied to the very first run.
    FIRST_RUN_DELAY_SECONDS = 60

    def get_next_run_time(self, last_run: Optional[datetime] = None) -> Optional[datetime]:
        """Compute the next run time.

        The first run (``last_run`` is None) of ONCE and INTERVAL schedules
        is delayed by ``FIRST_RUN_DELAY_SECONDS`` so a task does not fire
        immediately after being created.

        Args:
            last_run: Time of the previous run, or None if it never ran.

        Returns:
            The next run time, or None when there is no further run (a ONCE
            schedule that already ran, an unsupported cron expression, or a
            run that would fall after ``end_date``).

        Raises:
            ValueError: if ``start_date``/``end_date`` is not ``YYYY-MM-DD``
                or a daily/weekly time is not ``HH:MM``.
        """
        now = datetime.now()

        # Before the start date, compute as if "now" were the start date.
        if self.start_date:
            start = datetime.strptime(self.start_date, "%Y-%m-%d")
            if now < start:
                now = start

        # end_date is inclusive, so the exclusive bound is the following midnight.
        end: Optional[datetime] = None
        if self.end_date:
            end = datetime.strptime(self.end_date, "%Y-%m-%d") + timedelta(days=1)
            if now >= end:
                return None

        candidate = self._compute_candidate(now, last_run)

        # A run computed today may still land past the end date (for example a
        # daily task whose time already passed on the last day); drop it.
        if candidate is not None and end is not None and candidate >= end:
            return None
        return candidate

    def _compute_candidate(self, now: datetime, last_run: Optional[datetime]) -> Optional[datetime]:
        """Compute the next run for the schedule type, ignoring ``end_date``."""
        first_run_time = now + timedelta(seconds=self.FIRST_RUN_DELAY_SECONDS)

        if self.schedule_type == ScheduleType.ONCE:
            return None if last_run else first_run_time

        if self.schedule_type == ScheduleType.INTERVAL:
            if not last_run:
                return first_run_time
            if self.interval_unit == IntervalUnit.MINUTES:
                delta = timedelta(minutes=self.interval_value)
            elif self.interval_unit == IntervalUnit.HOURS:
                delta = timedelta(hours=self.interval_value)
            else:
                delta = timedelta(days=self.interval_value)
            return last_run + delta

        if self.schedule_type == ScheduleType.DAILY:
            hour, minute = map(int, self.daily_time.split(":"))
            next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
            return next_run

        if self.schedule_type == ScheduleType.WEEKLY:
            hour, minute = map(int, self.weekly_time.split(":"))
            # Scan today plus the next 7 days: when today matches but its
            # time has already passed, the same weekday next week is used.
            for offset in range(8):
                check_date = now + timedelta(days=offset)
                if check_date.isoweekday() in self.weekly_days:  # isoweekday is 1-7
                    next_run = check_date.replace(
                        hour=hour, minute=minute, second=0, microsecond=0
                    )
                    if next_run > now:
                        return next_run
            return None

        if self.schedule_type == ScheduleType.CRON:
            # Simplified cron parsing (basic format only); any parsing
            # failure deliberately means "no next run".
            try:
                return self._parse_simple_cron(now)
            except Exception:
                return None

        return None

    def _parse_simple_cron(self, now: datetime) -> Optional[datetime]:
        """Simplified cron parser.

        Only expressions with five fields whose minute and hour are plain
        digits are handled; the day, month and weekday fields are not
        evaluated, so the result is always the next daily occurrence of
        ``hour:minute``. Returns None for anything else.
        """
        parts = self.cron_expression.split()
        if len(parts) != 5:
            return None

        minute, hour, day, month, weekday = parts

        # Only the simple case is handled.
        if minute.isdigit() and hour.isdigit():
            next_run = now.replace(
                hour=int(hour), minute=int(minute), second=0, microsecond=0
            )
            if next_run <= now:
                next_run += timedelta(days=1)
            return next_run

        return None


@dataclass
class ScheduleExecutionRecord:
    """Record of one execution of a scheduled task."""

    task_id: str                     # ID of the associated QueuedTask
    executed_at: datetime            # execution time
    status: str = ""                 # status: success, failed, pending
    exit_code: Optional[int] = None  # exit code
    duration_seconds: float = 0.0    # duration in seconds
    summary: str = ""                # execution summary
    output: str = ""                 # full execution log
    error: str = ""                  # error message

    # Maximum number of characters of ``output`` kept by ``to_dict``.
    # NOTE: annotated, so it is a regular dataclass field (and init parameter).
    MAX_OUTPUT_LENGTH: int = 100000  # 100KB

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict with ``output``/``error`` truncated."""
        return {
            "task_id": self.task_id,
            "executed_at": self.executed_at.isoformat(),
            "status": self.status,
            "exit_code": self.exit_code,
            "duration_seconds": self.duration_seconds,
            "summary": self.summary,
            "output": self.output[:self.MAX_OUTPUT_LENGTH] if self.output else "",
            "error": self.error[:5000] if self.error else "",
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduleExecutionRecord":
        """Build a record from a dict; a missing ``executed_at`` becomes now."""
        return cls(
            task_id=data.get("task_id", ""),
            executed_at=(
                datetime.fromisoformat(data["executed_at"])
                if data.get("executed_at")
                else datetime.now()
            ),
            status=data.get("status", ""),
            exit_code=data.get("exit_code"),
            duration_seconds=data.get("duration_seconds", 0.0),
            summary=data.get("summary", ""),
            output=data.get("output", ""),
            error=data.get("error", ""),
        )


@dataclass
class ScheduledTask:
    """A scheduled task: what to run, when to run it, and its run state."""

    id: str
    name: str
    task_codes: List[str]
    schedule: ScheduleConfig
    task_config: Dict[str, Any] = field(default_factory=dict)

    # Run state
    enabled: bool = True
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    last_status: str = ""

    # Execution history (most recent first, capped at MAX_HISTORY_SIZE)
    execution_history: List[ScheduleExecutionRecord] = field(default_factory=list)
    MAX_HISTORY_SIZE: int = field(default=50, repr=False)  # keep the latest 50 records

    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def add_execution_record(self, record: ScheduleExecutionRecord):
        """Insert ``record`` at the front of the history and trim the history."""
        self.execution_history.insert(0, record)
        # Cap the number of stored records.
        if len(self.execution_history) > self.MAX_HISTORY_SIZE:
            self.execution_history = self.execution_history[:self.MAX_HISTORY_SIZE]

    def update_execution_record(self, task_id: str, status: str, exit_code: int,
                                duration: float, summary: str,
                                output: str = "", error: str = ""):
        """Update the first history record whose ``task_id`` matches.

        Does nothing when no record matches.
        """
        for record in self.execution_history:
            if record.task_id == task_id:
                record.status = status
                record.exit_code = exit_code
                record.duration_seconds = duration
                record.summary = summary
                record.output = output
                record.error = error
                break

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; datetimes use ISO 8601 strings."""
        return {
            "id": self.id,
            "name": self.name,
            "task_codes": self.task_codes,
            "schedule": self.schedule.to_dict(),
            "task_config": self.task_config,
            "enabled": self.enabled,
            "last_run": self.last_run.isoformat() if self.last_run else None,
            "next_run": self.next_run.isoformat() if self.next_run else None,
            "run_count": self.run_count,
            "last_status": self.last_status,
            "execution_history": [r.to_dict() for r in self.execution_history],
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduledTask":
        """Build a task from a dict.

        Raises:
            KeyError: if ``id``, ``name`` or ``task_codes`` is missing.
        """
        history_data = data.get("execution_history", [])
        execution_history = [ScheduleExecutionRecord.from_dict(r) for r in history_data]

        return cls(
            id=data["id"],
            name=data["name"],
            task_codes=data["task_codes"],
            schedule=ScheduleConfig.from_dict(data.get("schedule", {})),
            task_config=data.get("task_config", {}),
            enabled=data.get("enabled", True),
            last_run=datetime.fromisoformat(data["last_run"]) if data.get("last_run") else None,
            next_run=datetime.fromisoformat(data["next_run"]) if data.get("next_run") else None,
            run_count=data.get("run_count", 0),
            last_status=data.get("last_status", ""),
            execution_history=execution_history,
            created_at=(
                datetime.fromisoformat(data["created_at"])
                if data.get("created_at")
                else datetime.now()
            ),
            updated_at=(
                datetime.fromisoformat(data["updated_at"])
                if data.get("updated_at")
                else datetime.now()
            ),
        )

    def update_next_run(self):
        """Recompute ``next_run`` (None when disabled) and refresh ``updated_at``."""
        if self.enabled and self.schedule.enabled:
            self.next_run = self.schedule.get_next_run_time(self.last_run)
        else:
            self.next_run = None
        self.updated_at = datetime.now()


class ScheduleStore:
    """JSON-file-backed storage for scheduled tasks."""

    def __init__(self, storage_path: Optional[Path] = None):
        """Open the store and load existing tasks.

        Args:
            storage_path: JSON file to use. Defaults to
                ``scheduled_tasks.json`` two directory levels above the
                directory containing this module.
        """
        if storage_path is None:
            storage_path = Path(__file__).resolve().parents[2] / "scheduled_tasks.json"
        self.storage_path = storage_path
        self.tasks: Dict[str, ScheduledTask] = {}
        self.load()

    def load(self):
        """Load tasks from the storage file.

        A missing file leaves the current tasks untouched. An unreadable or
        malformed file results in an empty store; the failure is logged
        rather than raised.
        """
        if not self.storage_path.exists():
            return
        try:
            data = json.loads(self.storage_path.read_text(encoding="utf-8"))
            self.tasks = {
                task_id: ScheduledTask.from_dict(task_data)
                for task_id, task_data in data.get("tasks", {}).items()
            }
        except Exception:
            # Best effort: start empty, but leave a trace of what went wrong.
            logger.warning(
                "Failed to load scheduled tasks from %s", self.storage_path, exc_info=True
            )
            self.tasks = {}

    def save(self):
        """Write all tasks to the storage file.

        The data is written to a sibling ``.tmp`` file which is then renamed
        over the target, so an interrupted write does not leave a truncated
        storage file behind.
        """
        data = {
            "tasks": {
                task_id: task.to_dict()
                for task_id, task in self.tasks.items()
            }
        }
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        tmp_path = self.storage_path.with_name(self.storage_path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self.storage_path)

    def add_task(self, task: ScheduledTask):
        """Add (or overwrite) a task, computing its next run, and save."""
        task.update_next_run()
        self.tasks[task.id] = task
        self.save()

    def remove_task(self, task_id: str):
        """Remove a task and save; unknown IDs are ignored."""
        if task_id in self.tasks:
            del self.tasks[task_id]
            self.save()

    def update_task(self, task: ScheduledTask):
        """Store an updated task, recomputing its next run, and save."""
        task.update_next_run()
        task.updated_at = datetime.now()
        self.tasks[task.id] = task
        self.save()

    def get_task(self, task_id: str) -> Optional[ScheduledTask]:
        """Return the task with ``task_id``, or None if it does not exist."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> List[ScheduledTask]:
        """Return all tasks as a new list."""
        return list(self.tasks.values())

    def get_due_tasks(self) -> List[ScheduledTask]:
        """Return enabled tasks whose ``next_run`` is set and not in the future."""
        now = datetime.now()
        return [
            task
            for task in self.tasks.values()
            if task.enabled and task.next_run and task.next_run <= now
        ]