# -*- coding: utf-8 -*-
"""Task-by-task debugging script for the DWS layer.

Connects to the real database, runs the 15 DWS aggregation tasks one at a time,
verifies each task's return value and what was written to the DWS tables, and
spot-checks the aggregated data against the DWD detail data for consistency.

Usage:
    cd apps/etl/connectors/feiqiu
    python -m scripts.debug.debug_dws [--hours 48] [--tasks DWS_FINANCE_DAILY,DWS_ASSISTANT_DAILY]
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import sys
|
||
import time
|
||
import traceback
|
||
from dataclasses import asdict, dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from zoneinfo import ZoneInfo
|
||
|
||
# ── Ensure the connector root directory is on sys.path ──
# The root is taken as two directory levels above the folder containing this file.
_FEIQIU_ROOT = Path(__file__).resolve().parents[2]
if sys.path.count(str(_FEIQIU_ROOT)) == 0:
    # Prepend so this entry is searched before the existing sys.path entries.
    sys.path.insert(0, str(_FEIQIU_ROOT))
|
||
|
||
from config.settings import AppConfig
|
||
from database.connection import DatabaseConnection
|
||
from database.operations import DatabaseOperations
|
||
from api.client import APIClient
|
||
from orchestration.task_registry import default_registry
|
||
from orchestration.cursor_manager import CursorManager
|
||
from orchestration.run_tracker import RunTracker
|
||
from orchestration.task_executor import TaskExecutor
|
||
|
||
|
||
@dataclass
class DebugResult:
    """Outcome of debugging a single DWS task."""

    # Layer label; defaults to "DWS".
    layer: str = "DWS"
    # Code of the task this result belongs to.
    task_code: str = ""
    # Verdict: PASS / FAIL / WARN / ERROR.
    status: str = ""
    # Human-readable explanation of the verdict.
    message: str = ""
    # The "counts" mapping taken from the task's return value.
    counts: dict = field(default_factory=dict)
    # Target table name, or "" when it could not be resolved.
    target_table: str = ""
    # Row count of the target table before the task ran (None if not measured).
    pre_row_count: int | None = None
    # Row count of the target table after the task ran (None if not measured).
    post_row_count: int | None = None
    # Result dict of the DWS-vs-DWD consistency check (None if it did not run).
    consistency_check: dict | None = None
    # Wall-clock duration of the task execution, in seconds.
    duration_sec: float = 0.0
    # Formatted traceback when the run raised an exception.
    error_detail: str | None = None
    # Free-form note about an applied fix; not set by the code visible in this file.
    fix_applied: str | None = None
|
||
|
||
|
||
# ── 工具函数 ──────────────────────────────────────────────────
|
||
|
||
def _setup_logging() -> logging.Logger:
    """Return the "debug_dws" logger, configured to write INFO+ to stdout.

    The stdout handler is attached only when the logger has no handlers yet,
    so calling this repeatedly does not add duplicate handlers.
    """
    log = logging.getLogger("debug_dws")
    log.setLevel(logging.INFO)
    if log.handlers:
        return log

    stream = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S"
    )
    stream.setFormatter(formatter)
    log.addHandler(stream)
    return log
|
||
|
||
|
||
def _build_components(config: AppConfig, logger: logging.Logger):
    """Construct the DB connection, API client, DB operations and task executor.

    Args:
        config: Application config; its "db" and "api" sections are read here.
        logger: Logger passed through to the TaskExecutor.

    Returns:
        A tuple ``(db_conn, api_client, db_ops, executor)``.
    """
    db_cfg = config["db"]
    db_conn = DatabaseConnection(
        dsn=db_cfg["dsn"],
        session=db_cfg.get("session"),
        connect_timeout=db_cfg.get("connect_timeout_sec"),
    )

    api_cfg = config["api"]
    retry_cfg = api_cfg.get("retries", {})
    api_client = APIClient(
        base_url=api_cfg["base_url"],
        token=api_cfg["token"],
        timeout=api_cfg.get("timeout_sec", 20),
        retry_max=retry_cfg.get("max_attempts", 3),
        headers_extra=api_cfg.get("headers_extra"),
    )

    db_ops = DatabaseOperations(db_conn)
    # The cursor manager and run tracker are built on the same connection.
    executor = TaskExecutor(
        config,
        db_ops,
        api_client,
        CursorManager(db_conn),
        RunTracker(db_conn),
        default_registry,
        logger,
    )
    return db_conn, api_client, db_ops, executor
|
||
|
||
|
||
def _get_dws_target_table(task_code: str, config, db_conn, api_client, logger) -> str | None:
    """Resolve a DWS task's target table by instantiating the task temporarily.

    Args:
        task_code: Code of the task to look up in ``default_registry``.
        config, db_conn, api_client, logger: Passed unchanged to the task class
            constructor.

    Returns:
        The value of ``get_target_table()``, prefixed with ``dws.`` when it is
        non-empty and contains no dot. ``None`` when the task is not
        registered or when resolving the name raised.
    """
    meta = default_registry.get_metadata(task_code)
    if meta is None:
        return None
    try:
        task_instance = meta.task_class(config, db_conn, api_client, logger)
        raw_name = task_instance.get_target_table()
        # Add the dws. schema prefix when the name carries no schema.
        if raw_name and "." not in raw_name:
            return f"dws.{raw_name}"
        return raw_name
    except Exception as exc:
        # Still best effort (the caller treats None as "unknown table"), but
        # say why instead of swallowing the failure silently.
        logger.warning(" 获取目标表名失败 (%s): %s", task_code, exc)
        return None
|
||
|
||
|
||
def _query_count(db_conn: DatabaseConnection, table: str) -> int:
    """Return the total number of rows in ``table`` (0 when the query yields no rows).

    ``table`` is interpolated into the SQL text as-is.
    """
    result_rows = db_conn.query(f"SELECT COUNT(*) AS cnt FROM {table}")
    if not result_rows:
        return 0
    return int(result_rows[0]["cnt"])
|
||
|
||
|
||
def _has_column(db_conn: DatabaseConnection, table: str, column: str) -> bool:
    """Report whether ``table`` has a column named ``column``.

    ``table`` is compared against ``table_schema || '.' || table_name`` in
    information_schema.columns, so it has to be given as ``schema.table``.
    """
    lookup_sql = """
        SELECT 1 FROM information_schema.columns
        WHERE table_schema || '.' || table_name = %s
          AND column_name = %s
        LIMIT 1
    """
    return bool(db_conn.query(lookup_sql, (table, column)))
|
||
|
||
|
||
def _table_exists(db_conn: DatabaseConnection, table: str) -> bool:
    """Report whether ``to_regclass(table)`` resolves to a table or view."""
    found = db_conn.query("SELECT to_regclass(%s) AS reg", (table,))
    if not found:
        return False
    return bool(found[0].get("reg"))
|
||
|
||
|
||
# ── DWS 与 DWD 一致性抽样验证 ────────────────────────────────
|
||
|
||
# Known DWS -> DWD aggregation relationships, used by the sampling consistency check.
# Keyed by schema-qualified DWS table name; each entry holds:
#   dwd_source      schema-qualified DWD table to compare against
#   dws_date_col    date column on the DWS side
#   dwd_date_col    date column on the DWD side
#   group_cols      columns the DWS rows are grouped by
#   dws_count_col   count column on the DWS side, or None when there is none
#   dwd_count_expr  counting expression for the DWD side, or None
#   description     label describing the comparison
_DWS_DWD_CONSISTENCY_MAP: dict[str, dict] = {
    "dws.dws_assistant_daily_detail": {
        "dwd_source": "dwd.dwd_assistant_service_log",
        "dws_date_col": "stat_date",
        "dwd_date_col": "service_date",
        "group_cols": ["site_id", "assistant_id"],
        "dws_count_col": "service_count",
        "dwd_count_expr": "COUNT(*)",
        "description": "助教日度服务次数 vs DWD 服务流水",
    },
    "dws.dws_finance_daily_summary": {
        "dwd_source": "dwd.dwd_order",
        "dws_date_col": "stat_date",
        "dwd_date_col": "order_date",
        "group_cols": ["site_id"],
        "dws_count_col": "order_count",
        "dwd_count_expr": "COUNT(*)",
        "description": "财务日度订单数 vs DWD 订单表",
    },
    "dws.dws_member_visit_detail": {
        "dwd_source": "dwd.dwd_order",
        "dws_date_col": "visit_date",
        "dwd_date_col": "order_date",
        "group_cols": ["site_id", "member_id"],
        # No direct count column on this table; only row counts are compared.
        "dws_count_col": None,
        "dwd_count_expr": None,
        "description": "会员到店明细 vs DWD 订单表",
    },
}
|
||
|
||
|
||
def _check_dws_dwd_consistency(
    db_conn: DatabaseConnection,
    dws_table: str,
    logger: logging.Logger,
) -> dict:
    """Spot-check a DWS table against its DWD source on the most recent dates.

    For tables listed in ``_DWS_DWD_CONSISTENCY_MAP`` the three most recent
    distinct dates in the DWS table are sampled. For each date the DWS row
    count and the DWD row count are fetched. A date is a mismatch only when
    one side has rows and the other has none. The ``group_cols``,
    ``dws_count_col`` and ``dwd_count_expr`` entries of the mapping are not
    used here. Tables without a mapping are skipped.

    Args:
        db_conn: Connection used for all queries.
        dws_table: Schema-qualified DWS table name (the mapping key).
        logger: Unused by this function; kept for the existing call signature.

    Returns:
        A dict with ``checks`` (list of message strings) and ``has_mapping``.
        When a mapping exists it also carries ``description``. Once the
        per-date comparison stage is reached it carries ``mismatch_count``.
    """
    result: dict = {"checks": [], "has_mapping": False}

    mapping = _DWS_DWD_CONSISTENCY_MAP.get(dws_table)
    if not mapping:
        result["checks"].append("ℹ 无已知 DWS→DWD 映射,跳过一致性验证")
        return result

    result["has_mapping"] = True
    result["description"] = mapping["description"]
    dwd_source = mapping["dwd_source"]
    dws_date_col = mapping["dws_date_col"]
    dwd_date_col = mapping["dwd_date_col"]

    # The DWD source table must exist before anything can be compared.
    if not _table_exists(db_conn, dwd_source):
        result["checks"].append(f"⚠ DWD 源表不存在: {dwd_source}")
        return result

    # Sample the three most recent distinct dates present in the DWS table.
    try:
        sample_sql = f"""
            SELECT DISTINCT "{dws_date_col}" AS d
            FROM {dws_table}
            ORDER BY d DESC
            LIMIT 3
        """
        date_rows = db_conn.query(sample_sql)
        if not date_rows:
            result["checks"].append("ℹ DWS 表无数据,跳过一致性验证")
            return result
        sample_dates = [r["d"] for r in date_rows]
    except Exception as exc:
        result["checks"].append(f"✗ 查询 DWS 日期失败: {exc}")
        return result

    # Whether the DWD date column exists does not depend on the sampled date,
    # so look it up once here rather than on every loop iteration.
    try:
        dwd_has_date_col = _has_column(db_conn, dwd_source, dwd_date_col)
    except Exception as exc:
        result["checks"].append(f"✗ 检查 DWD 日期列失败: {exc}")
        result["mismatch_count"] = 0
        return result
    if not dwd_has_date_col:
        result["checks"].append(f"⚠ DWD 表缺少日期列 {dwd_date_col}")
        result["mismatch_count"] = 0
        return result

    dws_count_sql = f"""
        SELECT COUNT(*) AS cnt FROM {dws_table}
        WHERE "{dws_date_col}" = %s
    """
    dwd_count_sql = f"""
        SELECT COUNT(*) AS cnt FROM {dwd_source}
        WHERE "{dwd_date_col}" = %s
    """

    # Compare row counts for each sampled date.
    mismatches = []
    for sample_date in sample_dates:
        try:
            dws_rows = db_conn.query(dws_count_sql, (sample_date,))
            dws_count = int(dws_rows[0]["cnt"]) if dws_rows else 0

            dwd_rows = db_conn.query(dwd_count_sql, (sample_date,))
            dwd_count = int(dwd_rows[0]["cnt"]) if dwd_rows else 0

            # DWS is an aggregate, so its row count is normally <= the DWD
            # row count; only "rows on one side, none on the other" is flagged.
            if dws_count > 0 and dwd_count == 0:
                mismatches.append(
                    f"日期 {sample_date}: DWS={dws_count} 但 DWD=0(DWD 无对应数据)"
                )
            elif dws_count == 0 and dwd_count > 0:
                mismatches.append(
                    f"日期 {sample_date}: DWS=0 但 DWD={dwd_count}(DWS 未汇总)"
                )
            else:
                result["checks"].append(
                    f"✓ 日期 {sample_date}: DWS={dws_count}行, DWD={dwd_count}行"
                )
        except Exception as exc:
            # A failure on one date must not stop the remaining dates.
            result["checks"].append(f"✗ 日期 {sample_date} 对比失败: {exc}")

    result["checks"].extend(f"⚠ {m}" for m in mismatches)
    result["mismatch_count"] = len(mismatches)
    return result
|
||
|
||
|
||
# ── 核心调试逻辑 ──────────────────────────────────────────────
|
||
|
||
def debug_single_dws_task(
    task_code: str,
    executor: TaskExecutor,
    db_conn: DatabaseConnection,
    config: AppConfig,
    api_client,
    logger: logging.Logger,
    window_start: datetime,
    window_end: datetime,
) -> DebugResult:
    """Run one DWS task and validate what it produced.

    Steps: resolve the target table, record its row count, run the task via
    ``executor.run_single_task``, record the row count again, run the
    DWS-vs-DWD consistency spot check, then derive a verdict.

    Args:
        task_code: Code of the task to run.
        executor: Executor used to run the task.
        db_conn: Connection used for the row-count and consistency queries.
        config: Application config; ``app.store_id`` is read from it.
        api_client: Passed on when resolving the target table.
        logger: Receives the progress output.
        window_start: Not read by this function; kept for the call signature.
        window_end: Not read by this function; kept for the call signature.

    Returns:
        A ``DebugResult`` whose status is:
        ERROR when ``run_single_task`` raised;
        FAIL when the task itself reported a failure status;
        WARN when issues were detected, or the task was skipped, or the
        returned status is not recognised;
        PASS otherwise.
    """
    result = DebugResult(task_code=task_code)

    # Resolve the target table name.
    target_table = _get_dws_target_table(task_code, config, db_conn, api_client, logger)
    result.target_table = target_table or ""

    store_id = int(config.get("app.store_id"))
    run_uuid = f"debug-dws-{task_code.lower()}-{int(time.time())}"

    logger.info("━" * 60)
    logger.info("▶ 开始调试: %s (表: %s)", task_code, target_table or "未知")

    # Row count before the run.
    if target_table and _table_exists(db_conn, target_table):
        try:
            result.pre_row_count = _query_count(db_conn, target_table)
            logger.info(" 执行前表行数: %d", result.pre_row_count)
        except Exception as exc:
            logger.warning(" 查询执行前行数失败: %s", exc)

    # Run the task.
    t0 = time.monotonic()
    try:
        task_result = executor.run_single_task(
            task_code=task_code,
            run_uuid=run_uuid,
            store_id=store_id,
            data_source="online",
        )
        result.duration_sec = round(time.monotonic() - t0, 2)
    except Exception as exc:
        result.duration_sec = round(time.monotonic() - t0, 2)
        result.status = "ERROR"
        result.message = f"任务执行异常: {exc}"
        result.error_detail = traceback.format_exc()
        logger.error(" ✗ 执行异常: %s", exc)
        return result

    # Interpret the return value.
    task_status = (task_result.get("status") or "").upper()
    counts = task_result.get("counts") or {}
    result.counts = counts

    logger.info(" 返回状态: %s", task_status)
    logger.info(" counts: %s", counts)

    # Row count after the run. Existence is re-checked because the table may
    # not have existed before the task ran.
    if target_table and _table_exists(db_conn, target_table):
        try:
            result.post_row_count = _query_count(db_conn, target_table)
            logger.info(" 执行后表行数: %d", result.post_row_count)

            if result.pre_row_count is not None:
                delta = result.post_row_count - result.pre_row_count
                logger.info(" 行数变化: %+d", delta)
        except Exception as exc:
            logger.warning(" 查询执行后行数失败: %s", exc)

    # Spot-check DWS against DWD.
    if target_table and _table_exists(db_conn, target_table):
        try:
            consistency = _check_dws_dwd_consistency(db_conn, target_table, logger)
            result.consistency_check = consistency
            for check in consistency.get("checks", []):
                logger.info(" 一致性: %s", check)
        except Exception as exc:
            logger.warning(" ⚠ 一致性检查异常: %s", exc)

    # Collect anything that should downgrade the verdict.
    issues = []
    errors_count = counts.get("errors", 0)
    if errors_count:
        issues.append(f"执行有 {errors_count} 个错误")

    if result.consistency_check and result.consistency_check.get("mismatch_count", 0) > 0:
        issues.append(f"一致性检查有 {result.consistency_check['mismatch_count']} 处不一致")

    if result.post_row_count == 0:
        issues.append("执行后表为空")

    # NOTE(review): the executor's failure vocabulary is not visible in this
    # file; these spellings are assumed - confirm against TaskExecutor.
    failure_statuses = ("FAIL", "FAILED", "FAILURE", "ERROR")

    if task_status in failure_statuses:
        # A failure reported by the task outranks warnings, so that the
        # summary and the process exit code reflect it.
        result.status = "FAIL"
        result.message = "; ".join([f"任务返回失败状态: {task_status}", *issues])
    elif issues:
        result.status = "WARN"
        result.message = "; ".join(issues)
    elif task_status in ("SUCCESS", "PARTIAL", "COMPLETE"):
        result.status = "PASS"
        result.message = f"执行成功, counts={counts}"
    elif task_status == "SKIP":
        result.status = "WARN"
        result.message = "任务被跳过(未启用或不存在)"
    else:
        result.status = "WARN"
        result.message = f"未知状态: {task_status}"

    icon = {"PASS": "✓", "WARN": "⚠", "ERROR": "✗", "FAIL": "✗"}.get(result.status, "?")
    logger.info(" %s 结果: %s - %s (耗时 %.1fs)", icon, result.status, result.message, result.duration_sec)
    return result
|
||
|
||
|
||
# ── 主流程 ────────────────────────────────────────────────────
|
||
|
||
def run_dws_debug(
    hours: float = 48.0,
    task_filter: list[str] | None = None,
) -> list[DebugResult]:
    """Debug every DWS-layer task (or a chosen subset), one at a time.

    Loads the config, forces a shared time window through
    ``run.window_override``, runs each task with ``debug_single_dws_task``,
    logs a summary and writes the results to a timestamped JSON file under
    ``scripts/debug/output``.

    Args:
        hours: Look-back window in hours, ending now (default 48).
        task_filter: Task codes to debug; ``None`` or empty means all tasks
            registered for the DWS layer. Codes not registered there are
            skipped with a warning.

    Returns:
        One ``DebugResult`` per executed task, in execution order.
    """
    logger = _setup_logging()
    logger.info("=" * 60)
    logger.info("DWS 层调试开始")
    logger.info("=" * 60)

    # Load configuration (from .env, per the original note).
    config = AppConfig.load()
    tz = ZoneInfo(config.get("app.timezone", "Asia/Shanghai"))
    window_end = datetime.now(tz)
    window_start = window_end - timedelta(hours=hours)

    logger.info("门店 ID: %s", config.get("app.store_id"))
    logger.info("数据库: %s", config.get("db.name", ""))
    logger.info("API: %s", config.get("api.base_url", ""))
    logger.info("时间窗口: %s ~ %s (%.1f 小时)", window_start, window_end, hours)

    # Set window_override so every task uses the same window.
    override = config.config.setdefault("run", {}).setdefault("window_override", {})
    override["start"] = window_start
    override["end"] = window_end

    # Build the runtime components.
    db_conn, api_client, _db_ops, executor = _build_components(config, logger)

    # From here on the connection is closed on every exit path, including
    # failures in ensure_open(), the summary or the JSON export.
    try:
        # Work out which DWS tasks to run.
        all_dws_codes = sorted(default_registry.get_tasks_by_layer("DWS"))
        if task_filter:
            filter_set = {t.upper() for t in task_filter}
            dws_codes = [c for c in all_dws_codes if c in filter_set]
            skipped = filter_set - set(dws_codes)
            if skipped:
                logger.warning("以下任务不在 DWS 层注册表中,已跳过: %s", skipped)
        else:
            dws_codes = all_dws_codes

        logger.info("待调试 DWS 任务: %d 个", len(dws_codes))
        logger.info("任务列表: %s", ", ".join(dws_codes))
        logger.info("")

        # Run the tasks one by one; a crash in one becomes an ERROR result
        # and the loop carries on.
        results: list[DebugResult] = []
        for idx, task_code in enumerate(dws_codes, start=1):
            logger.info("[%d/%d] %s", idx, len(dws_codes), task_code)
            try:
                r = debug_single_dws_task(
                    task_code=task_code,
                    executor=executor,
                    db_conn=db_conn,
                    config=config,
                    api_client=api_client,
                    logger=logger,
                    window_start=window_start,
                    window_end=window_end,
                )
            except Exception as exc:
                r = DebugResult(
                    task_code=task_code,
                    status="ERROR",
                    message=f"未捕获异常: {exc}",
                    error_detail=traceback.format_exc(),
                )
                logger.error(" ✗ 未捕获异常: %s", exc)
            results.append(r)

            # Make sure the connection is usable for the next task.
            db_conn.ensure_open()

        # Summary.
        _print_summary(results, logger)

        # Write the JSON results file.
        output_dir = _FEIQIU_ROOT / "scripts" / "debug" / "output"
        output_dir.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
        output_file = output_dir / f"debug_dws_{ts}.json"
        _save_results(results, output_file)
        logger.info("结果已保存: %s", output_file)
    finally:
        # Cleanup.
        db_conn.close()

    return results
|
||
|
||
|
||
# ── 汇总与输出 ────────────────────────────────────────────────
|
||
|
||
def _print_summary(results: list[DebugResult], logger: logging.Logger):
    """Log the overall totals, per-category totals and the tasks needing attention.

    Categories are decided by task code: codes starting with ``DWS_MV_`` are
    materialized-view refreshes, ``DWS_RETENTION_CLEANUP`` and
    ``DWS_BUILD_ORDER_SUMMARY`` are utility tasks, everything else is a
    business aggregation task. ERROR and FAIL are counted together.
    """

    def tally(group):
        # (PASS, WARN, ERROR-or-FAIL) counts for a group of results.
        passed = sum(1 for item in group if item.status == "PASS")
        warned = sum(1 for item in group if item.status == "WARN")
        failed = sum(1 for item in group if item.status in ("ERROR", "FAIL"))
        return passed, warned, failed

    banner = "=" * 60
    logger.info("")
    logger.info(banner)
    logger.info("DWS 层调试汇总")
    logger.info(banner)

    pass_count, warn_count, error_count = tally(results)
    total_duration = sum(item.duration_sec for item in results)

    logger.info("总计: %d 个任务", len(results))
    logger.info(" ✓ PASS: %d", pass_count)
    logger.info(" ⚠ WARN: %d", warn_count)
    logger.info(" ✗ ERROR: %d", error_count)
    logger.info(" 总耗时: %.1f 秒", total_duration)
    logger.info("")

    # Split the results by task category.
    utility_codes = ("DWS_RETENTION_CLEANUP", "DWS_BUILD_ORDER_SUMMARY")
    regular_tasks, mv_tasks, utility_tasks = [], [], []
    for item in results:
        if item.task_code.startswith("DWS_MV_"):
            mv_tasks.append(item)
        elif item.task_code in utility_codes:
            utility_tasks.append(item)
        else:
            regular_tasks.append(item)

    category_lines = (
        ("业务汇总任务: %d 个 (PASS=%d, WARN=%d, ERROR=%d)", regular_tasks),
        ("物化视图刷新: %d 个 (PASS=%d, WARN=%d, ERROR=%d)", mv_tasks),
        ("工具类任务: %d 个 (PASS=%d, WARN=%d, ERROR=%d)", utility_tasks),
    )
    for template, group in category_lines:
        if group:
            logger.info(template, len(group), *tally(group))

    # List every task whose status is not PASS.
    non_pass = [item for item in results if item.status != "PASS"]
    logger.info("")
    if not non_pass:
        logger.info("所有任务均通过 ✓")
        return
    logger.info("需关注的任务:")
    for item in non_pass:
        logger.info(" [%s] %s: %s", item.status, item.task_code, item.message)
|
||
|
||
|
||
def _save_results(results: list[DebugResult], path: Path):
    """Write the results to ``path`` as a UTF-8, indented JSON array.

    Each result is converted with ``asdict`` and passed through
    ``_sanitize_for_json``; anything json still cannot encode is written via
    ``str`` (``default=str``).
    """
    payload = []
    for item in results:
        payload.append(_sanitize_for_json(asdict(item)))
    text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
    path.write_text(text, encoding="utf-8")
|
||
|
||
|
||
def _sanitize_for_json(obj):
    """Recursively convert values json cannot serialize.

    ``datetime`` values become ISO-8601 strings, dicts are rebuilt with
    sanitized values, lists and tuples become lists of sanitized items.
    Every other value is returned unchanged.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, dict):
        cleaned = {}
        for key, value in obj.items():
            cleaned[key] = _sanitize_for_json(value)
        return cleaned
    if isinstance(obj, (list, tuple)):
        return list(map(_sanitize_for_json, obj))
    return obj
|
||
|
||
|
||
# ── CLI 入口 ──────────────────────────────────────────────────
|
||
|
||
def parse_args():
    """Parse the command line: ``--hours`` (float, default 48.0) and ``--tasks`` (str, default None)."""
    cli = argparse.ArgumentParser(description="DWS 层逐任务调试")
    cli.add_argument(
        "--hours",
        default=48.0,
        type=float,
        help="回溯窗口小时数(默认 48,DWS 按天粒度汇总)",
    )
    cli.add_argument(
        "--tasks",
        default=None,
        type=str,
        help="仅调试指定任务,逗号分隔(如 DWS_FINANCE_DAILY,DWS_ASSISTANT_DAILY)",
    )
    return cli.parse_args()
|
||
|
||
|
||
def main():
    """CLI entry point: parse arguments, run the debug session, exit with a status code.

    The process exits with 1 when any result has status ERROR or FAIL,
    otherwise with 0.
    """
    args = parse_args()

    raw_tasks = args.tasks
    # Upper-case the comma-separated codes and drop empty fragments.
    task_filter = (
        [code.strip().upper() for code in raw_tasks.split(",") if code.strip()]
        if raw_tasks
        else None
    )

    results = run_dws_debug(hours=args.hours, task_filter=task_filter)

    # Exit code: non-zero as soon as one task ended in ERROR or FAIL.
    for outcome in results:
        if outcome.status in ("ERROR", "FAIL"):
            sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
|