264 lines
9.8 KiB
Python
264 lines
9.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""ETL 执行计时器
|
|
|
|
记录每个步骤和子步骤的开始/结束时间与耗时(精确到毫秒),
|
|
全部任务完成后输出 Markdown 格式的计时结果文档。
|
|
|
|
输出路径通过 ETL_REPORT_ROOT 环境变量控制。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
|
|
# 默认时区
|
|
_DEFAULT_TZ = ZoneInfo("Asia/Shanghai")
|
|
|
|
|
|
@dataclass
class StepRecord:
    """Timing record for a single step or sub-step.

    ``start_time`` / ``end_time`` are wall-clock timestamps kept for display;
    ``elapsed_ms`` is measured with the monotonic ``time.perf_counter()``
    clock, so it is not affected by system clock adjustments.
    """

    name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    # Elapsed time in milliseconds, filled in by stop(); stays 0.0 until then.
    elapsed_ms: float = 0.0
    # perf_counter() reading at step start (internal; only used to compute
    # elapsed_ms). Defaults to "now" so a directly constructed record measures
    # from its construction rather than from perf_counter()'s arbitrary origin.
    _mono_start: float = field(default_factory=time.perf_counter, repr=False)
    # Sub-steps, in the order they were started.
    children: list["StepRecord"] = field(default_factory=list)

    # ------------------------------------------------------------------

    @property
    def elapsed_seconds(self) -> float:
        """Elapsed time in seconds (``elapsed_ms / 1000``)."""
        return self.elapsed_ms / 1000.0

    def stop(self, tz: Optional[ZoneInfo] = None) -> StepRecord:
        """Mark the step as finished and compute its elapsed time.

        Args:
            tz: Timezone for ``end_time``. Defaults to the timezone of
                ``start_time`` so both timestamps are directly comparable.

        Returns:
            ``self``, to allow chaining.
        """
        # Read the monotonic clock first so the wall-clock call is not counted.
        mono_end = time.perf_counter()
        self.end_time = datetime.now(tz if tz is not None else self.start_time.tzinfo)
        self.elapsed_ms = (mono_end - self._mono_start) * 1000
        return self

    def to_dict(self) -> dict:
        """Serialize to a plain dict (ISO-8601 timestamps) for logging or JSON."""
        return {
            "name": self.name,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "elapsed_ms": round(self.elapsed_ms, 3),
            "children": [c.to_dict() for c in self.children],
        }


class EtlTimer:
    """ETL execution timer.

    Records wall-clock start/end times and millisecond-precision durations of
    top-level steps and their sub-steps, and renders the result as a Markdown
    report once all work has finished.

    Usage::

        timer = EtlTimer()
        timer.start()  # optional; see finish()
        timer.start_step("ODS_ASSISTANT_ACCOUNT")
        timer.start_sub_step("ODS_ASSISTANT_ACCOUNT", "fetch")
        ...
        timer.stop_sub_step("ODS_ASSISTANT_ACCOUNT", "fetch")
        timer.stop_step("ODS_ASSISTANT_ACCOUNT")
        timer.finish()  # render the Markdown report (and write it by default)

    Sub-steps are registered under the composite key ``"parent/child"``, so a
    top-level step whose own name contains ``/`` can shadow a sub-step key.
    """

    def __init__(self, tz: ZoneInfo | None = None) -> None:
        """Create an empty timer; timestamps use ``tz`` (default ``_DEFAULT_TZ``)."""
        self._tz = tz or _DEFAULT_TZ
        # Top-level steps, in start order.
        self._steps: list[StepRecord] = []
        # Lookup by step name, and by "parent/child" for sub-steps.
        self._step_map: dict[str, StepRecord] = {}
        # Overall timing: wall clock for display, perf_counter for the duration.
        self._overall_start: Optional[datetime] = None
        self._overall_end: Optional[datetime] = None
        self._overall_mono_start: float = 0.0
        self._overall_elapsed_ms: float = 0.0

    # ── Overall timing ───────────────────────────────────────

    def start(self) -> None:
        """Start overall timing."""
        self._overall_start = datetime.now(self._tz)
        self._overall_mono_start = time.perf_counter()

    def finish(self, *, write_report: bool = True) -> str:
        """Stop overall timing and render the report, optionally writing it.

        If ``start()`` was never called, overall timing falls back to the start
        of the first recorded step (or to "now" when no step was recorded), so
        the total is never measured against an uninitialised clock.

        Args:
            write_report: When true, also write the report into the directory
                named by the ``ETL_REPORT_ROOT`` environment variable.

        Returns:
            The timing report as Markdown text.

        Raises:
            KeyError: ``write_report`` is true and ``ETL_REPORT_ROOT`` is unset.
        """
        mono_end = time.perf_counter()
        self._overall_end = datetime.now(self._tz)
        if self._overall_start is None:
            # Without this, _overall_mono_start would still be 0.0 and the total
            # would be perf_counter()'s meaningless absolute value.
            first = self._steps[0] if self._steps else None
            self._overall_start = first.start_time if first else self._overall_end
            self._overall_mono_start = first._mono_start if first else mono_end
        self._overall_elapsed_ms = (mono_end - self._overall_mono_start) * 1000

        report = self._render_markdown()
        if write_report:
            self._write_report(report)
        return report

    # ── Record helpers ───────────────────────────────────────

    def _new_record(self, name: str) -> StepRecord:
        """Create a record whose wall-clock and monotonic start are "now"."""
        return StepRecord(
            name=name,
            start_time=datetime.now(self._tz),
            _mono_start=time.perf_counter(),
        )

    def _close_record(self, rec: StepRecord) -> StepRecord:
        """Stamp ``rec`` with its end time and elapsed milliseconds; return it."""
        # Read the monotonic clock first so the wall-clock call is not counted.
        mono_end = time.perf_counter()
        rec.end_time = datetime.now(self._tz)
        rec.elapsed_ms = (mono_end - rec._mono_start) * 1000
        return rec

    # ── Steps ────────────────────────────────────────────────

    def start_step(self, name: str) -> StepRecord:
        """Start a top-level step; a repeated name re-points lookups to the new record."""
        rec = self._new_record(name)
        self._steps.append(rec)
        self._step_map[name] = rec
        return rec

    def stop_step(self, name: str) -> StepRecord:
        """Stop the top-level step registered under ``name``.

        Raises:
            KeyError: No step with that name was started.
        """
        rec = self._step_map.get(name)
        if rec is None:
            raise KeyError(f"未找到步骤: {name}")
        return self._close_record(rec)

    # ── Sub-steps ────────────────────────────────────────────

    def start_sub_step(self, parent_name: str, sub_name: str) -> StepRecord:
        """Start a sub-step under the step registered as ``parent_name``.

        Raises:
            KeyError: No parent step with that name was started.
        """
        parent = self._step_map.get(parent_name)
        if parent is None:
            raise KeyError(f"未找到父步骤: {parent_name}")
        child = self._new_record(sub_name)
        parent.children.append(child)
        # Sub-steps share the lookup map under the composite key "parent/child".
        self._step_map[f"{parent_name}/{sub_name}"] = child
        return child

    def stop_sub_step(self, parent_name: str, sub_name: str) -> StepRecord:
        """Stop the sub-step ``sub_name`` of ``parent_name``.

        Raises:
            KeyError: No such sub-step was started.
        """
        key = f"{parent_name}/{sub_name}"
        rec = self._step_map.get(key)
        if rec is None:
            raise KeyError(f"未找到子步骤: {key}")
        return self._close_record(rec)

    # ── Queries ──────────────────────────────────────────────

    @property
    def steps(self) -> list[StepRecord]:
        """Shallow copy of the top-level steps, in start order."""
        return list(self._steps)

    @property
    def overall_elapsed_ms(self) -> float:
        """Overall duration in milliseconds (0.0 until finish() is called)."""
        return self._overall_elapsed_ms

    def get_step(self, name: str) -> Optional[StepRecord]:
        """Look up a step by name, or a sub-step by ``"parent/child"``; None if absent."""
        return self._step_map.get(name)

    def to_dict(self) -> dict:
        """Serialize overall timing and all steps (with sub-steps) to a dict."""
        return {
            "overall_start": self._overall_start.isoformat() if self._overall_start else None,
            "overall_end": self._overall_end.isoformat() if self._overall_end else None,
            "overall_elapsed_ms": round(self._overall_elapsed_ms, 3),
            "steps": [s.to_dict() for s in self._steps],
        }

    # ── Markdown rendering ───────────────────────────────────

    @staticmethod
    def _fmt_clock(dt: Optional[datetime], fmt: str = "%H:%M:%S.%f") -> str:
        """Format ``dt`` truncated to milliseconds, or "-" when it is None.

        ``fmt`` must end with ``%f``; its last three (microsecond) digits are cut.
        """
        return dt.strftime(fmt)[:-3] if dt is not None else "-"

    @staticmethod
    def _fmt_elapsed(rec: StepRecord) -> str:
        """Human-readable duration, or "-" for a step that was never stopped."""
        return _fmt_ms(rec.elapsed_ms) if rec.end_time is not None else "-"

    @staticmethod
    def _md_cell(text: str) -> str:
        """Escape ``|`` so an arbitrary name cannot break a Markdown table row."""
        return text.replace("|", "\\|")

    def _table_row(self, index: int, rec: StepRecord) -> str:
        """One Markdown table row: index, name, start, end, elapsed."""
        return (
            f"| {index} | {self._md_cell(rec.name)} | {self._fmt_clock(rec.start_time)} "
            f"| {self._fmt_clock(rec.end_time)} | {self._fmt_elapsed(rec)} |"
        )

    def _render_markdown(self) -> str:
        """Render the report: overview, per-step summary table, sub-step details."""
        full_fmt = "%Y-%m-%d %H:%M:%S.%f"
        lines: list[str] = ["# ETL 执行计时报告", ""]

        if self._overall_start is not None:
            lines.append(f"- 开始时间: {self._fmt_clock(self._overall_start, full_fmt)}")
        if self._overall_end is not None:
            lines.append(f"- 结束时间: {self._fmt_clock(self._overall_end, full_fmt)}")
        lines.append(f"- 总耗时: {_fmt_ms(self._overall_elapsed_ms)}")
        lines.append(f"- 步骤数: {len(self._steps)}")
        lines.append("")

        # Summary table: one row per top-level step.
        lines += [
            "## 步骤汇总",
            "",
            "| # | 步骤名称 | 开始时间 | 结束时间 | 耗时 |",
            "|---|---------|---------|---------|------|",
        ]
        lines += [self._table_row(i, step) for i, step in enumerate(self._steps, 1)]
        lines.append("")

        # Detail section: only steps that actually have sub-steps get one.
        parents = [step for step in self._steps if step.children]
        if parents:
            lines += ["## 步骤详情", ""]
        for step in parents:
            lines += [
                f"### {step.name}",
                "",
                f"- 总耗时: {self._fmt_elapsed(step)}",
                "",
                "| # | 子步骤 | 开始时间 | 结束时间 | 耗时 |",
                "|---|-------|---------|---------|------|",
            ]
            lines += [self._table_row(j, child) for j, child in enumerate(step.children, 1)]
            lines.append("")

        return "\n".join(lines)

    # ── File output ──────────────────────────────────────────

    def _write_report(self, content: str) -> Path:
        """Write ``content`` to ``$ETL_REPORT_ROOT/etl_timing_<YYYYmmdd_HHMMSS>.md``.

        The directory is created if missing. The file name has one-second
        resolution, so two reports written within the same second share a name
        and the later one overwrites the earlier.

        Returns:
            Path of the written file.

        Raises:
            KeyError: ``ETL_REPORT_ROOT`` is unset or empty.
        """
        report_root = os.environ.get("ETL_REPORT_ROOT")
        if not report_root:
            raise KeyError(
                "环境变量 ETL_REPORT_ROOT 未定义。"
                "请在根 .env 中配置,参考 docs/deployment/EXPORT-PATHS.md"
            )
        out_dir = Path(report_root)
        out_dir.mkdir(parents=True, exist_ok=True)

        stamp = datetime.now(self._tz).strftime("%Y%m%d_%H%M%S")
        out_path = out_dir / f"etl_timing_{stamp}.md"
        out_path.write_text(content, encoding="utf-8")
        return out_path


# ── 工具函数 ──────────────────────────────────────────────────
|
|
|
|
|
|
def _fmt_ms(ms: float) -> str:
|
|
"""将毫秒格式化为人类可读字符串"""
|
|
if ms < 1000:
|
|
return f"{ms:.1f}ms"
|
|
secs = ms / 1000
|
|
if secs < 60:
|
|
return f"{secs:.2f}s"
|
|
mins = int(secs // 60)
|
|
remaining = secs % 60
|
|
if mins < 60:
|
|
return f"{mins}m{remaining:.1f}s"
|
|
hours = int(mins // 60)
|
|
remaining_mins = mins % 60
|
|
return f"{hours}h{remaining_mins}m"
|