""" 初始化 etl_feiqiu 和 zqyy_app 数据库的 DDL 执行脚本。 通过 psycopg2 直接连接新库并执行 SQL 文件。 """ import os import sys import psycopg2 DB_HOST = "100.64.0.4" DB_PORT = 5432 DB_USER = "local-Python" DB_PASSWORD = "Neo-local-1991125" BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def execute_sql_file(conn, filepath, label=""): """读取并执行一个 SQL 文件""" full_path = os.path.join(BASE_DIR, filepath) if not os.path.exists(full_path): print(f" [SKIP] 文件不存在: {filepath}") return False with open(full_path, "r", encoding="utf-8") as f: sql = f.read() if not sql.strip(): print(f" [SKIP] 文件为空: {filepath}") return False try: cur = conn.cursor() cur.execute(sql) conn.commit() print(f" [OK] {label or filepath}") return True except Exception as e: conn.rollback() print(f" [FAIL] {label or filepath}: {e}") return False def init_etl_feiqiu(): """初始化 etl_feiqiu 数据库:六层 schema DDL + 种子数据""" print("\n=== 初始化 etl_feiqiu 数据库 ===") conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="etl_feiqiu" ) conn.autocommit = False # 六层 schema DDL(按依赖顺序) schema_files = [ ("db/etl_feiqiu/schemas/meta.sql", "meta schema(调度元数据)"), ("db/etl_feiqiu/schemas/ods.sql", "ods schema(原始数据)"), ("db/etl_feiqiu/schemas/dwd.sql", "dwd schema(明细数据)"), ("db/etl_feiqiu/schemas/core.sql", "core schema(跨门店标准化)"), ("db/etl_feiqiu/schemas/dws.sql", "dws schema(汇总数据)"), ("db/etl_feiqiu/schemas/app.sql", "app schema(RLS 视图层)"), ] success_count = 0 fail_count = 0 for filepath, label in schema_files: if execute_sql_file(conn, filepath, label): success_count += 1 else: fail_count += 1 # 种子数据 seed_files = [ ("db/etl_feiqiu/seeds/seed_ods_tasks.sql", "种子:ODS 任务"), ("db/etl_feiqiu/seeds/seed_scheduler_tasks.sql", "种子:调度任务"), ("db/etl_feiqiu/seeds/seed_dws_config.sql", "种子:DWS 配置"), ("db/etl_feiqiu/seeds/seed_index_parameters.sql", "种子:指数参数"), ] for filepath, label in seed_files: if execute_sql_file(conn, filepath, label): success_count += 1 else: fail_count += 1 conn.close() print(f"\netl_feiqiu 完成: {success_count} 成功, {fail_count} 失败") return fail_count == 0 def init_zqyy_app(): """初始化 zqyy_app 数据库:schema + 迁移 + 种子""" print("\n=== 初始化 zqyy_app 数据库 ===") conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="zqyy_app" ) conn.autocommit = False files = [ ("db/zqyy_app/schemas/init.sql", "zqyy_app schema(用户/RBAC)"), ("db/zqyy_app/migrations/20250715_create_admin_web_tables.sql", "迁移:admin_web 表"), ("db/zqyy_app/seeds/admin_web_seed.sql", "种子:默认管理员"), ] success_count = 0 fail_count = 0 for filepath, label in files: if execute_sql_file(conn, filepath, label): success_count += 1 else: fail_count += 1 conn.close() print(f"\nzqyy_app 完成: {success_count} 成功, {fail_count} 失败") return fail_count == 0 if __name__ == "__main__": print("开始初始化数据库...") r1 = init_etl_feiqiu() r2 = init_zqyy_app() if r1 and r2: print("\n✓ 全部初始化成功") else: print("\n✗ 部分初始化失败,请检查上方错误信息") sys.exit(1)