"""会话日志质检脚本 — 检查 extract_kiro_session.py 产出物的完整性和正确性。 用法: python -B scripts/ops/qa_session_logs.py # 检查所有已索引记录 python -B scripts/ops/qa_session_logs.py --watch # 实时监控模式(每 3s 刷新) python -B scripts/ops/qa_session_logs.py --report # 输出质检报告到 stdout """ import json import os import re import sys import time from pathlib import Path from _env_paths import ensure_repo_root ensure_repo_root() SESSION_LOG_DIR = Path("docs/audit/session_logs") INDEX_PATH = SESSION_LOG_DIR / "_session_index.json" # ── 检查项定义 ────────────────────────────────────────────── def check_index_entry(eid: str, entry: dict) -> list[str]: """检查单条索引条目,返回问题列表""" issues = [] # 必填字段 for field in ("output_dir", "chatSessionId", "startTime", "endTime", "status", "workflowType"): if not entry.get(field): issues.append(f"索引缺失字段: {field}") # summary 字段 summary = entry.get("summary") if not summary: issues.append("索引缺失 summary") else: for sf in ("duration_s", "msg_count", "action_count", "files_modified", "files_created", "sub_agents", "errors"): if sf not in summary: issues.append(f"summary 缺失: {sf}") # output_dir 存在性 out_dir = entry.get("output_dir", "") if out_dir and not os.path.isdir(out_dir): issues.append(f"output_dir 不存在: {out_dir}") return issues def check_md_file(filepath: str) -> list[str]: """检查单个 md 文件的结构完整性""" issues = [] p = Path(filepath) if not p.exists(): return [f"文件不存在: {filepath}"] text = p.read_text(encoding="utf-8", errors="replace") lines = text.split("\n") # 基本大小检查 if len(lines) < 20: issues.append(f"文件过短: {len(lines)} 行") # 检查执行摘要(应在前 10 行内) head = "\n".join(lines[:15]) if "## 📋 执行摘要" not in head: issues.append("前 15 行未找到执行摘要") # 检查必要章节 required_sections = ["## 1. 元数据", "## 2. 用户输入", "## 3. 对话记录", "## 4. Actions 时间线"] for sec in required_sections: if sec not in text: issues.append(f"缺失章节: {sec}") # 检查围栏配对(反引号围栏) # fence() 生成的围栏:开启行 = ```lang 或 ````lang,关闭行 = ``` 或 ```` # 统计所有围栏行(3+ 个反引号开头的行),奇数行为开启,偶数行为关闭 fence_lines = 0 for line in lines: stripped = line.strip() if re.match(r"^`{3,}", stripped): fence_lines += 1 if fence_lines % 2 != 0: issues.append(f"围栏不配对: 共 {fence_lines} 个围栏行(应为偶数)") # 检查语义标签图标(Step 标题应有图标) step_lines = [l for l in lines if l.startswith("### Step ")] steps_with_icon = 0 icon_pattern = re.compile(r"[⚡🔀📖🔍💬📋🩺❌📄🖥🔧]") for sl in step_lines: if icon_pattern.search(sl): steps_with_icon += 1 if step_lines and steps_with_icon == 0: issues.append("Step 标题无语义图标") # 检查 invokeSubAgent 是否解析了代理名(只检查 Step 标题行) step_with_invoke = [l for l in lines if l.startswith("### Step ") and "invokeSubAgent" in l] if any("→ ?" in l for l in step_with_invoke): issues.append("存在未解析的子代理名 (→ ?)") # 检查裸露 heading(# 开头但不在围栏内,且不是合法章节标题) # 简化检查:看是否有零宽空格转义 # (这个检查比较宽松,只报告明显问题) return issues def check_execution(eid: str, entry: dict) -> dict: """完整检查一个 execution(索引 + 文件)""" result = { "eid": eid[:8], "eid_full": eid, "workflow": entry.get("workflowType", "?"), "status": entry.get("status", "?"), "start": entry.get("startTime", "?"), "index_issues": [], "file_issues": {}, # filename -> issues "ok": True, } # 检查索引 result["index_issues"] = check_index_entry(eid, entry) # 检查目录下所有 md 文件 out_dir = entry.get("output_dir", "") if out_dir and os.path.isdir(out_dir): md_files = sorted(Path(out_dir).glob("*.md")) if not md_files: result["index_issues"].append("output_dir 下无 md 文件") for mf in md_files: issues = check_md_file(str(mf)) if issues: result["file_issues"][mf.name] = issues elif out_dir: pass # 已在 index_issues 中报告 # 汇总 if result["index_issues"] or result["file_issues"]: result["ok"] = False return result # ── 主流程 ────────────────────────────────────────────────── def load_index() -> dict: if not INDEX_PATH.exists(): return {"version": 2, "entries": {}} return json.loads(INDEX_PATH.read_text(encoding="utf-8")) def run_qa() -> list[dict]: """对所有已索引记录执行质检,返回结果列表""" index = load_index() entries = index.get("entries", {}) results = [] for eid, entry in entries.items(): results.append(check_execution(eid, entry)) return results def print_summary(results: list[dict], elapsed: float = 0): """打印质检摘要""" total = len(results) passed = sum(1 for r in results if r["ok"]) failed = total - passed print(f"\n{'='*60}") print(f" 会话日志质检报告") print(f" 检查时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") if elapsed: print(f" 耗时: {elapsed:.1f}s") print(f"{'='*60}") print(f" 总计: {total} | ✅ 通过: {passed} | ❌ 失败: {failed}") print(f"{'='*60}\n") if failed == 0: print(" 所有记录质检通过 ✅\n") return # 详细失败信息 for r in results: if r["ok"]: continue print(f" ❌ {r['eid']} [{r['workflow']}] {r['start']}") for issue in r["index_issues"]: print(f" 索引: {issue}") for fname, issues in r["file_issues"].items(): for issue in issues: print(f" {fname}: {issue}") print() def print_watch_line(results: list[dict], cycle: int): """监控模式下的单行状态输出""" total = len(results) passed = sum(1 for r in results if r["ok"]) failed = total - passed ts = time.strftime("%H:%M:%S") status = "✅ ALL PASS" if failed == 0 else f"❌ {failed} FAIL" print(f" [{ts}] #{cycle:03d} 总计={total} 通过={passed} 失败={failed} {status}") def watch_mode(): """实时监控模式:每 3 秒检查一次,检测到新记录时输出状态""" print(f" 会话日志质检 — 实时监控模式 (Ctrl+C 退出)") print(f" 监控目标: {INDEX_PATH}") print(f"{'─'*60}") last_count = 0 cycle = 0 last_results = [] try: while True: cycle += 1 results = run_qa() current_count = len(results) # 有新记录或首次运行时输出 if current_count != last_count or cycle == 1: print_watch_line(results, cycle) # 如果有新的失败,输出详情 new_fails = [r for r in results if not r["ok"] and r["eid_full"] not in {lr["eid_full"] for lr in last_results if not lr["ok"]}] for r in new_fails: all_issues = r["index_issues"] + [ f"{fn}: {iss}" for fn, issues in r["file_issues"].items() for iss in issues ] print(f" ↳ {r['eid']}: {'; '.join(all_issues[:3])}") last_count = current_count last_results = results time.sleep(3) except KeyboardInterrupt: print(f"\n{'─'*60}") print(f" 监控结束,最终状态:") print_summary(last_results) def main(): import argparse parser = argparse.ArgumentParser(description="会话日志质检脚本") parser.add_argument("--watch", action="store_true", help="实时监控模式") parser.add_argument("--report", action="store_true", help="输出完整质检报告") args = parser.parse_args() if args.watch: watch_mode() else: t0 = time.time() results = run_qa() elapsed = time.time() - t0 print_summary(results, elapsed) # 退出码:有失败则返回 1 if any(not r["ok"] for r in results): sys.exit(1) if __name__ == "__main__": main()