Files
Neo-ZQYY/tools/db/setup_fdw_both.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

84 lines
2.5 KiB
Python

# -*- coding: utf-8 -*-
"""
在 zqyy_app 和 test_zqyy_app 中执行 FDW 配置。
- zqyy_app -> setup_fdw.sql (指向 etl_feiqiu)
- test_zqyy_app -> setup_fdw_test.sql (指向 test_etl_feiqiu)
"""
import os
import psycopg2
CONN = dict(host="100.64.0.4", port=5432, user="local-Python", password="Neo-local-1991125")
BASE = r"C:\Project\NeoZQYY"
# 实际密码替换占位符 '***'
APP_READER_PWD = "AppR3ad_2026!"
TARGETS = [
("zqyy_app", os.path.join(BASE, "db", "fdw", "setup_fdw.sql")),
("test_zqyy_app", os.path.join(BASE, "db", "fdw", "setup_fdw_test.sql")),
]
for dbname, sql_path in TARGETS:
print(f"\n{'='*60}")
print(f"执行 FDW 配置: {dbname} <- {os.path.basename(sql_path)}")
print(f"{'='*60}")
sql = open(sql_path, encoding="utf-8").read()
# 替换密码占位符
sql = sql.replace("password '***'", f"password '{APP_READER_PWD}'")
conn = psycopg2.connect(**CONN, dbname=dbname)
conn.autocommit = True
cur = conn.cursor()
# 逐条执行(按分号拆分,跳过注释和空行)
statements = []
current = []
for line in sql.split("\n"):
stripped = line.strip()
if stripped.startswith("--") or not stripped:
continue
current.append(line)
if stripped.endswith(";"):
statements.append("\n".join(current))
current = []
success = 0
skip = 0
fail = 0
for stmt in statements:
try:
cur.execute(stmt)
first_line = stmt.strip().split("\n")[0][:80]
print(f" [OK] {first_line}")
success += 1
except psycopg2.errors.DuplicateObject as e:
conn.rollback()
print(f" [SKIP] 已存在: {str(e).strip().split(chr(10))[0]}")
skip += 1
except Exception as e:
conn.rollback()
print(f" [FAIL] {str(e).strip().split(chr(10))[0]}")
print(f" SQL: {stmt[:100]}")
fail += 1
# 验证
cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'postgres_fdw'")
fdw_ext = cur.fetchone() is not None
cur.execute("SELECT srvname FROM pg_foreign_server")
servers = [r[0] for r in cur.fetchall()]
cur.execute(
"SELECT count(*) FROM information_schema.tables "
"WHERE table_schema = 'fdw_etl'"
)
fdw_tables = cur.fetchone()[0]
print(f"\n 结果: {success} OK, {skip} SKIP, {fail} FAIL")
print(f" 验证: fdw扩展={fdw_ext}, servers={servers}, fdw_etl表数={fdw_tables}")
conn.close()
print("\n完成!")