Files
Neo-ZQYY/scripts/ops/extract_timing_data.py
Neo b25308c3f4 feat: P1-P3 全栈集成 — 数据库基础 + DWS 扩展 + 小程序鉴权 + 工程化体系
## P1 数据库基础
- zqyy_app: 创建 auth/biz schema、FDW 连接 etl_feiqiu
- etl_feiqiu: 创建 app schema RLS 视图、商品库存预警表
- 清理 assistant_abolish 残留数据

## P2 ETL/DWS 扩展
- 新增 DWS 助教订单贡献度表 (dws.assistant_order_contribution)
- 新增 assistant_order_contribution_task 任务及 RLS 视图
- member_consumption 增加充值字段、assistant_daily 增加处罚字段
- 更新 ODS/DWD/DWS 任务文档及业务规则文档
- 更新 consistency_checker、flow_runner、task_registry 等核心模块

## P3 小程序鉴权系统
- 新增 xcx_auth 路由/schema(微信登录 + JWT)
- 新增 wechat/role/matching/application 服务层
- zqyy_app 鉴权表迁移 + 角色权限种子数据
- auth/dependencies.py 支持小程序 JWT 鉴权

## 文档与审计
- 新增 DOCUMENTATION-MAP 文档导航
- 新增 7 份 BD_Manual 数据库变更文档
- 更新 DDL 基线快照(etl_feiqiu 6 schema + zqyy_app auth)
- 新增全栈集成审计记录、部署检查清单更新
- 新增 BACKLOG 路线图、FDW→Core 迁移计划

## Kiro 工程化
- 新增 5 个 Spec(P1/P2/P3/全栈集成/核心业务)
- 新增审计自动化脚本(agent_on_stop/build_audit_context/compliance_prescan)
- 新增 6 个 Hook(合规检查/会话日志/提交审计等)
- 新增 doc-map steering 文件

## 运维与测试
- 新增 ops 脚本:迁移验证/API 健康检查/ETL 监控/集成报告
- 新增属性测试:test_dws_contribution / test_auth_system
- 清理过期 export 报告文件
- 更新 .gitignore 排除规则
2026-02-26 08:03:53 +08:00

473 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
从 API 获取 ETL 执行日志,提取精细计时数据。
Task 4.1: 运维联调 - 性能计时分析
策略:
- 所有任务都使用统一的切片格式 "XXX: 开始执行(n/N)" / "XXX: 完成(n/N)"
- 根据任务名前缀分类到 ODS / DWD / DWS / INDEX 阶段
- 工具类任务使用 "开始执行工具类任务" / "工具类任务执行成功"
- 失败任务使用 "任务 XXX 失败:" 格式
"""
import os
import sys
import re
import requests
from datetime import datetime
from collections import OrderedDict
from dotenv import load_dotenv
# 加载环境变量
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
BACKEND_URL = "http://localhost:8000"
EXECUTION_ID = "41938155-db8c-4eec-9b81-9e5aef42fb8a"
# Task layer mapping (based on the task list in design.md)
INDEX_TASKS = {"DWS_WINBACK_INDEX", "DWS_NEWCONV_INDEX", "DWS_RELATION_INDEX"}
DWD_TASKS = {"DWD_LOAD_FROM_ODS"}
# DWS tasks = names starting with DWS_ that are not in INDEX_TASKS
# ODS tasks = names starting with ODS_


def classify_task(task_name):
    """Map a task name to its pipeline stage (INDEX/DWD/DWS/ODS/OTHER)."""
    if task_name in INDEX_TASKS:
        return "INDEX"
    if task_name in DWD_TASKS:
        return "DWD"
    # Fall back to prefix-based classification.
    for prefix, stage in (("DWS_", "DWS"), ("ODS_", "ODS")):
        if task_name.startswith(prefix):
            return stage
    return "OTHER"
def login():
    """Authenticate against the backend and return a bearer access token."""
    credentials = {"username": "admin", "password": "admin123"}
    response = requests.post(f"{BACKEND_URL}/api/auth/login", json=credentials)
    response.raise_for_status()
    return response.json()["access_token"]
def get_logs(token):
    """Download the execution log text, preferring error_log over output_log."""
    response = requests.get(
        f"{BACKEND_URL}/api/execution/{EXECUTION_ID}/logs",
        headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    payload = response.json()
    return payload.get("error_log", "") or payload.get("output_log", "")
def parse_timestamp(ts_str):
    """Parse a 'YYYY-mm-dd HH:MM:SS' string into a datetime; None if malformed."""
    try:
        parsed = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
    return parsed
def extract_timing_data(log_text):
    """Extract per-task timing data from the raw execution log text.

    Scans each timestamped line, matches it against the known log-message
    formats, and accumulates start/end/slice timings per task.

    Returns a dict:
        {'flow_start': datetime | None,
         'flow_end': datetime | None,
         'tasks': OrderedDict task_name -> record}
    where each record holds layer, start, end, status ('running'/'success'/
    'failed'/'skipped'), total_slices, slices and total_duration_sec.
    """
    lines = log_text.split('\n')
    ts_re = re.compile(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]')
    # Slice start: "XXX: 开始执行(1/4),窗口[...]"
    # also matches: "XXX: 开始执行(1/4), 窗口[...]" (ASCII comma)
    slice_start_re = re.compile(
        r'(\w+): 开始执行\((\d+)/(\d+)\)[,]\s*窗口\[([^\]]+)\]')
    # Slice done: "XXX: 完成(1/4),已处理 30/111.17 天"
    slice_end_re = re.compile(
        r'(\w+): 完成\((\d+)/(\d+)\)[,]\s*已处理')
    # Utility task start: "XXX: 开始执行工具类任务"
    util_start_re = re.compile(r'(\w+): 开始执行工具类任务')
    # Utility task end: "XXX: 工具类任务执行成功" (or 失败)
    util_end_re = re.compile(r'(\w+): 工具类任务执行(成功|失败)')
    # Task failure: "任务 XXX 失败:"
    task_fail_re = re.compile(r'任务 (\w+) 失败')
    # Task disabled/skipped: "任务 XXX 未启用或不存在"
    task_skip_re = re.compile(r'任务 (\w+) 未启用或不存在')
    # Fetch-phase start (first log line of non-utility DWS tasks)
    fetch_start_re = re.compile(r'(\w+): (抓取阶段开始|ODS fetch\+load start)')
    # Task completion stats: "XXX: 完成,统计=" or "XXX: 完成, 统计="
    task_done_re = re.compile(r'(\w+): 完成[,]\s*统计=')
    # ODS task done: "ODS_XXX ODS 任务完成:"
    ods_done_re = re.compile(r'(\w+) ODS 任务完成')
    # Flow end / flow start markers
    flow_end_re = re.compile(r'所有任务执行完成|Flow 执行完成')
    flow_start_re = re.compile(r'开始执行 Flow: api_full|FLOW_API_FULL.*开始执行')
    # Data structure: task_name -> {layer, start, end, status, slices: [...]}
    tasks = OrderedDict()
    flow_start = None
    flow_end = None
    # Start timestamps of slices awaiting their matching "done" line.
    pending_slices = {}  # (task_name, slice_idx) -> (ts, window)
    for line in lines:
        m = ts_re.match(line)
        if not m:
            # Lines without a leading timestamp carry no timing information.
            continue
        ts = parse_timestamp(m.group(1))
        if not ts:
            continue
        content = line[m.end():].strip()
        # Strip the log-level prefix ("INFO | module | ...").
        content = re.sub(
            r'^(INFO|WARNING|ERROR|DEBUG|CRITICAL)\s*\|\s*\w+\s*\|\s*', '', content)
        # Flow start (only the first occurrence is kept).
        if flow_start_re.search(content):
            if flow_start is None:
                flow_start = ts
        # Flow end (the last occurrence wins).
        if flow_end_re.search(content):
            flow_end = ts
        # Slice start
        sm = slice_start_re.search(content)
        if sm:
            task_name = sm.group(1)
            idx = int(sm.group(2))
            total = int(sm.group(3))
            window = sm.group(4)
            layer = classify_task(task_name)
            if task_name not in tasks:
                tasks[task_name] = {
                    'layer': layer, 'start': ts, 'end': None,
                    'status': 'running', 'total_slices': total, 'slices': []
                }
            pending_slices[(task_name, idx)] = (ts, window)
            continue
        # Slice done — pair with the pending start and record the duration.
        sm = slice_end_re.search(content)
        if sm:
            task_name = sm.group(1)
            idx = int(sm.group(2))
            key = (task_name, idx)
            if key in pending_slices:
                start_ts, window = pending_slices.pop(key)
                dur = (ts - start_ts).total_seconds()
                if task_name in tasks:
                    tasks[task_name]['slices'].append({
                        'idx': idx, 'window': window,
                        'start': start_ts, 'end': ts, 'duration_sec': dur
                    })
                    tasks[task_name]['end'] = ts
            continue
        # Utility task start
        sm = util_start_re.search(content)
        if sm:
            task_name = sm.group(1)
            layer = classify_task(task_name)
            if task_name not in tasks:
                tasks[task_name] = {
                    'layer': layer, 'start': ts, 'end': None,
                    'status': 'running', 'total_slices': 0, 'slices': []
                }
            else:
                tasks[task_name]['start'] = ts
            continue
        # Utility task end — status comes from the 成功/失败 capture group.
        sm = util_end_re.search(content)
        if sm:
            task_name = sm.group(1)
            success = sm.group(2) == '成功'
            if task_name in tasks:
                tasks[task_name]['end'] = ts
                tasks[task_name]['status'] = 'success' if success else 'failed'
            continue
        # Fetch-phase start (marks the task's first appearance in the log)
        sm = fetch_start_re.search(content)
        if sm:
            task_name = sm.group(1)
            layer = classify_task(task_name)
            if task_name not in tasks:
                tasks[task_name] = {
                    'layer': layer, 'start': ts, 'end': None,
                    'status': 'running', 'total_slices': 0, 'slices': []
                }
            continue
        # Task completion stats: "XXX: 完成,统计=" format
        sm = task_done_re.search(content)
        if sm:
            task_name = sm.group(1)
            if task_name in tasks:
                tasks[task_name]['end'] = ts
                if tasks[task_name]['status'] == 'running':
                    tasks[task_name]['status'] = 'success'
            continue
        # ODS task done: "ODS_XXX ODS 任务完成:" format
        sm = ods_done_re.search(content)
        if sm:
            task_name = sm.group(1)
            if task_name in tasks:
                tasks[task_name]['end'] = ts
                if tasks[task_name]['status'] == 'running':
                    tasks[task_name]['status'] = 'success'
            continue
        # Task disabled / skipped
        sm = task_skip_re.search(content)
        if sm:
            task_name = sm.group(1)
            layer = classify_task(task_name)
            tasks[task_name] = {
                'layer': layer, 'start': ts, 'end': ts,
                'status': 'skipped', 'total_slices': 0, 'slices': []
            }
            continue
        # Task failure
        sm = task_fail_re.search(content)
        if sm:
            task_name = sm.group(1)
            if task_name in tasks:
                tasks[task_name]['end'] = ts
                tasks[task_name]['status'] = 'failed'
            else:
                layer = classify_task(task_name)
                tasks[task_name] = {
                    'layer': layer, 'start': ts, 'end': ts,
                    'status': 'failed', 'total_slices': 0, 'slices': []
                }
    # Compute the total duration of each task.
    for name, t in tasks.items():
        if t['slices']:
            t['total_duration_sec'] = sum(
                s['duration_sec'] for s in t['slices'])
            # If every declared slice finished, mark the task successful.
            if t['status'] == 'running' and t.get('total_slices', 0) > 0:
                if len(t['slices']) >= t['total_slices']:
                    t['status'] = 'success'
            # An ODS task whose slices all completed counts as success
            # (total_slices may be 0 because of the ODS log format).
            # NOTE(review): assumes ODS tasks always run exactly 4 slices —
            # confirm against the flow/window configuration.
            if t['status'] == 'running' and t['layer'] == 'ODS' and len(t['slices']) == 4:
                t['status'] = 'success'
        elif t['start'] and t['end']:
            t['total_duration_sec'] = (t['end'] - t['start']).total_seconds()
        else:
            t['total_duration_sec'] = 0
    return {'flow_start': flow_start, 'flow_end': flow_end, 'tasks': tasks}
def fmt_dur(seconds):
    """Format a duration in seconds as '<1s', 'Ns', or 'MmSs'.

    Fixes two display inconsistencies of the original:
    - any sub-second duration (not just exactly 0/None) renders as '<1s'
      instead of '0s';
    - values are rounded before the minute split, so e.g. 59.6s renders
      as '1m0s' rather than '60s'.
    """
    if seconds is None or seconds < 1:
        return "<1s"
    # Round first so the seconds component never formats as 60.
    whole = round(seconds)
    if whole < 60:
        return f"{whole:.0f}s"
    m = int(whole // 60)
    s = whole % 60
    return f"{m}m{s:.0f}s"
def fmt_ts(ts):
    """Render a datetime as HH:MM:SS, or 'N/A' for a missing timestamp."""
    if not ts:
        return 'N/A'
    return ts.strftime('%H:%M:%S')
def generate_report(data):
    """Render the parsed timing data as a Markdown report string.

    *data* is the dict returned by extract_timing_data(). The report
    contains per-stage tables, slice details, a stage summary, the Top-5
    longest tasks, and overall execution statistics.
    """
    out = []
    w = out.append  # shorthand: w(line) appends one report line
    w("# ETL 执行精细计时报告\n")
    w(f"- execution_id: `{EXECUTION_ID}`")
    w(f"- Flow 开始: {data['flow_start']}")
    w(f"- Flow 结束: {data['flow_end']}")
    if data['flow_start'] and data['flow_end']:
        total = (data['flow_end'] - data['flow_start']).total_seconds()
        w(f"- 总耗时: {fmt_dur(total)}")
    w("")
    # Group tasks by stage; OTHER-layer tasks are deliberately excluded.
    layers = OrderedDict([('ODS', []), ('DWD', []), ('DWS', []), ('INDEX', [])])
    for name, t in data['tasks'].items():
        layer = t['layer']
        if layer in layers:
            layers[layer].append((name, t))
    all_durations = []  # (name, dur, layer) tuples, feeds the Top-5 table
    layer_totals = {}
    for layer, task_list in layers.items():
        w(f"## {layer} 阶段\n")
        if not task_list:
            w(f"未检测到 {layer} 任务计时数据\n")
            layer_totals[layer] = 0
            continue
        # Sort tasks by duration, longest first.
        task_list.sort(key=lambda x: x[1]['total_duration_sec'], reverse=True)
        # Split into success / failed / skipped buckets.
        success_tasks = [(n, t) for n, t in task_list if t['status'] not in ('failed', 'skipped')]
        failed_tasks = [(n, t) for n, t in task_list if t['status'] == 'failed']
        skipped_tasks = [(n, t) for n, t in task_list if t['status'] == 'skipped']
        layer_dur = 0
        if success_tasks:
            w("| 任务 | 切片数 | 总耗时 | 开始 | 结束 | 状态 |")
            w("|------|--------|--------|------|------|------|")
            for name, t in success_tasks:
                dur = t['total_duration_sec']
                layer_dur += dur
                sc = len(t['slices']) or t.get('total_slices', 0)
                # NOTE(review): both ternary branches are empty strings here —
                # the original status glyphs (likely emoji) appear to have
                # been stripped by the scrape; confirm against the upstream
                # file before relying on this column.
                status = '' if t['status'] == 'success' else ''
                w(f"| {name} | {sc} | {fmt_dur(dur)} | {fmt_ts(t['start'])} | {fmt_ts(t['end'])} | {status} |")
                all_durations.append((name, dur, layer))
            w("")
        if failed_tasks:
            w(f"### {layer} 失败任务\n")
            w("| 任务 | 失败时间 | 状态 |")
            w("|------|---------|------|")
            for name, t in failed_tasks:
                w(f"| {name} | {fmt_ts(t['end'])} | ❌ 失败 |")
                all_durations.append((name, 0, layer))
            w("")
        if skipped_tasks:
            w(f"### {layer} 跳过任务\n")
            for name, t in skipped_tasks:
                w(f"- {name}(未启用)")
            w("")
        # Stage wall-clock time = first task start to last task end.
        all_starts = [t['start'] for _, t in task_list if t['start']]
        all_ends = [t['end'] for _, t in task_list if t['end']]
        if all_starts and all_ends:
            stage_wall = (max(all_ends) - min(all_starts)).total_seconds()
            w(f"{layer} 阶段墙钟耗时(从首个任务开始到末个任务结束): **{fmt_dur(stage_wall)}**")
            w(f"{layer} 阶段任务累计耗时: **{fmt_dur(layer_dur)}**")
            layer_totals[layer] = stage_wall
        else:
            layer_totals[layer] = 0
        w("")
        # Slice details (only tasks that have slices and took > 5s).
        tasks_with_slices = [(n, t) for n, t in task_list
                             if t['slices'] and t['total_duration_sec'] > 5]
        if tasks_with_slices:
            w(f"### {layer} 窗口切片详情\n")
            for name, t in tasks_with_slices:
                w(f"#### {name}\n")
                w("| 切片 | 窗口 | 耗时 | 开始 | 结束 |")
                w("|------|------|------|------|------|")
                for s in sorted(t['slices'], key=lambda x: x['idx']):
                    total_s = t.get('total_slices', '?')
                    win_short = s['window'].replace('+08:00', '')
                    w(f"| {s['idx']}/{total_s} | {win_short} | {fmt_dur(s['duration_sec'])} | {fmt_ts(s['start'])} | {fmt_ts(s['end'])} |")
                w("")
    # Stage summary table
    w("## 阶段耗时汇总\n")
    w("| 阶段 | 墙钟耗时 | 占比 |")
    w("|------|---------|------|")
    total_wall = sum(layer_totals.values())
    for layer in ['ODS', 'DWD', 'DWS', 'INDEX']:
        dur = layer_totals.get(layer, 0)
        pct = dur / total_wall * 100 if total_wall > 0 else 0
        w(f"| {layer} | {fmt_dur(dur)} | {pct:.1f}% |")
    w(f"| **合计** | **{fmt_dur(total_wall)}** | **100%** |")
    w("")
    # Top-5 longest tasks
    w("## Top-5 耗时最长的任务\n")
    top5 = sorted(all_durations, key=lambda x: x[1], reverse=True)[:5]
    w("| 排名 | 任务 | 阶段 | 耗时 |")
    w("|------|------|------|------|")
    for i, (name, dur, layer) in enumerate(top5, 1):
        w(f"| {i} | {name} | {layer} | {fmt_dur(dur)} |")
    w("")
    # Execution summary counts
    w("## 执行统计\n")
    total_tasks = len(data['tasks'])
    success = sum(1 for t in data['tasks'].values() if t['status'] == 'success')
    failed = sum(1 for t in data['tasks'].values() if t['status'] == 'failed')
    skipped = sum(1 for t in data['tasks'].values() if t['status'] == 'skipped')
    running = sum(1 for t in data['tasks'].values() if t['status'] == 'running')
    w(f"- 总任务数: {total_tasks}")
    w(f"- 成功: {success}")
    w(f"- 失败: {failed}")
    if skipped:
        w(f"- 跳过(未启用): {skipped}")
    if running:
        w(f"- 状态不明: {running}")
    w("")
    if failed:
        w("### 失败任务清单\n")
        for name, t in data['tasks'].items():
            if t['status'] == 'failed':
                w(f"- **{name}** ({t['layer']}) — 失败时间 {fmt_ts(t['end'])}")
        w("")
    return '\n'.join(out)
def main():
    """Driver: login, fetch logs, parse timings, write and print the report."""
    banner = "=" * 60
    print(banner)
    print("ETL 执行日志计时数据提取")
    print(banner)

    # Step 1: authenticate.
    print("\n[1] 登录获取 token...")
    token = login()
    print(f" Token: {token[:20]}...")

    # Step 2: fetch the execution log.
    print("\n[2] 获取执行日志...")
    log_text = get_logs(token)
    print(f" 日志长度: {len(log_text)} 字符, {log_text.count(chr(10))}")
    if not log_text:
        print(" 错误: 日志为空!")
        sys.exit(1)

    # Persist the raw log for later inspection.
    export_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'export')
    raw_path = os.path.join(export_dir, 'temp_raw_execution_log.txt')
    os.makedirs(os.path.dirname(raw_path), exist_ok=True)
    with open(raw_path, 'w', encoding='utf-8') as fh:
        fh.write(log_text)
    print(f" 原始日志已保存: {raw_path}")

    # Step 3: parse timing data and print a per-layer summary.
    print("\n[3] 解析计时数据...")
    data = extract_timing_data(log_text)
    print(f" Flow: {data['flow_start']} ~ {data['flow_end']}")
    for layer in ['ODS', 'DWD', 'DWS', 'INDEX']:
        group = [t for t in data['tasks'].values() if t['layer'] == layer]
        ok = len([t for t in group if t['status'] == 'success'])
        fail = len([t for t in group if t['status'] == 'failed'])
        print(f" {layer}: {len(group)} 任务 ({ok} 成功, {fail} 失败)")

    # Step 4: render and save the Markdown report.
    print("\n[4] 生成计时报告...")
    report = generate_report(data)
    report_path = os.path.join(export_dir, 'temp_timing_report.md')
    with open(report_path, 'w', encoding='utf-8') as fh:
        fh.write(report)
    print(f" 报告已保存: {report_path}")

    # Step 5: echo the full report to stdout.
    print("\n" + "=" * 60)
    print(report)


if __name__ == '__main__':
    main()