Files
feiqiu-ETL/tmp/check_ddl_vs_db.py
2026-02-04 21:39:01 +08:00

232 lines
8.1 KiB
Python

# -*- coding: utf-8 -*-
"""
Check the differences between DDL files and the actual database structure.
"""
import psycopg2
import re
from pathlib import Path
# NOTE(review): hard-coded credentials checked into source — consider loading
# the DSN from an environment variable (e.g. os.environ["DB_DSN"]) instead.
DSN = 'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test'
# PostgreSQL information_schema data_type spellings -> canonical short names
# used throughout the comparison. Parameterized types (varchar/numeric) are
# handled separately because they carry length/precision.
_SIMPLE_TYPES = {
    'integer': 'INTEGER',
    'bigint': 'BIGINT',
    'smallint': 'SMALLINT',
    'boolean': 'BOOLEAN',
    'text': 'TEXT',
    'jsonb': 'JSONB',
    'json': 'JSON',
    'date': 'DATE',
    'timestamp with time zone': 'TIMESTAMPTZ',
    'timestamp without time zone': 'TIMESTAMP',
}


def get_db_columns(conn, schema):
    """Fetch all tables and columns of *schema* from the live database.

    Args:
        conn: an open DB-API connection (psycopg2-style, cursor() must be a
            context manager and execute() must accept %s parameters).
        schema: schema name to inspect (compared against table_schema).

    Returns:
        dict mapping table_name -> {lower-cased column_name: type string},
        with types normalized to DDL-style spellings such as VARCHAR(50),
        NUMERIC(10,2), TIMESTAMPTZ.
    """
    # NOTE: the original query also selected is_nullable, but it was never
    # read — dropped from the projection.
    sql = """
        SELECT table_name, column_name, data_type,
               character_maximum_length, numeric_precision, numeric_scale
        FROM information_schema.columns
        WHERE table_schema = %s
        ORDER BY table_name, ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(sql, (schema,))
        rows = cur.fetchall()

    tables = {}
    for table_name, column_name, data_type, char_len, num_prec, num_scale in rows:
        if data_type == 'character varying':
            # character_maximum_length is NULL for unbounded VARCHAR.
            type_str = f'VARCHAR({char_len})' if char_len else 'VARCHAR'
        elif data_type == 'numeric':
            # numeric_precision is NULL for unconstrained NUMERIC.
            type_str = f'NUMERIC({num_prec},{num_scale})' if num_prec else 'NUMERIC'
        else:
            # Fall back to the upper-cased raw name for anything unmapped.
            type_str = _SIMPLE_TYPES.get(data_type, data_type.upper())
        tables.setdefault(table_name, {})[column_name.lower()] = type_str
    return tables
# CREATE TABLE [IF NOT EXISTS] [schema.]table ( ... );
_TABLE_RE = re.compile(
    r'CREATE TABLE\s+(?:IF NOT EXISTS\s+)?(?:(\w+)\.)?(\w+)\s*\((.*?)\);',
    re.DOTALL | re.IGNORECASE,
)
# Column definition at the start of a line: name then a type that may carry a
# parenthesized argument list, e.g. "amount NUMERIC(10, 2)".
_COLUMN_RE = re.compile(r'^(\w+)\s+(\w+(?:\s*\([^)]+\))?)')
# Line prefixes that mark table-level constraints rather than columns.
_CONSTRAINT_PREFIXES = (
    'PRIMARY KEY', 'CONSTRAINT', 'UNIQUE', 'FOREIGN KEY', 'CHECK', 'EXCLUDE',
)


def parse_ddl_file(filepath, default_schema=None):
    """Parse a DDL file and extract table and column definitions.

    Args:
        filepath: path to a UTF-8 SQL file containing CREATE TABLE statements.
        default_schema: kept for interface compatibility; currently unused
            because the result is keyed by bare table name (see Notes).

    Returns:
        dict mapping table_name -> {lower-cased column_name: TYPE string}.
        Types are upper-cased with internal spaces removed ("NUMERIC(10,2)"),
        and INT is normalized to INTEGER to match information_schema output.

    Notes:
        Result keys ignore the schema qualifier, so identically named tables
        in different schemas within one file would collide (last one wins).
    """
    content = Path(filepath).read_text(encoding='utf-8')
    tables = {}
    for match in _TABLE_RE.finditer(content):
        table_name = match.group(2)
        columns = {}
        for raw_line in match.group(3).split('\n'):
            line = raw_line.strip()
            if not line or line.startswith('--'):
                continue  # blank line or SQL comment
            if line.upper().startswith(_CONSTRAINT_PREFIXES):
                continue  # table-level constraint, not a column
            col_match = _COLUMN_RE.match(line)
            if not col_match:
                continue
            col_name = col_match.group(1).lower()
            col_type = col_match.group(2).upper().replace(' ', '')
            if col_type == 'INT':
                col_type = 'INTEGER'
            columns[col_name] = col_type
        tables[table_name] = columns
    return tables
def compare_schemas(db_tables, ddl_tables, schema_name):
    """Diff the database structure against the DDL-declared structure.

    Args:
        db_tables: {table: {column: type}} as returned by get_db_columns.
        ddl_tables: {table: {column: type}} as returned by parse_ddl_file.
        schema_name: schema prefix used to qualify names in the output.

    Returns:
        dict with lists of fully qualified names:
          db_only_tables / ddl_only_tables — tables present on one side only
          db_only_cols / ddl_only_cols     — columns present on one side only
          type_diff                        — reserved; nothing populates it here
    """
    diffs = {
        'db_only_tables': [
            f"{schema_name}.{t}" for t in db_tables if t not in ddl_tables
        ],
        'ddl_only_tables': [
            f"{schema_name}.{t}" for t in ddl_tables if t not in db_tables
        ],
        'db_only_cols': [],
        'ddl_only_cols': [],
        # NOTE(review): this key is declared but never filled anywhere in
        # this module — kept to preserve the result shape for callers.
        'type_diff': [],
    }

    # Column-level diff for tables that exist on both sides.
    for table in set(db_tables) & set(ddl_tables):
        db_cols = db_tables[table]
        ddl_cols = ddl_tables[table]
        diffs['ddl_only_cols'].extend(
            f"{schema_name}.{table}.{col} ({ddl_cols[col]})"
            for col in ddl_cols if col not in db_cols
        )
        diffs['db_only_cols'].extend(
            f"{schema_name}.{table}.{col} ({db_cols[col]})"
            for col in db_cols if col not in ddl_cols
        )
    return diffs
def _print_section(items, title):
    """Print one difference section (sorted) if it is non-empty."""
    if items:
        print(f"\n {title}:")
        for item in sorted(items):
            print(f" - {item}")


def _report_schema(conn, ddl_file, schema_name):
    """Compare one schema against its DDL file and print the report.

    Silently skips the comparison (after printing the header) when the DDL
    file does not exist, matching the original behavior.
    """
    print(f"\n### {schema_name} ###\n")
    if not ddl_file.exists():
        return
    db_tables = get_db_columns(conn, schema_name)
    ddl_tables = parse_ddl_file(ddl_file, schema_name)
    print(f"DB tables: {len(db_tables)}")
    print(f"DDL tables: {len(ddl_tables)}")
    diff = compare_schemas(db_tables, ddl_tables, schema_name)
    total_diff = sum(len(v) for v in diff.values())
    if total_diff == 0:
        print("\nNo differences found.")
        return
    print(f"\nFound {total_diff} differences:")
    _print_section(diff['db_only_tables'], "[DB has, DDL missing] Tables")
    _print_section(diff['ddl_only_tables'], "[DDL has, DB missing] Tables")
    _print_section(diff['db_only_cols'], "[DB has, DDL missing] Columns")
    _print_section(diff['ddl_only_cols'], "[DDL has, DB missing] Columns")


def main():
    """Entry point: compare ODS and DWD schemas against their DDL files."""
    conn = psycopg2.connect(DSN)
    try:
        base_dir = Path(__file__).parent.parent / 'etl_billiards' / 'database'
        print("=" * 80)
        print("DDL vs DB Structure Comparison")
        print("=" * 80)
        _report_schema(conn, base_dir / 'schema_ODS_doc.sql', 'billiards_ods')
        _report_schema(conn, base_dir / 'schema_dwd_doc.sql', 'billiards_dwd')
    finally:
        # Close the connection even if a comparison raises (the original
        # leaked it on error).
        conn.close()
    print("\n" + "=" * 80)


if __name__ == '__main__':
    main()