# -*- coding: utf-8 -*- """ 一次性运维脚本:初始店铺同步(数据迁移补数据)。 步骤: 1. 检查 biz.sites 表是否存在,不存在则先执行 DDL 迁移脚本 2. 调用 sync_sites_from_etl() 从 ETL 库补充缺失店铺 使用测试库:test_zqyy_app(业务)+ test_etl_feiqiu(ETL)。 需求: A1b.1, A1b.2 """ import sys from pathlib import Path # 定位项目根目录并加载 .env _project_root = Path(__file__).resolve().parents[2] sys.path.insert(0, str(_project_root / "apps" / "backend")) from dotenv import load_dotenv load_dotenv(_project_root / ".env", override=False) # 导入后端模块(会触发 app.config 加载,验证环境变量) from app.config import APP_DB_NAME, ETL_DB_NAME from app.database import get_connection def _ensure_biz_tables_exist() -> bool: """检查 biz.sites 表是否存在,不存在则执行迁移脚本。返回是否执行了迁移。""" conn = get_connection() try: with conn.cursor() as cur: cur.execute( "SELECT 1 FROM information_schema.tables " "WHERE table_schema = 'biz' AND table_name = 'sites'" ) if cur.fetchone(): return False # 表已存在,无需迁移 # 表不存在,执行迁移脚本 migration_path = _project_root / "db" / "zqyy_app" / "migrations" / "2026-03-22__ns41_registry_tables.sql" if not migration_path.exists(): print(f"[ERROR] 迁移脚本不存在: {migration_path}") sys.exit(1) print(f"[INFO] biz.sites 表不存在,执行迁移脚本: {migration_path.name}") sql = migration_path.read_text(encoding="utf-8") with conn.cursor() as cur: cur.execute(sql) conn.commit() print("[INFO] 迁移脚本执行成功") return True except Exception as e: conn.rollback() print(f"[ERROR] 迁移脚本执行失败: {e}") sys.exit(1) finally: conn.close() def main() -> None: # 安全检查:确认连接的是测试库 if "test" not in APP_DB_NAME: print(f"[ERROR] APP_DB_NAME={APP_DB_NAME},不是测试库,中止执行") sys.exit(1) if "test" not in ETL_DB_NAME: print(f"[ERROR] ETL_DB_NAME={ETL_DB_NAME},不是测试库,中止执行") sys.exit(1) print(f"[INFO] 业务库: {APP_DB_NAME}") print(f"[INFO] ETL 库: {ETL_DB_NAME}") # 确保 biz 表已创建 migrated = _ensure_biz_tables_exist() if migrated: print("[INFO] DDL 迁移已完成,继续执行同步") else: print("[INFO] biz 表已存在,跳过迁移") # 执行同步 from app.routers.admin_registry import sync_sites_from_etl print("[INFO] 开始初始店铺同步...") result = sync_sites_from_etl() print(f"[RESULT] 新增店铺: {result.inserted}") print(f"[RESULT] 更新店铺: {result.updated}") print("[INFO] 同步完成") if __name__ == "__main__": main()