Files
Neo-ZQYY/scripts/ops/sync_ddl_after_migration.py

313 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
同步 DDL 文件:将 2026-02-20 迁移后的变更反映到 DDL 文件中。
目标文件:
- db/etl_feiqiu/schemas/dwd.sqlschema=dwd
- db/etl_feiqiu/schemas/schema_dwd_doc.sqlschema=billiards_dwd
- db/etl_feiqiu/schemas/dws.sqlschema=dws
- db/etl_feiqiu/schemas/schema_dws.sqlschema=billiards_dws
策略:对每个 DDL 文件做精确的文本替换/追加,而非全量重写。
"""
import os
import sys
import re
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
SCHEMAS_DIR = Path(__file__).resolve().parents[2] / "db" / "etl_feiqiu" / "schemas"
# ── 变更清单 ──────────────────────────────────────────────────────────
# 1. dim_table_exschema_dwd_doc.sql 缺少 14 列dwd.sql 已有)
DIM_TABLE_EX_OLD_COLS_DOC = """\
table_status INTEGER,
SCD2_start_time TIMESTAMPTZ DEFAULT now(),"""
DIM_TABLE_EX_NEW_COLS_DOC = """\
table_status INTEGER,
create_time TIMESTAMPTZ,
light_status INTEGER,
tablestatusname TEXT,
sitename TEXT,
applet_qr_code_url TEXT,
audit_status INTEGER,
charge_free INTEGER,
delay_lights_time INTEGER,
is_rest_area INTEGER,
only_allow_groupon INTEGER,
order_delay_time INTEGER,
self_table INTEGER,
temporary_light_second INTEGER,
virtual_table INTEGER,
SCD2_start_time TIMESTAMPTZ DEFAULT now(),"""
# 2. dim_assistant_ex两个文件都缺少 4 列
DIM_ASSISTANT_EX_OLD_COLS = """\
serial_number BIGINT,
SCD2_start_time TIMESTAMPTZ,"""
DIM_ASSISTANT_EX_NEW_COLS = """\
serial_number BIGINT,
system_role_id BIGINT,
job_num TEXT,
cx_unit_price NUMERIC(18,2),
pd_unit_price NUMERIC(18,2),
SCD2_start_time TIMESTAMPTZ,"""
# 3. DWD 新表定义(追加到文件末尾)
DWD_NEW_TABLES = """
-- =============================================================================
-- 2026-02-20 新增表
-- =============================================================================
CREATE TABLE IF NOT EXISTS dwd_goods_stock_summary (
site_goods_id BIGINT NOT NULL,
goods_name TEXT,
goods_unit TEXT,
goods_category_id BIGINT,
goods_category_second_id BIGINT,
category_name TEXT,
range_start_stock NUMERIC(18,4),
range_end_stock NUMERIC(18,4),
range_in NUMERIC(18,4),
range_out NUMERIC(18,4),
range_sale NUMERIC(18,4),
range_sale_money NUMERIC(18,2),
range_inventory NUMERIC(18,4),
current_stock NUMERIC(18,4),
site_id BIGINT,
tenant_id BIGINT,
fetched_at TIMESTAMPTZ,
PRIMARY KEY (site_goods_id, fetched_at)
);
COMMENT ON TABLE {schema}.dwd_goods_stock_summary IS '库存汇总明细表事实表。来源ods.goods_stock_summary。按时间窗口增量加载。';
CREATE TABLE IF NOT EXISTS dwd_goods_stock_movement (
site_goods_stock_id BIGINT NOT NULL,
tenant_id BIGINT,
site_id BIGINT,
site_goods_id BIGINT,
goods_name TEXT,
goods_category_id BIGINT,
goods_second_category_id BIGINT,
unit TEXT,
price NUMERIC(18,4),
stock_type INTEGER,
change_num NUMERIC(18,4),
start_num NUMERIC(18,4),
end_num NUMERIC(18,4),
change_num_a NUMERIC(18,4),
start_num_a NUMERIC(18,4),
end_num_a NUMERIC(18,4),
remark TEXT,
operator_name TEXT,
create_time TIMESTAMPTZ,
fetched_at TIMESTAMPTZ,
PRIMARY KEY (site_goods_stock_id)
);
COMMENT ON TABLE {schema}.dwd_goods_stock_movement IS '库存变动流水表事实表。来源ods.goods_stock_movements。按 create_time 增量加载。';
"""
# 4. DWS 新表定义(追加到文件末尾)
DWS_NEW_TABLES = """
-- =============================================================================
-- 2026-02-20 新增:库存汇总表(日/周/月)
-- =============================================================================
CREATE TABLE IF NOT EXISTS {schema}.dws_goods_stock_daily_summary (
site_id BIGINT NOT NULL,
tenant_id BIGINT,
stat_date DATE NOT NULL,
site_goods_id BIGINT NOT NULL,
goods_name TEXT,
goods_unit TEXT,
goods_category_id BIGINT,
goods_category_second_id BIGINT,
category_name TEXT,
range_start_stock NUMERIC,
range_end_stock NUMERIC,
range_in NUMERIC,
range_out NUMERIC,
range_sale NUMERIC,
range_sale_money NUMERIC(12,2),
range_inventory NUMERIC,
current_stock NUMERIC,
stat_period TEXT NOT NULL DEFAULT 'daily',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (site_id, stat_date, site_goods_id)
);
COMMENT ON TABLE {schema}.dws_goods_stock_daily_summary
IS '库存日度汇总:按门店+日期+商品汇总库存变动';
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_daily_date
ON {schema}.dws_goods_stock_daily_summary (stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_daily_goods
ON {schema}.dws_goods_stock_daily_summary (site_goods_id, stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_daily_site
ON {schema}.dws_goods_stock_daily_summary (site_id, stat_date);
CREATE TABLE IF NOT EXISTS {schema}.dws_goods_stock_weekly_summary (
site_id BIGINT NOT NULL,
tenant_id BIGINT,
stat_date DATE NOT NULL,
site_goods_id BIGINT NOT NULL,
goods_name TEXT,
goods_unit TEXT,
goods_category_id BIGINT,
goods_category_second_id BIGINT,
category_name TEXT,
range_start_stock NUMERIC,
range_end_stock NUMERIC,
range_in NUMERIC,
range_out NUMERIC,
range_sale NUMERIC,
range_sale_money NUMERIC(12,2),
range_inventory NUMERIC,
current_stock NUMERIC,
stat_period TEXT NOT NULL DEFAULT 'weekly',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (site_id, stat_date, site_goods_id)
);
COMMENT ON TABLE {schema}.dws_goods_stock_weekly_summary
IS '库存周度汇总:按门店+ISO周+商品汇总库存变动stat_date 为周一日期';
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_weekly_date
ON {schema}.dws_goods_stock_weekly_summary (stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_weekly_goods
ON {schema}.dws_goods_stock_weekly_summary (site_goods_id, stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_weekly_site
ON {schema}.dws_goods_stock_weekly_summary (site_id, stat_date);
CREATE TABLE IF NOT EXISTS {schema}.dws_goods_stock_monthly_summary (
site_id BIGINT NOT NULL,
tenant_id BIGINT,
stat_date DATE NOT NULL,
site_goods_id BIGINT NOT NULL,
goods_name TEXT,
goods_unit TEXT,
goods_category_id BIGINT,
goods_category_second_id BIGINT,
category_name TEXT,
range_start_stock NUMERIC,
range_end_stock NUMERIC,
range_in NUMERIC,
range_out NUMERIC,
range_sale NUMERIC,
range_sale_money NUMERIC(12,2),
range_inventory NUMERIC,
current_stock NUMERIC,
stat_period TEXT NOT NULL DEFAULT 'monthly',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (site_id, stat_date, site_goods_id)
);
COMMENT ON TABLE {schema}.dws_goods_stock_monthly_summary
IS '库存月度汇总:按门店+自然月+商品汇总库存变动stat_date 为月首日期';
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_monthly_date
ON {schema}.dws_goods_stock_monthly_summary (stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_monthly_goods
ON {schema}.dws_goods_stock_monthly_summary (site_goods_id, stat_date);
CREATE INDEX IF NOT EXISTS idx_dws_goods_stock_monthly_site
ON {schema}.dws_goods_stock_monthly_summary (site_id, stat_date);
"""
def patch_file(filepath: Path, old: str, new: str, label: str) -> bool:
"""在文件中替换文本"""
content = filepath.read_text(encoding="utf-8")
if old not in content:
print(f" ⚠️ {label}: 未找到匹配文本,跳过")
return False
if new in content:
print(f" ⏭️ {label}: 已包含新内容,跳过")
return True
content = content.replace(old, new, 1)
filepath.write_text(content, encoding="utf-8")
print(f"{label}")
return True
def append_if_missing(filepath: Path, marker: str, content: str, label: str) -> bool:
"""如果文件中不包含 marker则追加 content"""
text = filepath.read_text(encoding="utf-8")
if marker in text:
print(f" ⏭️ {label}: 已存在,跳过")
return True
text = text.rstrip() + "\n" + content
filepath.write_text(text, encoding="utf-8")
print(f"{label}")
return True
def main():
print("=" * 60)
print("同步 DDL 文件2026-02-20 迁移批次)")
print("=" * 60)
# ── 1. schema_dwd_doc.sqldim_table_ex 加 14 列 ──
doc_file = SCHEMAS_DIR / "schema_dwd_doc.sql"
print(f"\n[1] {doc_file.name}: dim_table_ex +14 列")
patch_file(doc_file, DIM_TABLE_EX_OLD_COLS_DOC, DIM_TABLE_EX_NEW_COLS_DOC,
"dim_table_ex 列定义")
# ── 2. schema_dwd_doc.sqldim_assistant_ex 加 4 列 ──
print(f"\n[2] {doc_file.name}: dim_assistant_ex +4 列")
patch_file(doc_file, DIM_ASSISTANT_EX_OLD_COLS, DIM_ASSISTANT_EX_NEW_COLS,
"dim_assistant_ex 列定义doc")
# ── 3. dwd.sqldim_assistant_ex 加 4 列 ──
dwd_file = SCHEMAS_DIR / "dwd.sql"
print(f"\n[3] {dwd_file.name}: dim_assistant_ex +4 列")
patch_file(dwd_file, DIM_ASSISTANT_EX_OLD_COLS, DIM_ASSISTANT_EX_NEW_COLS,
"dim_assistant_ex 列定义dwd")
# ── 4. dwd.sql追加新表 ──
print(f"\n[4] {dwd_file.name}: 追加 dwd_goods_stock_summary + dwd_goods_stock_movement")
append_if_missing(dwd_file, "dwd_goods_stock_summary",
DWD_NEW_TABLES.format(schema="dwd"),
"DWD 新表")
# ── 5. schema_dwd_doc.sql追加新表 ──
print(f"\n[5] {doc_file.name}: 追加 dwd_goods_stock_summary + dwd_goods_stock_movement")
append_if_missing(doc_file, "dwd_goods_stock_summary",
DWD_NEW_TABLES.format(schema="billiards_dwd"),
"DWD 新表doc")
# ── 6. dws.sql追加库存汇总表 ──
dws_file = SCHEMAS_DIR / "dws.sql"
print(f"\n[6] {dws_file.name}: 追加 3 张库存汇总表")
append_if_missing(dws_file, "dws_goods_stock_daily_summary",
DWS_NEW_TABLES.format(schema="dws"),
"DWS 库存汇总表")
# ── 7. schema_dws.sql追加库存汇总表 ──
dws_doc_file = SCHEMAS_DIR / "schema_dws.sql"
print(f"\n[7] {dws_doc_file.name}: 追加 3 张库存汇总表")
append_if_missing(dws_doc_file, "dws_goods_stock_daily_summary",
DWS_NEW_TABLES.format(schema="billiards_dws"),
"DWS 库存汇总表doc")
print("\n" + "=" * 60)
print("DDL 同步完成")
print("=" * 60)
if __name__ == "__main__":
main()