181 lines
5.8 KiB
Python
181 lines
5.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""自动修复 bd_manual 文档中的类型不匹配问题"""
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
def fix_type_in_doc(doc_path, type_mismatches):
|
|
"""修复文档中的类型不匹配"""
|
|
if not Path(doc_path).exists():
|
|
print(f" SKIP: {doc_path} not found")
|
|
return False
|
|
|
|
content = Path(doc_path).read_text(encoding='utf-8')
|
|
original = content
|
|
|
|
for m in type_mismatches:
|
|
col_name = m['column']
|
|
old_type = m['doc_type']
|
|
new_type = m['db_type']
|
|
|
|
# 匹配字段行并替换类型
|
|
# 格式: | 序号 | 字段名 | 类型 | 可空 | ...
|
|
pattern = rf'(\|\s*\d+\s*\|\s*{col_name}\s*\|\s*){re.escape(old_type)}(\s*\|)'
|
|
replacement = rf'\g<1>{new_type}\g<2>'
|
|
content, count = re.subn(pattern, replacement, content)
|
|
|
|
if count > 0:
|
|
print(f" Fixed: {col_name}: {old_type} -> {new_type}")
|
|
else:
|
|
# 尝试更宽松的匹配
|
|
pattern2 = rf'(\|\s*{col_name}\s*\|\s*){re.escape(old_type)}(\s*\|)'
|
|
content, count = re.subn(pattern2, replacement.replace(r'\g<1>', r'\1').replace(r'\g<2>', r'\2'), content)
|
|
if count > 0:
|
|
print(f" Fixed (alt): {col_name}: {old_type} -> {new_type}")
|
|
else:
|
|
print(f" WARN: Could not fix {col_name}")
|
|
|
|
if content != original:
|
|
Path(doc_path).write_text(content, encoding='utf-8')
|
|
return True
|
|
return False
|
|
|
|
def add_missing_field(doc_path, table_name, field_name, db_schema):
|
|
"""向文档中添加缺失的字段"""
|
|
if not Path(doc_path).exists():
|
|
return False
|
|
|
|
# 从 db_schema 获取字段信息
|
|
field_info = None
|
|
for col in db_schema.get(table_name, []):
|
|
if col['column'] == field_name:
|
|
field_info = col
|
|
break
|
|
|
|
if not field_info:
|
|
print(f" WARN: Could not find {field_name} in db_schema")
|
|
return False
|
|
|
|
content = Path(doc_path).read_text(encoding='utf-8')
|
|
|
|
# 找到字段表格的最后一行,在其后添加新字段
|
|
# 格式: | 序号 | 字段名 | 类型 | 可空 | 主键 | 说明 |
|
|
lines = content.split('\n')
|
|
insert_idx = None
|
|
last_seq = 0
|
|
|
|
for i, line in enumerate(lines):
|
|
# 匹配字段行
|
|
match = re.match(r'\|\s*(\d+)\s*\|\s*(\w+)\s*\|', line)
|
|
if match:
|
|
seq = int(match.group(1))
|
|
if seq > last_seq:
|
|
last_seq = seq
|
|
insert_idx = i
|
|
|
|
if insert_idx is not None:
|
|
new_seq = last_seq + 1
|
|
nullable = 'YES' if field_info['nullable'] == 'YES' else 'NO'
|
|
new_line = f"| {new_seq} | {field_name} | {field_info['type']} | {nullable} | | 调整时间 |"
|
|
lines.insert(insert_idx + 1, new_line)
|
|
|
|
Path(doc_path).write_text('\n'.join(lines), encoding='utf-8')
|
|
print(f" Added: {field_name} (type: {field_info['type']})")
|
|
return True
|
|
|
|
return False
|
|
|
|
def main():
|
|
# 加载差异数据
|
|
with open('tmp/bd_manual_diff.json', 'r', encoding='utf-8') as f:
|
|
diffs = json.load(f)
|
|
|
|
# 加载数据库 schema (需要重新获取带详细信息的)
|
|
import psycopg2
|
|
DSN = 'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test'
|
|
|
|
conn = psycopg2.connect(DSN)
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT table_name, column_name, data_type, is_nullable,
|
|
COALESCE(character_maximum_length, numeric_precision) as max_length,
|
|
numeric_scale
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'billiards_dwd'
|
|
ORDER BY table_name, ordinal_position
|
|
""")
|
|
|
|
db_schema = {}
|
|
TYPE_MAP = {
|
|
'bigint': 'BIGINT',
|
|
'integer': 'INTEGER',
|
|
'smallint': 'SMALLINT',
|
|
'numeric': 'NUMERIC',
|
|
'text': 'TEXT',
|
|
'character varying': 'VARCHAR',
|
|
'boolean': 'BOOLEAN',
|
|
'timestamp with time zone': 'TIMESTAMPTZ',
|
|
'timestamp without time zone': 'TIMESTAMP',
|
|
'date': 'DATE',
|
|
'jsonb': 'JSONB',
|
|
'json': 'JSON',
|
|
}
|
|
|
|
for row in cur.fetchall():
|
|
table_name, col_name, data_type, nullable, max_len, scale = row
|
|
if table_name not in db_schema:
|
|
db_schema[table_name] = []
|
|
|
|
type_str = TYPE_MAP.get(data_type, data_type.upper())
|
|
if data_type == 'numeric' and max_len and scale is not None:
|
|
type_str = f'NUMERIC({max_len},{scale})'
|
|
elif data_type == 'character varying' and max_len:
|
|
type_str = f'VARCHAR({max_len})'
|
|
|
|
db_schema[table_name].append({
|
|
'column': col_name,
|
|
'type': type_str,
|
|
'nullable': nullable,
|
|
})
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
print("=" * 80)
|
|
print("Fixing BD Manual Documents")
|
|
print("=" * 80)
|
|
|
|
fixed_count = 0
|
|
|
|
for diff in diffs:
|
|
table = diff['table']
|
|
doc_path = diff.get('doc_path', '')
|
|
|
|
if not doc_path:
|
|
continue
|
|
|
|
has_changes = False
|
|
|
|
# 修复类型不匹配
|
|
if diff.get('type_mismatches'):
|
|
print(f"\n### {table} (type fixes) ###")
|
|
if fix_type_in_doc(doc_path, diff['type_mismatches']):
|
|
has_changes = True
|
|
|
|
# 添加缺失字段
|
|
if diff.get('missing_in_doc'):
|
|
print(f"\n### {table} (missing fields) ###")
|
|
for field in diff['missing_in_doc']:
|
|
if add_missing_field(doc_path, table, field, db_schema):
|
|
has_changes = True
|
|
|
|
if has_changes:
|
|
fixed_count += 1
|
|
|
|
print("\n" + "=" * 80)
|
|
print(f"Fixed {fixed_count} documents")
|
|
print("=" * 80)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|