232 lines
8.1 KiB
Python
232 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
检查 DDL 文件与数据库实际结构的差异
|
|
"""
|
|
import os
import re
from pathlib import Path

import psycopg2
|
|
|
|
# Connection string for the target PostgreSQL instance.
# NOTE(review): credentials were hard-coded here; prefer setting DATABASE_DSN
# in the environment — the literal is kept only as a backward-compatible
# fallback and should be rotated/removed.
DSN = os.environ.get(
    'DATABASE_DSN',
    'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test',
)
|
|
|
|
def get_db_columns(conn, schema):
    """Fetch every table and column of *schema* from information_schema.

    Args:
        conn: open psycopg2 connection (anything exposing ``cursor()`` as a
            context manager with ``execute``/``fetchall`` works).
        schema: schema name to inspect, e.g. ``'billiards_ods'``.

    Returns:
        dict mapping table_name -> {column_name (lower-cased): type string},
        with type strings normalized to the spelling used in the DDL files
        (``VARCHAR(n)``, ``NUMERIC(p,s)``, ``TIMESTAMPTZ``, ...).
    """
    # Fix: the original query also selected is_nullable but never used it.
    sql = """
        SELECT table_name, column_name, data_type,
               character_maximum_length, numeric_precision, numeric_scale
        FROM information_schema.columns
        WHERE table_schema = %s
        ORDER BY table_name, ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(sql, (schema,))
        rows = cur.fetchall()

    # information_schema spelling -> DDL spelling for types that carry no
    # length/precision suffix.
    _SIMPLE_TYPES = {
        'integer': 'INTEGER',
        'bigint': 'BIGINT',
        'smallint': 'SMALLINT',
        'boolean': 'BOOLEAN',
        'text': 'TEXT',
        'jsonb': 'JSONB',
        'json': 'JSON',
        'date': 'DATE',
        'timestamp with time zone': 'TIMESTAMPTZ',
        'timestamp without time zone': 'TIMESTAMP',
    }

    tables = {}
    for table_name, col_name, data_type, char_len, num_prec, num_scale in rows:
        # Build the normalized type string.
        if data_type == 'character varying':
            type_str = f'VARCHAR({char_len})' if char_len else 'VARCHAR'
        elif data_type == 'numeric':
            type_str = f'NUMERIC({num_prec},{num_scale})' if num_prec else 'NUMERIC'
        else:
            # Unknown types fall back to the upper-cased name (e.g. UUID).
            type_str = _SIMPLE_TYPES.get(data_type, data_type.upper())

        tables.setdefault(table_name, {})[col_name.lower()] = type_str

    return tables
|
|
|
|
def parse_ddl_file(filepath, default_schema=None):
    """Parse a DDL file and extract table/column definitions.

    Args:
        filepath: path to a .sql file containing CREATE TABLE statements.
        default_schema: accepted for backward compatibility only. The result
            is keyed by bare table name, so the schema qualifier has no
            effect. (Fix: the original computed a ``schema`` local from this
            value and the qualified name but never read it.)

    Returns:
        dict mapping table_name -> {column_name (lower-cased): TYPE string},
        with ``INT`` normalized to ``INTEGER``.
    """
    content = Path(filepath).read_text(encoding='utf-8')

    tables = {}

    # Handles all of:
    #   CREATE TABLE schema.table (...);
    #   CREATE TABLE IF NOT EXISTS schema.table (...);
    #   CREATE TABLE [IF NOT EXISTS] table (...);
    table_pattern = re.compile(
        r'CREATE TABLE\s+(?:IF NOT EXISTS\s+)?(?:(\w+)\.)?(\w+)\s*\((.*?)\);',
        re.DOTALL | re.IGNORECASE
    )

    for match in table_pattern.finditer(content):
        table_name = match.group(2)
        columns_block = match.group(3)

        columns = {}
        for line in columns_block.split('\n'):
            line = line.strip()
            if not line or line.startswith('--'):
                continue
            # Skip table-level constraint lines.
            if line.upper().startswith(('PRIMARY KEY', 'CONSTRAINT', 'UNIQUE',
                                        'FOREIGN KEY', 'CHECK', 'EXCLUDE')):
                continue

            # Column definition: column_name TYPE[(args)] ...
            col_match = re.match(r'^(\w+)\s+(\w+(?:\s*\([^)]+\))?)', line)
            if col_match:
                col_name = col_match.group(1).lower()
                col_type = col_match.group(2).upper().replace(' ', '')
                # Normalize aliases so the spelling matches the DB side.
                if col_type == 'INT':
                    col_type = 'INTEGER'
                columns[col_name] = col_type

        tables[table_name] = columns

    return tables
|
|
|
|
def compare_schemas(db_tables, ddl_tables, schema_name):
    """Diff the live database structure against the DDL definitions.

    Args:
        db_tables: mapping table -> {column: type} from the database.
        ddl_tables: mapping table -> {column: type} parsed from DDL files.
        schema_name: schema prefix used when formatting difference entries.

    Returns:
        dict with lists under 'db_only_tables', 'ddl_only_tables',
        'db_only_cols', 'ddl_only_cols' and 'type_diff'.
        NOTE(review): 'type_diff' is part of the result shape but is never
        populated — column types are not actually compared yet.
    """
    differences = {
        # Tables present in the DB but absent from the DDL, and vice versa.
        'db_only_tables': [f"{schema_name}.{t}"
                           for t in db_tables if t not in ddl_tables],
        'ddl_only_tables': [f"{schema_name}.{t}"
                            for t in ddl_tables if t not in db_tables],
        'db_only_cols': [],
        'ddl_only_cols': [],
        'type_diff': [],
    }

    # Column-level differences for tables known to both sides.
    for table in set(db_tables.keys()) & set(ddl_tables.keys()):
        db_cols = db_tables[table]
        ddl_cols = ddl_tables[table]

        differences['ddl_only_cols'].extend(
            f"{schema_name}.{table}.{col} ({ddl_cols[col]})"
            for col in ddl_cols if col not in db_cols
        )
        differences['db_only_cols'].extend(
            f"{schema_name}.{table}.{col} ({db_cols[col]})"
            for col in db_cols if col not in ddl_cols
        )

    return differences
|
|
|
|
def _print_section(entries, title):
    """Print a titled, sorted list of difference entries (no-op when empty)."""
    if not entries:
        return
    print(f"\n {title}")
    for entry in sorted(entries):
        print(f" - {entry}")


def _check_layer(conn, ddl_file, schema_name):
    """Compare one schema layer against its DDL file and print the report.

    Args:
        conn: open psycopg2 connection.
        ddl_file: Path to the layer's DDL file; silently skipped if missing.
        schema_name: database schema to inspect (e.g. 'billiards_ods').
    """
    print(f"\n### {schema_name} ###\n")
    if not ddl_file.exists():
        return

    db_tables = get_db_columns(conn, schema_name)
    ddl_tables = parse_ddl_file(ddl_file, schema_name)

    print(f"DB tables: {len(db_tables)}")
    print(f"DDL tables: {len(ddl_tables)}")

    diff = compare_schemas(db_tables, ddl_tables, schema_name)

    total_diff = sum(len(v) for v in diff.values())
    if total_diff == 0:
        print("\nNo differences found.")
        return

    print(f"\nFound {total_diff} differences:")
    _print_section(diff['db_only_tables'], "[DB has, DDL missing] Tables:")
    _print_section(diff['ddl_only_tables'], "[DDL has, DB missing] Tables:")
    _print_section(diff['db_only_cols'], "[DB has, DDL missing] Columns:")
    _print_section(diff['ddl_only_cols'], "[DDL has, DB missing] Columns:")


def main():
    """Compare each layer's DDL file against the live database and report.

    Fix: the original duplicated ~50 lines of identical reporting code for
    the ODS and DWD layers, and leaked the connection if any step raised.
    """
    conn = psycopg2.connect(DSN)
    try:
        base_dir = Path(__file__).parent.parent / 'etl_billiards' / 'database'

        print("=" * 80)
        print("DDL vs DB Structure Comparison")
        print("=" * 80)

        # One call per schema layer; file names preserved from the original.
        _check_layer(conn, base_dir / 'schema_ODS_doc.sql', 'billiards_ods')
        _check_layer(conn, base_dir / 'schema_dwd_doc.sql', 'billiards_dwd')
    finally:
        conn.close()

    print("\n" + "=" * 80)


if __name__ == '__main__':
    main()
|