# Source file: ZQYY.FQ-ETL/cli/main.py (web-viewer chrome removed during extraction)
# -*- coding: utf-8 -*-
"""CLI主入口
支持两种执行模式:
1. 传统模式:指定任务列表直接执行
2. 管道模式:指定管道类型和处理模式,执行多层 ETL
处理模式说明:
- increment_only(仅增量) - 只执行增量数据处理
- verify_only(校验并修复) - 跳过增量,直接校验数据一致性并自动补齐
- 可选 --fetch-before-verify(校验前先从 API 获取数据)
- increment_verify(增量+校验并修复) - 先增量处理,再校验补齐
示例:
# 传统模式
python -m cli.main --tasks ODS_MEMBER,ODS_ORDER
# 管道模式(仅增量)
python -m cli.main --pipeline api_full --processing-mode increment_only
# 管道模式(校验并修复,跳过增量)
python -m cli.main --pipeline api_full --processing-mode verify_only
# 管道模式(校验并修复,校验前先从 API 获取数据)
python -m cli.main --pipeline api_full --processing-mode verify_only --fetch-before-verify
# 管道模式(增量+校验并修复)
python -m cli.main --pipeline api_full --processing-mode increment_verify
# 带时间窗口的管道模式
python -m cli.main --pipeline api_ods_dwd --window-start "2026-02-01" --window-end "2026-02-02"
"""
import sys
import argparse
import logging
from datetime import datetime
from pathlib import Path
from config.settings import AppConfig
from orchestration.scheduler import ETLScheduler # 保留task 9 处理薄包装层
# 新架构依赖
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations
from orchestration.cursor_manager import CursorManager
from orchestration.run_tracker import RunTracker
from orchestration.task_registry import default_registry
from orchestration.task_executor import TaskExecutor
from orchestration.pipeline_runner import PipelineRunner
from api.client import APIClient
# Pipeline choices (CLI --pipeline): each entry names a chain of ETL layers.
PIPELINE_CHOICES = [
    "api_ods",        # API → ODS
    "api_ods_dwd",    # API → ODS → DWD
    "api_full",       # API → ODS → DWD → DWS summary → DWS index
    "ods_dwd",        # ODS → DWD
    "dwd_dws",        # DWD → DWS summary
    "dwd_dws_index",  # DWD → DWS summary → DWS index
    "dwd_index",      # DWD → DWS index
]
# Processing-mode choices (CLI --processing-mode).
PROCESSING_MODE_CHOICES = [
    "increment_only",    # incremental load only
    "verify_only",       # verify & repair (skip incremental load)
    "increment_verify",  # incremental load, then verify & repair
]
# Time-window split choices (CLI --window-split).
WINDOW_SPLIT_CHOICES = ["none", "day", "week", "month"]
def setup_logging():
    """Configure root logging and return the application logger.

    Prefers the project-wide unified format from ``utils.logging_utils``;
    when that module is unavailable, falls back to an equivalent
    hard-coded format.

    Returns:
        logging.Logger: the ``etl_billiards`` logger.
    """
    fallback = (
        "[%(asctime)s] %(levelname)-5s | %(name)s | %(message)s",
        "%Y-%m-%d %H:%M:%S",
    )
    try:
        from utils.logging_utils import UNIFIED_FORMAT, DATE_FORMAT
        fmt, datefmt = UNIFIED_FORMAT, DATE_FORMAT
    except ImportError:
        fmt, datefmt = fallback
    logging.basicConfig(level=logging.INFO, format=fmt, datefmt=datefmt)
    return logging.getLogger("etl_billiards")
def parse_args():
    """Parse command-line arguments for the ETL CLI.

    Returns:
        argparse.Namespace: parsed options covering task selection,
        pipeline/processing mode, database and API connection settings,
        time-window control and directory locations.
    """
    parser = argparse.ArgumentParser(
        description="台球场ETL系统",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 传统任务模式
python -m cli.main --tasks ODS_MEMBER,ODS_ORDER --store-id 1
# 管道模式(仅增量)
python -m cli.main --pipeline api_ods_dwd --processing-mode increment_only
# 管道模式(校验并修复,跳过增量)
python -m cli.main --pipeline api_full --processing-mode verify_only
# 管道模式(校验并修复,先从 API 获取数据)
python -m cli.main --pipeline api_full --processing-mode verify_only --fetch-before-verify
# 管道模式(增量+校验并修复)
python -m cli.main --pipeline api_full --processing-mode increment_verify
# 指定时间窗口
python -m cli.main --pipeline api_ods --window-start "2026-02-01" --window-end "2026-02-02"
""",
    )
    # Basic options
    parser.add_argument("--store-id", type=int, help="门店ID")
    parser.add_argument("--tasks", help="任务列表,逗号分隔(传统模式)")
    parser.add_argument("--dry-run", action="store_true", help="试运行(不提交)")
    # Pipeline options (newly added)
    parser.add_argument(
        "--pipeline",
        choices=PIPELINE_CHOICES,
        help="管道类型api_ods, api_ods_dwd, api_full, ods_dwd, dwd_dws, dwd_dws_index, dwd_index",
    )
    parser.add_argument(
        "--processing-mode",
        dest="processing_mode",
        choices=PROCESSING_MODE_CHOICES,
        default="increment_only",
        help="处理模式increment_only仅增量/ verify_only校验并修复/ increment_verify增量+校验并修复)",
    )
    parser.add_argument(
        "--fetch-before-verify",
        dest="fetch_before_verify",
        action="store_true",
        help="校验前先从 API 获取数据(仅在 verify_only 模式下有效)",
    )
    parser.add_argument(
        "--verify-tables",
        dest="verify_tables",
        help="仅校验指定表(逗号分隔),用于单表验证",
    )
    parser.add_argument(
        "--window-split",
        dest="window_split",
        choices=WINDOW_SPLIT_CHOICES,
        default="none",
        help="时间窗口切分none不切分/ day / week / month",
    )
    parser.add_argument(
        "--lookback-hours",
        dest="lookback_hours",
        type=int,
        default=24,
        help="回溯小时数默认24小时",
    )
    parser.add_argument(
        "--overlap-seconds",
        dest="overlap_seconds",
        type=int,
        default=3600,
        help="冗余秒数默认3600秒=1小时",
    )
    # Database options
    parser.add_argument("--pg-dsn", help="PostgreSQL DSN")
    parser.add_argument("--pg-host", help="PostgreSQL主机")
    parser.add_argument("--pg-port", type=int, help="PostgreSQL端口")
    parser.add_argument("--pg-name", help="PostgreSQL数据库名")
    parser.add_argument("--pg-user", help="PostgreSQL用户名")
    parser.add_argument("--pg-password", help="PostgreSQL密码")
    # API options
    parser.add_argument("--api-base", help="API基础URL")
    parser.add_argument("--api-token", "--token", dest="api_token", help="API令牌Bearer Token")
    parser.add_argument("--api-timeout", type=int, help="API超时(秒)")
    parser.add_argument("--api-page-size", type=int, help="分页大小")
    parser.add_argument("--api-retry-max", type=int, help="API重试最大次数")
    # Backfill / manual window
    parser.add_argument(
        "--window-start",
        dest="window_start",
        help="固定时间窗口开始优先级高于游标例如2025-07-01 00:00:00",
    )
    parser.add_argument(
        "--window-end",
        dest="window_end",
        help="固定时间窗口结束(优先级高于游标,推荐用月末+1例如2025-08-01 00:00:00",
    )
    parser.add_argument(
        "--force-window-override",
        action="store_true",
        help="强制使用 window_start/window_end不走 MAX(fetched_at) 兜底",
    )
    parser.add_argument(
        "--window-split-unit",
        dest="window_split_unit",
        help="窗口切分单位day/week/month/none默认来自配置 run.window_split.unit",
    )
    parser.add_argument(
        "--window-split-days",
        dest="window_split_days",
        type=int,
        choices=[1, 10, 30],
        help="按天切分的天数1/10/30默认来自配置 run.window_split.days",
    )
    parser.add_argument(
        "--window-compensation-hours",
        dest="window_compensation_hours",
        type=int,
        help="窗口前后补偿小时数,默认来自配置 run.window_split.compensation_hours",
    )
    # Directory options
    parser.add_argument("--export-root", help="导出根目录")
    parser.add_argument("--log-root", help="日志根目录")
    # Data-source mode (new option, replaces --pipeline-flow)
    parser.add_argument(
        "--data-source",
        dest="data_source",
        choices=["online", "offline", "hybrid"],
        default=None,
        help="数据源模式online仅在线抓取/ offline仅本地入库/ hybrid抓取+入库)",
    )
    # Fetch/ingest pipeline (--pipeline-flow is deprecated; use --data-source)
    parser.add_argument("--pipeline-flow", choices=["FULL", "FETCH_ONLY", "INGEST_ONLY"], help="[已弃用] 请使用 --data-source")
    parser.add_argument("--fetch-root", help="抓取JSON输出根目录")
    parser.add_argument("--ingest-source", help="本地清洗入库源目录")
    parser.add_argument("--write-pretty-json", action="store_true", help="抓取JSON美化输出")
    # Run window
    parser.add_argument("--idle-start", help="闲时窗口开始(HH:MM)")
    parser.add_argument("--idle-end", help="闲时窗口结束(HH:MM)")
    parser.add_argument("--allow-empty-advance", action="store_true", help="允许空结果推进窗口")
    return parser.parse_args()
def resolve_data_source(args) -> str:
    """Resolve the data-source mode from CLI arguments.

    Handles the deprecation mapping of the old ``--pipeline-flow`` option.
    Precedence: ``--data-source`` > ``--pipeline-flow`` > default "hybrid".

    Args:
        args: Namespace with ``data_source`` and ``pipeline_flow`` attributes.

    Returns:
        str: one of "online", "offline" or "hybrid".
    """
    flow_to_source = {
        "FULL": "hybrid",
        "FETCH_ONLY": "online",
        "INGEST_ONLY": "offline",
    }
    if args.data_source:
        return args.data_source
    if not args.pipeline_flow:
        return "hybrid"  # default
    # Legacy option: map it and emit a deprecation warning.
    import warnings
    mapped = flow_to_source.get(args.pipeline_flow.upper(), "hybrid")
    warnings.warn(
        f"--pipeline-flow 已弃用,请使用 --data-source {mapped}",
        DeprecationWarning,
        stacklevel=2,
    )
    return mapped
def build_cli_overrides(args) -> dict:
    """Build nested configuration overrides from parsed CLI arguments.

    Only options the user actually supplied end up in the dict, so anything
    omitted falls back to the configuration file defaults.  Numeric options
    are tested with ``is not None`` so an explicit ``0`` is honoured (the
    previous truthiness checks silently dropped e.g. ``--api-timeout 0``).

    Args:
        args: Namespace returned by ``parse_args()``.

    Returns:
        dict: overrides grouped by config section
        (app / db / api / io / pipeline / run).
    """
    overrides = {}
    # Basic info
    if args.store_id is not None:
        overrides.setdefault("app", {})["store_id"] = args.store_id
    # Database
    if args.pg_dsn:
        overrides.setdefault("db", {})["dsn"] = args.pg_dsn
    if args.pg_host:
        overrides.setdefault("db", {})["host"] = args.pg_host
    if args.pg_port is not None:
        overrides.setdefault("db", {})["port"] = args.pg_port
    if args.pg_name:
        overrides.setdefault("db", {})["name"] = args.pg_name
    if args.pg_user:
        overrides.setdefault("db", {})["user"] = args.pg_user
    if args.pg_password:
        overrides.setdefault("db", {})["password"] = args.pg_password
    # API
    if args.api_base:
        overrides.setdefault("api", {})["base_url"] = args.api_base
    if args.api_token:
        overrides.setdefault("api", {})["token"] = args.api_token
    if args.api_timeout is not None:
        overrides.setdefault("api", {})["timeout_sec"] = args.api_timeout
    if args.api_page_size is not None:
        overrides.setdefault("api", {})["page_size"] = args.api_page_size
    if args.api_retry_max is not None:
        overrides.setdefault("api", {}).setdefault("retries", {})["max_attempts"] = args.api_retry_max
    # Directories
    if args.export_root:
        overrides.setdefault("io", {})["export_root"] = args.export_root
    if args.log_root:
        overrides.setdefault("io", {})["log_root"] = args.log_root
    # Fetch/ingest pipeline (legacy option kept for backward compatibility)
    if args.pipeline_flow:
        overrides.setdefault("pipeline", {})["flow"] = args.pipeline_flow.upper()
    # Data-source mode (new option; resolves the deprecated --pipeline-flow)
    data_source = resolve_data_source(args)
    overrides.setdefault("run", {})["data_source"] = data_source
    if args.fetch_root:
        overrides.setdefault("pipeline", {})["fetch_root"] = args.fetch_root
    if args.ingest_source:
        overrides.setdefault("pipeline", {})["ingest_source_dir"] = args.ingest_source
    if args.write_pretty_json:
        overrides.setdefault("io", {})["write_pretty_json"] = True
    # Backfill / manual window
    if args.window_start or args.window_end:
        overrides.setdefault("run", {}).setdefault("window_override", {})
        if args.window_start:
            overrides["run"]["window_override"]["start"] = args.window_start
        if args.window_end:
            overrides["run"]["window_override"]["end"] = args.window_end
    if args.force_window_override:
        overrides.setdefault("run", {})["force_window_override"] = True
    if args.window_split_unit:
        overrides.setdefault("run", {}).setdefault("window_split", {})["unit"] = args.window_split_unit
    if args.window_split_days is not None:
        overrides.setdefault("run", {}).setdefault("window_split", {})["days"] = args.window_split_days
    if args.window_compensation_hours is not None:
        overrides.setdefault("run", {}).setdefault("window_split", {})[
            "compensation_hours"
        ] = args.window_compensation_hours
    # Run window
    if args.idle_start:
        overrides.setdefault("run", {}).setdefault("idle_window", {})["start"] = args.idle_start
    if args.idle_end:
        overrides.setdefault("run", {}).setdefault("idle_window", {})["end"] = args.idle_end
    if args.allow_empty_advance:
        overrides.setdefault("run", {})["allow_empty_result_advance"] = True
    # Tasks
    if args.tasks:
        tasks = [t.strip().upper() for t in args.tasks.split(",") if t.strip()]
        overrides.setdefault("run", {})["tasks"] = tasks
    return overrides
def parse_datetime(s: str) -> "datetime | None":
    """Parse a date/time string in one of several accepted formats.

    Formats are tried in order:
    ``%Y-%m-%d %H:%M:%S``, ``%Y-%m-%d %H:%M``, ``%Y-%m-%d``,
    ``%Y/%m/%d %H:%M:%S``, ``%Y/%m/%d %H:%M``, ``%Y/%m/%d``.
    (The slash variants mirror the dash variants; ``%Y/%m/%d %H:%M`` was
    previously missing.)

    Args:
        s: input string; falsy input (None or "") is treated as "no value".

    Returns:
        datetime on success, or None for empty input.  The original
        annotation claimed a bare ``datetime`` despite returning None.

    Raises:
        ValueError: if the string matches none of the accepted formats.
    """
    if not s:
        return None
    formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%Y/%m/%d",
    )
    for fmt in formats:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    raise ValueError(f"无法解析日期时间: {s}")
def main():
    """CLI entry point.

    Resource lifecycle is owned by this CLI layer (try/finally);
    TaskExecutor / PipelineRunner receive the already-created resources
    via dependency injection.

    Returns:
        int: process exit code — 0 on success, 1 on any failure.
    """
    logger = setup_logging()
    args = parse_args()
    try:
        # Load configuration (file defaults merged with CLI overrides)
        cli_overrides = build_cli_overrides(args)
        config = AppConfig.load(cli_overrides)
        logger.info("配置加载完成")
        logger.info("门店ID: %s", config.get('app.store_id'))
        # ── Create resources ──────────────────────────────────────────
        db_conn = DatabaseConnection(
            dsn=config["db"]["dsn"],
            session=config["db"].get("session"),
            connect_timeout=config["db"].get("connect_timeout_sec"),
        )
        api_client = APIClient(
            base_url=config["api"]["base_url"],
            token=config["api"]["token"],
            timeout=config["api"].get("timeout_sec", 20),
            retry_max=config["api"].get("retries", {}).get("max_attempts", 3),
            headers_extra=config["api"].get("headers_extra"),
        )
        try:
            # ── Assemble dependencies ─────────────────────────────────
            db_ops = DatabaseOperations(db_conn)
            cursor_mgr = CursorManager(db_conn)
            run_tracker = RunTracker(db_conn)
            registry = default_registry
            executor = TaskExecutor(
                config, db_ops, api_client,
                cursor_mgr, run_tracker, registry, logger,
            )
            data_source = resolve_data_source(args)
            # ── Choose execution mode ─────────────────────────────────
            if args.pipeline:
                # Pipeline mode
                logger.info("执行模式: 管道模式")
                logger.info("管道类型: %s", args.pipeline)
                logger.info("处理模式: %s", args.processing_mode)
                # Parse the explicit time window, if given
                window_start = None
                window_end = None
                if args.window_start:
                    window_start = parse_datetime(args.window_start)
                if args.window_end:
                    window_end = parse_datetime(args.window_end)
                # No explicit window: fall back to a lookback window
                if window_start is None and window_end is None:
                    from datetime import timedelta
                    from zoneinfo import ZoneInfo
                    tz = ZoneInfo(config.get("app.timezone", "Asia/Shanghai"))
                    window_end = datetime.now(tz)
                    window_start = window_end - timedelta(hours=args.lookback_hours)
                    logger.info("使用回溯时间窗口: %s ~ %s", window_start, window_end)
                    # Record the lookback window as window_override so ODS
                    # tasks use exactly this window
                    config.config.setdefault("run", {}).setdefault("window_override", {})
                    config.config["run"]["window_override"]["start"] = window_start
                    config.config["run"]["window_override"]["end"] = window_end
                # Task filter (optional)
                task_codes = None
                if args.tasks:
                    task_codes = [t.strip().upper() for t in args.tasks.split(",") if t.strip()]
                # Verify-table filter (optional)
                verify_tables = None
                if args.verify_tables:
                    verify_tables = [t.strip().lower() for t in args.verify_tables.split(",") if t.strip()]
                # Assemble the PipelineRunner and execute
                runner = PipelineRunner(
                    config, executor, registry,
                    db_conn, api_client, logger,
                )
                result = runner.run(
                    pipeline=args.pipeline,
                    processing_mode=args.processing_mode,
                    data_source=data_source,
                    window_start=window_start,
                    window_end=window_end,
                    window_split=args.window_split if args.window_split != "none" else None,
                    task_codes=task_codes,
                    fetch_before_verify=args.fetch_before_verify,
                    verify_tables=verify_tables,
                )
                logger.info("管道执行完成: %s", result.get("status"))
            else:
                # Legacy mode: run the configured task list directly
                logger.info("执行模式: 传统模式")
                task_codes = config.get("run.tasks")
                logger.info("任务列表: %s", task_codes)
                executor.run_tasks(task_codes, data_source=data_source)
        finally:
            # Ensure resources are released (requirements 6.1, 6.4)
            db_conn.close()
        logger.info("ETL运行完成")
        return 0
    except Exception as e:
        logger.error("ETL运行失败: %s", e, exc_info=True)
        return 1
# Script entry point: exit the process with main()'s status code.
if __name__ == "__main__":
    sys.exit(main())