Files
feiqiu-ETL/etl_billiards/config/env_parser.py
2025-11-30 07:19:05 +08:00

176 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""环境变量解析"""
import os
import json
from pathlib import Path
from copy import deepcopy
# Maps an environment variable name to the dotted config path(s) it overrides.
# Each value is a tuple so that one variable could feed several paths.
# Insertion order matters: entries are applied in this order, so when two
# aliases of the same path are both set, the one listed later wins.
ENV_MAP = {
    # --- app ---
    "TIMEZONE": ("app.timezone",),
    "STORE_ID": ("app.store_id",),
    "SCHEMA_OLTP": ("app.schema_oltp",),
    "SCHEMA_ETL": ("app.schema_etl",),
    # --- database ---
    "PG_DSN": ("db.dsn",),
    "PG_HOST": ("db.host",),
    "PG_PORT": ("db.port",),
    "PG_NAME": ("db.name",),
    "PG_USER": ("db.user",),
    "PG_PASSWORD": ("db.password",),
    "PG_CONNECT_TIMEOUT": ("db.connect_timeout_sec",),
    # --- API (FICOO_TOKEN is an alias of API_TOKEN) ---
    "API_BASE": ("api.base_url",),
    "API_TOKEN": ("api.token",),
    "FICOO_TOKEN": ("api.token",),
    "API_TIMEOUT": ("api.timeout_sec",),
    "API_PAGE_SIZE": ("api.page_size",),
    "API_RETRY_MAX": ("api.retries.max_attempts",),
    "API_RETRY_BACKOFF": ("api.retries.backoff_sec",),
    "API_PARAMS": ("api.params",),
    # --- input / output ---
    "EXPORT_ROOT": ("io.export_root",),
    "LOG_ROOT": ("io.log_root",),
    "MANIFEST_NAME": ("io.manifest_name",),
    "INGEST_REPORT_NAME": ("io.ingest_report_name",),
    "WRITE_PRETTY_JSON": ("io.write_pretty_json",),
    # --- run (several keys have a long and a short alias) ---
    "RUN_TASKS": ("run.tasks",),
    "OVERLAP_SECONDS": ("run.overlap_seconds",),
    "WINDOW_BUSY_MIN": ("run.window_minutes.default_busy",),
    "WINDOW_IDLE_MIN": ("run.window_minutes.default_idle",),
    "IDLE_START": ("run.idle_window.start",),
    "IDLE_END": ("run.idle_window.end",),
    "IDLE_WINDOW_START": ("run.idle_window.start",),
    "IDLE_WINDOW_END": ("run.idle_window.end",),
    "ALLOW_EMPTY_RESULT_ADVANCE": ("run.allow_empty_result_advance",),
    "ALLOW_EMPTY_ADVANCE": ("run.allow_empty_result_advance",),
    # --- pipeline ---
    "PIPELINE_FLOW": ("pipeline.flow",),
    "JSON_FETCH_ROOT": ("pipeline.fetch_root",),
    "JSON_SOURCE_DIR": ("pipeline.ingest_source_dir",),
    "FETCH_ROOT": ("pipeline.fetch_root",),
    "INGEST_SOURCE_DIR": ("pipeline.ingest_source_dir",),
}
def _deep_set(d, dotted_keys, value):
cur = d
for k in dotted_keys[:-1]:
cur = cur.setdefault(k, {})
cur[dotted_keys[-1]] = value
def _coerce_env(v: str):
if v is None:
return None
s = v.strip()
if s.lower() in ("true", "false"):
return s.lower() == "true"
try:
if s.isdigit() or (s.startswith("-") and s[1:].isdigit()):
return int(s)
except Exception:
pass
if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")):
try:
return json.loads(s)
except Exception:
return s
return s
def _strip_inline_comment(value: str) -> str:
"""去掉未被引号包裹的内联注释"""
result = []
in_quote = False
quote_char = ""
escape = False
for ch in value:
if escape:
result.append(ch)
escape = False
continue
if ch == "\\":
escape = True
result.append(ch)
continue
if ch in ("'", '"'):
if not in_quote:
in_quote = True
quote_char = ch
elif quote_char == ch:
in_quote = False
quote_char = ""
result.append(ch)
continue
if ch == "#" and not in_quote:
break
result.append(ch)
return "".join(result).rstrip()
def _unquote_value(value: str) -> str:
    """Clean a raw ``.env`` value: comment, trailing comma, then quotes.

    Steps, in order: strip surrounding whitespace, drop an inline comment,
    drop trailing commas and the whitespace before them, and finally remove
    one pair of matching quotes. A raw-string style prefix (``r'...'`` or
    ``R"..."``) is also accepted and removed together with its quotes.
    """
    quote_chars = ("'", '"')

    text = _strip_inline_comment(value.strip())
    text = text.rstrip(",").rstrip()
    if not text:
        return text

    first, last = text[0], text[-1]

    # Plain quoted value: 'abc' or "abc".
    if len(text) >= 2 and first in quote_chars and last == first:
        return text[1:-1]

    # Raw-string style value: r'abc' or R"abc".
    is_raw_literal = (
        len(text) >= 3
        and first in ("r", "R")
        and text[1] in quote_chars
        and last == text[1]
    )
    if is_raw_literal:
        return text[2:-1]

    return text
def _parse_dotenv_line(line: str) -> tuple[str, str] | None:
    """Parse one line of a ``.env`` file into a ``(key, value)`` pair.

    Returns ``None`` for blank lines, lines starting with ``#`` and lines
    without an ``=``. A leading ``export `` is ignored. The line is split at
    the first ``=``; the key is whitespace-stripped and the value is cleaned
    by ``_unquote_value``.
    """
    entry = line.strip()
    if entry == "" or entry.startswith("#"):
        return None

    # Accept shell-style "export KEY=value" lines.
    entry = entry.removeprefix("export ").strip()

    name, separator, raw_value = entry.partition("=")
    if not separator:
        return None
    return name.strip(), _unquote_value(raw_value)
def _load_dotenv_values() -> dict:
"""从项目根目录读取 .env 文件键值"""
if os.environ.get("ETL_SKIP_DOTENV") in ("1", "true", "TRUE", "True"):
return {}
root = Path(__file__).resolve().parents[1]
dotenv_path = root / ".env"
if not dotenv_path.exists():
return {}
values: dict[str, str] = {}
for line in dotenv_path.read_text(encoding="utf-8", errors="ignore").splitlines():
parsed = _parse_dotenv_line(line)
if parsed:
key, value = parsed
values[key] = value
return values
# Config paths whose values are credentials or connection identifiers.
# They are stored as (stripped) text and never type-coerced: coercion would
# turn a password such as "0123" into the int 123, or "true" into True.
_VERBATIM_STRING_PATHS = frozenset(
    {
        "db.dsn",
        "db.name",
        "db.user",
        "db.password",
        "api.token",
    }
)


def _apply_env_values(cfg: dict, source: dict) -> None:
    """Overlay the values found in ``source`` onto ``cfg`` using ``ENV_MAP``.

    ``source`` is any mapping of environment-variable names to strings
    (``os.environ`` or the parsed ``.env`` contents). Names missing from
    ``source`` leave ``cfg`` untouched. ``cfg`` is modified in place.

    Values are converted with ``_coerce_env``, with two exceptions:

    * paths listed in ``_VERBATIM_STRING_PATHS`` keep the stripped text;
    * ``run.tasks`` given as plain text is split on commas into a list of
      non-empty, stripped names.
    """
    for env_key, dotted in ENV_MAP.items():
        raw = source.get(env_key)
        if raw is None:
            continue
        coerced = _coerce_env(raw)
        for path in dotted:
            # Compute a per-path value so that a conversion made for one
            # path can never leak into the next path of the same variable.
            if path in _VERBATIM_STRING_PATHS:
                value = raw.strip()
            elif path == "run.tasks" and isinstance(coerced, str):
                value = [item.strip() for item in coerced.split(",") if item.strip()]
            else:
                value = coerced
            _deep_set(cfg, path.split("."), value)
def load_env_overrides(defaults: dict) -> dict:
    """Return a deep copy of ``defaults`` with environment overrides applied.

    ``defaults`` itself is not modified. The ``.env`` file is applied first
    and the real process environment second, so a process environment
    variable takes precedence over the same name in ``.env``.
    """
    merged = deepcopy(defaults)
    # Lowest precedence first; each later source overwrites earlier ones.
    for source in (_load_dotenv_values(), os.environ):
        _apply_env_values(merged, source)
    return merged