Files
Neo-ZQYY/scripts/ops/qa_session_logs.py

262 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""会话日志质检脚本 — 检查 extract_kiro_session.py 产出物的完整性和正确性。
用法:
python -B scripts/ops/qa_session_logs.py # 检查所有已索引记录
python -B scripts/ops/qa_session_logs.py --watch # 实时监控模式(每 3s 刷新)
python -B scripts/ops/qa_session_logs.py --report # 输出质检报告到 stdout
"""
import json
import os
import re
import sys
import time
from pathlib import Path
from _env_paths import ensure_repo_root
# Run the project bootstrap helper before any relative path below is used.
# NOTE(review): presumably this makes the repository root the working
# directory / import root so "docs/..." resolves — confirm in _env_paths.
ensure_repo_root()
# Relative directory that holds the extracted session logs.
SESSION_LOG_DIR = Path("docs") / "audit" / "session_logs"
# JSON index file listing every extracted execution; lives inside the log directory.
INDEX_PATH = SESSION_LOG_DIR.joinpath("_session_index.json")
# ── 检查项定义 ──────────────────────────────────────────────
def check_index_entry(eid: str, entry: dict) -> list[str]:
    """Validate a single index entry and return the problems found.

    Args:
        eid: Execution id of the entry (not used by the checks themselves).
        entry: The index record to validate.

    Returns:
        A list of human-readable issue strings; empty when the entry is fine.
    """
    required_fields = (
        "output_dir", "chatSessionId", "startTime",
        "endTime", "status", "workflowType",
    )
    summary_keys = (
        "duration_s", "msg_count", "action_count", "files_modified",
        "files_created", "sub_agents", "errors",
    )

    # Required top-level fields: a missing key and a falsy value are both reported.
    problems = [
        f"索引缺失字段: {name}" for name in required_fields if not entry.get(name)
    ]

    # The summary must exist and carry every expected statistic key.
    summary = entry.get("summary")
    if summary:
        problems.extend(
            f"summary 缺失: {key}" for key in summary_keys if key not in summary
        )
    else:
        problems.append("索引缺失 summary")

    # A declared output directory must actually exist on disk.
    out_dir = entry.get("output_dir", "")
    if out_dir and not os.path.isdir(out_dir):
        problems.append(f"output_dir 不存在: {out_dir}")

    return problems
# Matches a line that starts with a run of 3+ backticks; group 2 is whatever follows.
_FENCE_LINE_RE = re.compile(r"^(`{3,})(.*)$")
# Semantic icons expected somewhere in "### Step" headings.
_STEP_ICON_RE = re.compile(r"[⚡🔀📖🔍💬📋🩺❌📄🖥🔧]")


def _find_unclosed_fence(lines: list[str]) -> int:
    """Return the 1-based line number of a fence still open at EOF, or 0.

    Fences are tracked the way Markdown pairs them: an opening fence of N
    backticks is closed only by a later line made of at least N backticks
    and nothing else. Shorter or annotated backtick lines inside an open
    block are content, so nested fences do not confuse the check.
    """
    open_len = 0
    open_line = 0
    for lineno, raw in enumerate(lines, 1):
        match = _FENCE_LINE_RE.match(raw.strip())
        if match is None:
            continue
        ticks = len(match.group(1))
        rest = match.group(2).strip()
        if open_len == 0:
            # An info string may not contain backticks; such a line is
            # inline code (e.g. ```x```), not a fence opener.
            if "`" in rest:
                continue
            open_len, open_line = ticks, lineno
        elif ticks >= open_len and not rest:
            open_len = 0
    return open_line if open_len else 0


def check_md_file(filepath: str) -> list[str]:
    """Check the structural integrity of one generated Markdown log file.

    Args:
        filepath: Path of the Markdown file to inspect.

    Returns:
        A list of issue strings; empty when no problem was detected. A missing
        file yields a single "file does not exist" issue.
    """
    p = Path(filepath)
    if not p.exists():
        return [f"文件不存在: {filepath}"]

    issues = []
    text = p.read_text(encoding="utf-8", errors="replace")
    lines = text.split("\n")

    # Basic size check.
    if len(lines) < 20:
        issues.append(f"文件过短: {len(lines)}")

    # The execution summary heading must appear within the first 15 lines.
    head = "\n".join(lines[:15])
    if "## 📋 执行摘要" not in head:
        issues.append("前 15 行未找到执行摘要")

    # Mandatory sections.
    required_sections = ["## 1. 元数据", "## 2. 用户输入", "## 3. 对话记录", "## 4. Actions 时间线"]
    for sec in required_sections:
        if sec not in text:
            issues.append(f"缺失章节: {sec}")

    # Fence pairing: report a fence that is never closed. Simply counting
    # fence lines would flag valid nested fences (a ```` block that contains
    # a single ``` line), so open/close state is tracked instead.
    unclosed = _find_unclosed_fence(lines)
    if unclosed:
        issues.append(f"围栏不配对: 第 {unclosed} 行开启的围栏未关闭")

    # Step headings are expected to carry a semantic icon; only complain when
    # step headings exist and none of them has one.
    step_lines = [l for l in lines if l.startswith("### Step ")]
    if step_lines and not any(_STEP_ICON_RE.search(sl) for sl in step_lines):
        issues.append("Step 标题无语义图标")

    # invokeSubAgent steps should show a resolved agent name (heading lines only).
    if any("→ ?" in l for l in step_lines if "invokeSubAgent" in l):
        issues.append("存在未解析的子代理名 (→ ?)")

    # No bare-heading check is performed here; the original left it as a
    # deliberately lenient placeholder.
    return issues
def check_execution(eid: str, entry: dict) -> dict:
    """Run every check for one execution: its index entry plus its Markdown files.

    Args:
        eid: Full execution id.
        entry: The index record for that execution.

    Returns:
        A result dict with the short and full id, workflow/status/start copied
        from the entry ("?" when absent), the index issues, a mapping of
        file name -> issues, and "ok" (True only when nothing was reported).
    """
    index_issues = check_index_entry(eid, entry)
    file_issues = {}  # filename -> issues

    # Inspect every Markdown file in the output directory. A missing directory
    # is already reported by check_index_entry, so nothing is added for it here.
    out_dir = entry.get("output_dir", "")
    if out_dir and os.path.isdir(out_dir):
        md_files = sorted(Path(out_dir).glob("*.md"))
        if not md_files:
            index_issues.append("output_dir 下无 md 文件")
        for md_path in md_files:
            found = check_md_file(str(md_path))
            if found:
                file_issues[md_path.name] = found

    return {
        "eid": eid[:8],
        "eid_full": eid,
        "workflow": entry.get("workflowType", "?"),
        "status": entry.get("status", "?"),
        "start": entry.get("startTime", "?"),
        "index_issues": index_issues,
        "file_issues": file_issues,
        "ok": not (index_issues or file_issues),
    }
# ── 主流程 ──────────────────────────────────────────────────
def load_index() -> dict:
    """Load the session index from INDEX_PATH.

    Returns:
        The parsed JSON content, or an empty version-2 index when the file
        does not exist.
    """
    if INDEX_PATH.exists():
        raw = INDEX_PATH.read_text(encoding="utf-8")
        return json.loads(raw)
    return {"version": 2, "entries": {}}
def run_qa() -> list[dict]:
    """Check every indexed execution and return one result dict per entry."""
    entries = load_index().get("entries", {})
    return [check_execution(eid, entry) for eid, entry in entries.items()]
def print_summary(results: list[dict], elapsed: float = 0):
    """Print the QA report: a totals banner followed by details of each failure.

    Args:
        results: Result dicts as produced by check_execution.
        elapsed: Duration of the run in seconds; the line is omitted when falsy.
    """
    rule = "=" * 60
    failures = [rec for rec in results if not rec["ok"]]
    total = len(results)
    failed = len(failures)
    passed = total - failed

    # Banner with timestamp and totals.
    print(f"\n{rule}")
    print(" 会话日志质检报告")
    print(f" 检查时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    if elapsed:
        print(f" 耗时: {elapsed:.1f}s")
    print(rule)
    print(f" 总计: {total} | ✅ 通过: {passed} | ❌ 失败: {failed}")
    print(f"{rule}\n")

    if not failures:
        print(" 所有记录质检通过 ✅\n")
        return

    # One paragraph per failed record: header, index issues, then per-file issues.
    for rec in failures:
        print(f"{rec['eid']} [{rec['workflow']}] {rec['start']}")
        for issue in rec["index_issues"]:
            print(f" 索引: {issue}")
        for fname, file_problems in rec["file_issues"].items():
            for issue in file_problems:
                print(f" {fname}: {issue}")
        print()
def print_watch_line(results: list[dict], cycle: int):
    """Print the one-line status used by watch mode.

    Args:
        results: Result dicts as produced by check_execution.
        cycle: Current polling cycle number, shown zero-padded to three digits.
    """
    total = len(results)
    failed = sum(1 for rec in results if not rec["ok"])
    passed = total - failed
    if failed == 0:
        status = "✅ ALL PASS"
    else:
        status = f"{failed} FAIL"
    ts = time.strftime("%H:%M:%S")
    print(f" [{ts}] #{cycle:03d} 总计={total} 通过={passed} 失败={failed} {status}")
def watch_mode(interval: float = 3.0):
    """Poll the session index until Ctrl+C, reporting whenever the record count changes.

    Each cycle re-runs the full QA. A status line is printed on the first cycle
    and whenever the number of indexed records differs from the previous cycle;
    records that fail now but were not failing in the previous cycle get a
    one-line digest of their first three issues. On KeyboardInterrupt the
    summary of the most recent results is printed.

    Args:
        interval: Seconds to sleep between polling cycles (default 3).
    """
    # Visible separator; an empty string repeated 60 times would print nothing.
    rule = "-" * 60
    print(" 会话日志质检 — 实时监控模式 (Ctrl+C 退出)")
    print(f" 监控目标: {INDEX_PATH}")
    print(rule)
    last_count = 0
    cycle = 0
    last_results = []
    try:
        while True:
            cycle += 1
            results = run_qa()
            current_count = len(results)
            # Only report on the first cycle or when the record count changed.
            if current_count != last_count or cycle == 1:
                print_watch_line(results, cycle)
                # Build the set of previously failing ids once, not per result.
                known_failed = {
                    lr["eid_full"] for lr in last_results if not lr["ok"]
                }
                for r in results:
                    if r["ok"] or r["eid_full"] in known_failed:
                        continue
                    all_issues = r["index_issues"] + [
                        f"{fn}: {iss}"
                        for fn, issues in r["file_issues"].items()
                        for iss in issues
                    ]
                    print(f"{r['eid']}: {'; '.join(all_issues[:3])}")
            last_count = current_count
            last_results = results
            time.sleep(interval)
    except KeyboardInterrupt:
        print(f"\n{rule}")
        print(" 监控结束,最终状态:")
        print_summary(last_results)
def main():
    """Command-line entry point.

    With --watch the script enters the polling loop. Otherwise it runs the QA
    once, prints the report and exits with status 1 if any record failed.
    The --report flag is accepted but not read here; the one-shot branch
    prints the report either way.
    """
    import argparse

    parser = argparse.ArgumentParser(description="会话日志质检脚本")
    parser.add_argument("--watch", action="store_true", help="实时监控模式")
    parser.add_argument("--report", action="store_true", help="输出完整质检报告")
    args = parser.parse_args()

    if args.watch:
        watch_mode()
        return

    started = time.time()
    results = run_qa()
    print_summary(results, time.time() - started)
    # Exit code: 1 when at least one record failed.
    if not all(r["ok"] for r in results):
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()