Files
ZQYY.FQ-ETL/gui/models/task_model.py

210 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""任务数据模型"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Any
class TaskStatus(Enum):
    """Lifecycle state of a task run.

    Each member's value is its lowercase name as a string.
    """

    # Waiting to be executed.
    PENDING = "pending"
    # Currently executing.
    RUNNING = "running"
    # Finished successfully.
    SUCCESS = "success"
    # Finished with a failure.
    FAILED = "failed"
    # Cancelled before completion.
    CANCELLED = "cancelled"
class TaskCategory(Enum):
    """Category a task code belongs to (used to group tasks)."""

    # ODS: data fetching tasks.
    ODS = "ODS"
    # DWD: loading tasks.
    DWD = "DWD"
    # DWS: aggregation/summary tasks.
    DWS = "DWS"
    # Schema initialization tasks.
    SCHEMA = "Schema"
    # Data quality check tasks.
    QUALITY = "Quality"
    # Anything not covered above.
    OTHER = "Other"
# Mapping from task code to its category. Keys are upper-case task codes;
# get_task_category() upper-cases its argument before looking it up here and
# falls back to TaskCategory.OTHER for codes that are not listed.
# Note: the group comments below describe where an entry is listed, which is
# not always its category (e.g. DWD_QUALITY_CHECK -> QUALITY,
# INIT_DWS_SCHEMA -> SCHEMA, DATA_INTEGRITY_CHECK -> QUALITY).
TASK_CATEGORIES: Dict[str, TaskCategory] = {
    # ODS tasks (data fetching)
    "ODS_PAYMENT": TaskCategory.ODS,
    "ODS_MEMBER": TaskCategory.ODS,
    "ODS_MEMBER_CARD": TaskCategory.ODS,
    "ODS_MEMBER_BALANCE": TaskCategory.ODS,
    "ODS_SETTLEMENT_RECORDS": TaskCategory.ODS,
    "ODS_TABLE_USE": TaskCategory.ODS,
    "ODS_ASSISTANT_ACCOUNT": TaskCategory.ODS,
    "ODS_ASSISTANT_LEDGER": TaskCategory.ODS,
    "ODS_ASSISTANT_ABOLISH": TaskCategory.ODS,
    "ODS_REFUND": TaskCategory.ODS,
    "ODS_PLATFORM_COUPON": TaskCategory.ODS,
    "ODS_RECHARGE_SETTLE": TaskCategory.ODS,
    "ODS_GROUP_PACKAGE": TaskCategory.ODS,
    "ODS_GROUP_BUY_REDEMPTION": TaskCategory.ODS,
    "ODS_INVENTORY_STOCK": TaskCategory.ODS,
    "ODS_INVENTORY_CHANGE": TaskCategory.ODS,
    "ODS_TABLES": TaskCategory.ODS,
    "ODS_GOODS_CATEGORY": TaskCategory.ODS,
    "ODS_STORE_GOODS": TaskCategory.ODS,
    "ODS_STORE_GOODS_SALES": TaskCategory.ODS,
    "ODS_TABLE_FEE_DISCOUNT": TaskCategory.ODS,
    "ODS_TENANT_GOODS": TaskCategory.ODS,
    "ODS_SETTLEMENT_TICKET": TaskCategory.ODS,
    # DWD tasks (loading), plus the DWD quality check
    "DWD_LOAD_FROM_ODS": TaskCategory.DWD,
    "DWD_QUALITY_CHECK": TaskCategory.QUALITY,
    "PAYMENTS_DWD": TaskCategory.DWD,
    "MEMBERS_DWD": TaskCategory.DWD,
    "TICKET_DWD": TaskCategory.DWD,
    # DWS tasks (aggregation), plus DWS schema/config setup tasks
    "INIT_DWS_SCHEMA": TaskCategory.SCHEMA,
    "SEED_DWS_CONFIG": TaskCategory.SCHEMA,
    "DWS_BUILD_ORDER_SUMMARY": TaskCategory.DWS,
    "DWS_WINBACK_INDEX": TaskCategory.DWS,
    "DWS_NEWCONV_INDEX": TaskCategory.DWS,
    "DWS_RECALL_INDEX": TaskCategory.DWS,
    "DWS_INTIMACY_INDEX": TaskCategory.DWS,
    "DWS_RELATION_INDEX": TaskCategory.DWS,
    "DWS_ML_MANUAL_IMPORT": TaskCategory.DWS,
    "DWS_ASSISTANT_DAILY": TaskCategory.DWS,
    "DWS_ASSISTANT_MONTHLY": TaskCategory.DWS,
    "DWS_ASSISTANT_CUSTOMER": TaskCategory.DWS,
    "DWS_ASSISTANT_SALARY": TaskCategory.DWS,
    "DWS_ASSISTANT_FINANCE": TaskCategory.DWS,
    "DWS_MEMBER_CONSUMPTION": TaskCategory.DWS,
    "DWS_MEMBER_VISIT": TaskCategory.DWS,
    "DWS_FINANCE_DAILY": TaskCategory.DWS,
    "DWS_FINANCE_RECHARGE": TaskCategory.DWS,
    "DWS_FINANCE_INCOME_STRUCTURE": TaskCategory.DWS,
    "DWS_FINANCE_DISCOUNT_DETAIL": TaskCategory.DWS,
    "DWS_RETENTION_CLEANUP": TaskCategory.DWS,
    "DWS_MV_REFRESH_FINANCE_DAILY": TaskCategory.DWS,
    "DWS_MV_REFRESH_ASSISTANT_DAILY": TaskCategory.DWS,
    # Schema initialization tasks
    "INIT_ODS_SCHEMA": TaskCategory.SCHEMA,
    "INIT_DWD_SCHEMA": TaskCategory.SCHEMA,
    # Miscellaneous tasks (the integrity check is categorized as QUALITY)
    "MANUAL_INGEST": TaskCategory.OTHER,
    "CHECK_CUTOFF": TaskCategory.OTHER,
    "DATA_INTEGRITY_CHECK": TaskCategory.QUALITY,
    "ODS_JSON_ARCHIVE": TaskCategory.OTHER,
    # Legacy task codes (kept for compatibility)
    "PRODUCTS": TaskCategory.ODS,
    "TABLES": TaskCategory.ODS,
    "MEMBERS": TaskCategory.ODS,
    "ASSISTANTS": TaskCategory.ODS,
    "PACKAGES_DEF": TaskCategory.ODS,
    "ORDERS": TaskCategory.ODS,
    "PAYMENTS": TaskCategory.ODS,
    "REFUNDS": TaskCategory.ODS,
    "COUPON_USAGE": TaskCategory.ODS,
    "INVENTORY_CHANGE": TaskCategory.ODS,
    "TOPUPS": TaskCategory.ODS,
    "TABLE_DISCOUNT": TaskCategory.ODS,
    "ASSISTANT_ABOLISH": TaskCategory.ODS,
    "LEDGER": TaskCategory.ODS,
}
def get_task_category(task_code: str) -> TaskCategory:
    """Return the category registered for ``task_code``.

    The code is upper-cased before the lookup, so matching is
    case-insensitive. Codes absent from ``TASK_CATEGORIES`` map to
    ``TaskCategory.OTHER``.
    """
    key = task_code.upper()
    if key in TASK_CATEGORIES:
        return TASK_CATEGORIES[key]
    return TaskCategory.OTHER
@dataclass
class TaskItem:
    """A single selectable task.

    After construction:
    - an empty ``name`` is replaced by ``task_code``;
    - a falsy ``category``, or one left at ``TaskCategory.OTHER``, is replaced
      by the result of ``get_task_category(task_code)``.
    """

    task_code: str
    name: str = ""
    description: str = ""
    category: TaskCategory = TaskCategory.OTHER
    enabled: bool = True

    def __post_init__(self):
        # Use the task code itself as the display name when none was given.
        self.name = self.name or self.task_code
        # Only an explicit, non-OTHER category is kept; otherwise derive it
        # from the task code's registered category.
        has_explicit_category = bool(self.category) and self.category != TaskCategory.OTHER
        if not has_explicit_category:
            self.category = get_task_category(self.task_code)
@dataclass
class TaskConfig:
    """Execution settings for one task run.

    Fields are accepted positionally in declaration order, so new fields
    should only be appended (with defaults) at the end.

    ``pg_dsn`` and ``api_token`` hold credentials. They are excluded from the
    dataclass-generated ``repr`` (``field(repr=False)``), so printing or
    logging a config, or any object that embeds one, does not expose them.
    They still take part in ``==`` comparisons.
    """

    tasks: List[str] = field(default_factory=list)
    pipeline_flow: str = "FULL"  # FULL, FETCH_ONLY, INGEST_ONLY
    dry_run: bool = False
    window_start: Optional[str] = None
    window_end: Optional[str] = None
    window_split: Optional[str] = None  # none, day, week, month
    window_split_days: Optional[int] = None  # days per chunk when splitting by day: 1/10/30
    window_compensation: int = 0  # compensation, in hours
    ingest_source: Optional[str] = None
    store_id: Optional[int] = None
    # Credentials: hidden from repr() so they do not end up in logs or the UI.
    pg_dsn: Optional[str] = field(default=None, repr=False)
    api_token: Optional[str] = field(default=None, repr=False)
    extra_args: Dict[str, Any] = field(default_factory=dict)
    env_vars: Dict[str, str] = field(default_factory=dict)  # additional environment variables
    # Pipeline settings
    pipeline: str = "api_ods_dwd"  # pipeline type
    processing_mode: str = "increment_only"  # increment_only / verify_only / increment_verify
    fetch_before_verify: bool = False  # fetch from the API before verifying (only effective in verify_only mode)
    window_mode: str = "lookback"  # lookback / custom
    lookback_hours: int = 24  # look-back window, in hours
    overlap_seconds: int = 600  # redundant overlap, in seconds
@dataclass
class TaskHistory:
    """Record of one task execution.

    ``end_time`` stays ``None`` until the run has finished; the duration
    properties report "unknown" until then.
    """

    id: str
    task_codes: List[str]
    status: TaskStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    exit_code: Optional[int] = None
    command: str = ""
    output_log: str = ""
    error_message: str = ""
    summary: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> Optional[float]:
        """Elapsed time in seconds, or ``None`` if either timestamp is missing."""
        if self.start_time is None or self.end_time is None:
            return None
        return (self.end_time - self.start_time).total_seconds()

    @property
    def duration_str(self) -> str:
        """Human-readable duration.

        Returns "-" when the duration is unknown; otherwise e.g. "4.5秒"
        (under a minute, one decimal), "2分5秒" (under an hour) or
        "1小时2分" (an hour or more, minutes truncated).
        """
        secs = self.duration_seconds
        if secs is None:
            return "-"
        # Compare the value as it will be displayed so 59.96s does not render
        # as "60.0秒".
        if round(secs, 1) < 60:
            return f"{secs:.1f}秒"
        # Round to whole seconds once, before splitting into units, so a
        # remainder such as 59.6s cannot be displayed as "60秒".
        total = int(round(secs))
        if total < 3600:
            mins, rem_secs = divmod(total, 60)
            return f"{mins}分{rem_secs}秒"
        hours, rem = divmod(total, 3600)
        return f"{hours}小时{rem // 60}分"
@dataclass
class QueuedTask:
    """A task configuration held in the execution queue.

    It pairs the ``TaskConfig`` to run with queue bookkeeping: current
    status, creation/start/finish timestamps, output and error text, and the
    exit code once known.
    """

    # Identity and what to run.
    id: str
    config: TaskConfig
    # New entries start out pending.
    status: TaskStatus = TaskStatus.PENDING
    # Timestamps: created_at defaults to datetime.now() (naive local time)
    # at construction; the other two remain None until set.
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    # Results.
    output: str = ""
    error: str = ""
    exit_code: Optional[int] = None