54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""简单的任务结果汇总与格式化工具。"""
|
||
from __future__ import annotations
|
||
|
||
from typing import Iterable
|
||
|
||
|
||
def summarize_counts(task_results: Iterable[dict]) -> dict:
|
||
"""
|
||
汇总多个任务的 counts,返回总计与逐任务明细。
|
||
task_results: 形如 {"task_code": str, "counts": {...}} 的字典序列。
|
||
"""
|
||
totals = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
details = []
|
||
|
||
for res in task_results:
|
||
code = res.get("task_code") or res.get("code") or "UNKNOWN"
|
||
counts = res.get("counts") or {}
|
||
row = {"task_code": code}
|
||
for key in totals.keys():
|
||
val = int(counts.get(key, 0) or 0)
|
||
row[key] = val
|
||
totals[key] += val
|
||
details.append(row)
|
||
|
||
return {"total": totals, "details": details}
|
||
|
||
|
||
def format_report(summary: dict) -> str:
|
||
"""将 summarize_counts 的输出格式化为可读文案。"""
|
||
lines = []
|
||
totals = summary.get("total", {})
|
||
lines.append(
|
||
"TOTAL fetched={fetched} inserted={inserted} updated={updated} skipped={skipped} errors={errors}".format(
|
||
fetched=totals.get("fetched", 0),
|
||
inserted=totals.get("inserted", 0),
|
||
updated=totals.get("updated", 0),
|
||
skipped=totals.get("skipped", 0),
|
||
errors=totals.get("errors", 0),
|
||
)
|
||
)
|
||
for row in summary.get("details", []):
|
||
lines.append(
|
||
"{task_code}: fetched={fetched} inserted={inserted} updated={updated} skipped={skipped} errors={errors}".format(
|
||
task_code=row.get("task_code", "UNKNOWN"),
|
||
fetched=row.get("fetched", 0),
|
||
inserted=row.get("inserted", 0),
|
||
updated=row.get("updated", 0),
|
||
skipped=row.get("skipped", 0),
|
||
errors=row.get("errors", 0),
|
||
)
|
||
)
|
||
return "\n".join(lines)
|