Neo-ZQYY/tools/db/gen_consolidated_ddl.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 cumulative change baseline (multi-track merge)
Track 1: rns1-customer-coach-api + 04-miniapp-core-business backend implementation
  - New lightweight endpoint GET /xcx/coaches/{id}/banner
  - performance/records gains a coach_id parameter + view_board_coach permission routing
  - Service-layer refactor of coach/customer/performance/board/task
  - fdw_queries settlement-slip-grain aggregation + unified consumption_summary view
  - task_generator: 72h follow-up grace period + UPSERT replacement strategy + Step 5 fallback cleanup
  - recall_detector: double restriction on settle_type=3 + store-level resolved

Track 2: mini-program permission routing + new coach-service-records performance-detail page (manager view)
  - perf-progress shared module deduplicates the task-list/coach-detail animation logic
  - End-to-end isScattered walk-in-customer flag
  - Pass-through of the foodDetail/phoneFull/creator* fields

Track 3: P19 index backtesting framework, Phase 1+2
  - Daily stat_date snapshot mode for the 3 index tables
  - New DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION utility tasks
  - task_engine upgraded to dual mode: real-time HTTP + simulation backtesting

Track 4: Core dimension layer enabled
  - New CORE_DIM_SYNC task (DWD → 4 core dimension tables)
  - Fixed the empty-query issue in the app views

Track 5: member_project_tag switched to a LAST_30_VISITS consumption-count window

Track 6: 2 migration SQL scripts already executed (stat_date + new member_project_tag window)
  - Schema baseline synced with the DDL snapshots

Track 7: dev-machine path migration C:\NeoZQYY → C:\Project\NeoZQYY (roughly 95% of the diff volume)

Also included: new ops scripts (churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots) + tools/task-analysis/ task-analysis utilities

157 files in total. Intermediate artifacts not included (tmp/, .playwright-mcp/, inspect-*, excel/sheet analysis txt files).
Audit records are in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

"""
从测试数据库导出完整 DDL按 schema 分文件写入 docs/database/ddl/。
以数据库现状为准,整合所有 schema/表/约束/索引/视图/物化视图/序列/FDW 配置。
输出文件:
docs/database/ddl/etl_feiqiu__meta.sql
docs/database/ddl/etl_feiqiu__ods.sql
docs/database/ddl/etl_feiqiu__dwd.sql
docs/database/ddl/etl_feiqiu__core.sql
docs/database/ddl/etl_feiqiu__dws.sql
docs/database/ddl/etl_feiqiu__app.sql
docs/database/ddl/zqyy_app__public.sql
docs/database/ddl/zqyy_app__auth.sql
docs/database/ddl/zqyy_app__biz.sql
docs/database/ddl/fdw.sql
用法cd C:\\Project\\NeoZQYY && python scripts/ops/gen_consolidated_ddl.py
"""
import os, sys
from pathlib import Path
from datetime import date
import psycopg2

# ── Environment ───────────────────────────────────────────────────────────
from dotenv import load_dotenv

ROOT = Path(__file__).resolve().parent.parent.parent
load_dotenv(ROOT / ".env")
ETL_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
APP_DSN = os.environ.get("TEST_APP_DB_DSN") or os.environ.get("APP_DB_DSN")
if not ETL_DSN:
    sys.exit("ERROR: TEST_DB_DSN / PG_DSN is not configured")
if not APP_DSN:
    sys.exit("ERROR: TEST_APP_DB_DSN / APP_DB_DSN is not configured")
OUTPUT_DIR = ROOT / "docs" / "database" / "ddl"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
FDW_FILE = ROOT / "db" / "fdw" / "setup_fdw.sql"
TODAY = date.today().isoformat()
# ── SQL templates ─────────────────────────────────────────────────────────
SQL_TABLES = """
WITH cols AS (
    SELECT table_schema, table_name,
           string_agg(
               format(E'    %%I %%s%%s%%s',
                      column_name,
                      CASE WHEN data_type = 'USER-DEFINED' THEN udt_name
                           WHEN data_type = 'ARRAY' THEN udt_name
                           WHEN character_maximum_length IS NOT NULL THEN data_type || '(' || character_maximum_length || ')'
                           WHEN numeric_precision IS NOT NULL AND data_type IN ('numeric','decimal') THEN data_type || '(' || numeric_precision || ',' || numeric_scale || ')'
                           ELSE data_type END,
                      CASE WHEN column_default IS NOT NULL THEN ' DEFAULT ' || column_default ELSE '' END,
                      CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END
               ), E',\\n' ORDER BY ordinal_position
           ) as col_defs
    FROM information_schema.columns
    WHERE table_schema = %s
      AND table_name IN (SELECT table_name FROM information_schema.tables
                         WHERE table_schema = %s AND table_type = 'BASE TABLE')
    GROUP BY table_schema, table_name
)
SELECT format(E'CREATE TABLE %%I.%%I (\\n%%s\\n);', table_schema, table_name, col_defs) as ddl
FROM cols ORDER BY table_name;
"""
SQL_CONSTRAINTS = """
SELECT n.nspname as schema, conrelid::regclass as tbl, conname,
       pg_get_constraintdef(c.oid) as def, contype
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE n.nspname = %s AND contype IN ('p','u','f')
ORDER BY conrelid::regclass::text, contype, conname;
"""
SQL_INDEXES = """
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = %s
  AND indexname NOT IN (SELECT conname FROM pg_constraint WHERE contype IN ('p','u'))
ORDER BY tablename, indexname;
"""
SQL_SEQUENCES = """
SELECT sequence_name, data_type
FROM information_schema.sequences
WHERE sequence_schema = %s
ORDER BY sequence_name;
"""
SQL_VIEWS = """
SELECT viewname, definition
FROM pg_views
WHERE schemaname = %s
ORDER BY viewname;
"""
SQL_MATVIEWS = """
SELECT matviewname, definition
FROM pg_matviews
WHERE schemaname = %s
ORDER BY matviewname;
"""
SQL_MV_INDEXES = """
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = %s
  AND tablename LIKE 'mv_%%'
ORDER BY tablename, indexname;
"""
SQL_TABLE_COUNT = """
SELECT count(*) FROM information_schema.tables
WHERE table_schema = %s AND table_type = 'BASE TABLE';
"""
# ── Helpers ───────────────────────────────────────────────────────────────
def query(conn, sql, params=None):
    with conn.cursor() as cur:
        cur.execute(sql, params)
        return cur.fetchall()

def section(f, title, level=1):
    sep = "=" * 77 if level == 1 else "-" * 77
    f.write(f"\n-- {sep}\n-- {title}\n-- {sep}\n\n")
def write_sequences(f, conn, schema):
    rows = query(conn, SQL_SEQUENCES, (schema,))
    if not rows:
        return
    f.write("-- Sequences\n")
    for name, dtype in rows:
        f.write(f"CREATE SEQUENCE IF NOT EXISTS {schema}.{name} AS {dtype};\n")
    f.write("\n")

def write_tables(f, conn, schema):
    rows = query(conn, SQL_TABLES, (schema, schema))
    if not rows:
        return
    f.write("-- Tables\n")
    for (ddl,) in rows:
        f.write(ddl + "\n\n")

def write_constraints(f, conn, schema):
    rows = query(conn, SQL_CONSTRAINTS, (schema,))
    if not rows:
        return
    f.write("-- Constraints (primary key / unique / foreign key)\n")
    for _, tbl, conname, condef, _ in rows:
        f.write(f"ALTER TABLE {tbl} ADD CONSTRAINT {conname} {condef};\n")
    f.write("\n")

def write_indexes(f, conn, schema):
    rows = query(conn, SQL_INDEXES, (schema,))
    if not rows:
        return
    f.write("-- Indexes\n")
    for _, indexdef in rows:
        f.write(indexdef + ";\n")
    f.write("\n")

def write_views(f, conn, schema):
    rows = query(conn, SQL_VIEWS, (schema,))
    if not rows:
        return
    f.write("-- Views\n")
    for vname, vdef in rows:
        f.write(f"CREATE OR REPLACE VIEW {schema}.{vname} AS\n{vdef.strip()}\n;\n\n")

def write_matviews(f, conn, schema):
    rows = query(conn, SQL_MATVIEWS, (schema,))
    if not rows:
        return
    f.write("-- Materialized views\n")
    for mvname, mvdef in rows:
        f.write(f"CREATE MATERIALIZED VIEW {schema}.{mvname} AS\n{mvdef.strip()}\n;\n\n")
    # Materialized-view indexes
    idx_rows = query(conn, SQL_MV_INDEXES, (schema,))
    if idx_rows:
        f.write("-- Materialized-view indexes\n")
        for _, indexdef in idx_rows:
            f.write(indexdef + ";\n")
        f.write("\n")
def write_schema_file(conn, db_name, schema, label, views_only=False):
    """Generate a standalone DDL file for a single schema."""
    filename = f"{db_name}__{schema}.sql"
    filepath = OUTPUT_DIR / filename
    # Table count for the summary line
    table_count = query(conn, SQL_TABLE_COUNT, (schema,))[0][0]
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"""\
-- =============================================================================
-- {db_name} / {schema} {label}
-- Generated: {TODAY}
-- Source: test database (auto-exported by this script)
-- =============================================================================

CREATE SCHEMA IF NOT EXISTS {schema};

""")
        if views_only:
            write_views(f, conn, schema)
        else:
            write_sequences(f, conn, schema)
            write_tables(f, conn, schema)
            write_constraints(f, conn, schema)
            write_indexes(f, conn, schema)
            write_views(f, conn, schema)
            write_matviews(f, conn, schema)
    size_kb = filepath.stat().st_size / 1024
    obj_desc = "views only" if views_only else f"{table_count} tables"
    print(f"{filename:<35s} {size_kb:>6.1f} KB ({obj_desc})")
    return filepath
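
# For reference, a call like write_schema_file(etl_conn, "etl_feiqiu", "dws",
# "aggregate data layer") produces docs/database/ddl/etl_feiqiu__dws.sql and
# prints a one-line size / table-count summary.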
def write_fdw_file():
    """Emit the FDW configuration file."""
    filepath = OUTPUT_DIR / "fdw.sql"
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"""\
-- =============================================================================
-- FDW cross-database mapping (execute inside zqyy_app)
-- Generated: {TODAY}
-- Source: db/fdw/setup_fdw.sql
-- =============================================================================

""")
        if FDW_FILE.exists():
            f.write(FDW_FILE.read_text(encoding="utf-8"))
            f.write("\n")
        else:
            f.write("-- FDW config file not found: db/fdw/setup_fdw.sql\n")
    size_kb = filepath.stat().st_size / 1024
    print(f"{'fdw.sql':<35s} {size_kb:>6.1f} KB")
    return filepath
# ── Main ──────────────────────────────────────────────────────────────────
def main():
    etl_conn = psycopg2.connect(ETL_DSN)
    app_conn = psycopg2.connect(APP_DSN)
    print(f"Output directory: {OUTPUT_DIR}\n")

    # etl_feiqiu: six schema layers
    write_schema_file(etl_conn, "etl_feiqiu", "meta", "ETL scheduling metadata")
    write_schema_file(etl_conn, "etl_feiqiu", "ods", "raw data layer")
    write_schema_file(etl_conn, "etl_feiqiu", "dwd", "detail data layer")
    write_schema_file(etl_conn, "etl_feiqiu", "core", "standardized cross-store dimensions/facts")
    write_schema_file(etl_conn, "etl_feiqiu", "dws", "aggregate data layer")
    write_schema_file(etl_conn, "etl_feiqiu", "app", "RLS view layer", views_only=True)

    # zqyy_app
    write_schema_file(app_conn, "zqyy_app", "public", "mini-program business tables")
    write_schema_file(app_conn, "zqyy_app", "auth", "user authentication & permissions")
    write_schema_file(app_conn, "zqyy_app", "biz", "core business tables (tasks/notes/triggers)")

    # FDW
    write_fdw_file()

    etl_conn.close()
    app_conn.close()

    # Drop the old single consolidated file
    old_file = ROOT / "docs" / "database" / "consolidated_ddl.sql"
    if old_file.exists():
        old_file.unlink()
        print(f"\n🗑️ Removed old file: {old_file.name}")

    print("\n✅ Done: 10 files in total")

if __name__ == "__main__":
    main()