90 lines
2.7 KiB
Python
90 lines
2.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""SCD2 (Slowly Changing Dimension Type 2) 处理逻辑"""
|
|
from datetime import datetime
|
|
|
|
|
|
def _row_to_dict(cursor, row):
|
|
if row is None:
|
|
return None
|
|
columns = [desc[0] for desc in cursor.description]
|
|
return {col: row[idx] for idx, col in enumerate(columns)}
|
|
|
|
|
|
class SCD2Handler:
    """Maintain Slowly Changing Dimension Type 2 (SCD2) history rows.

    Every version of an entity is stored as a separate row. The open
    ("current") version is the one with ``valid_to IS NULL``; when a tracked
    attribute changes, that row is closed and a new open row is inserted.
    """

    # Bind-parameter name used for the closing timestamp. It is kept out of the
    # record's column namespace so that a real ``effective_date`` column in the
    # dimension table cannot collide with it.
    _EFFECTIVE_DATE_PARAM = "_scd2_effective_date"

    def __init__(self, db_ops):
        # ``db_ops.conn`` must be a DB-API connection whose cursors can be used
        # with ``with`` and accept named ``%(name)s`` (pyformat) parameters.
        self.db = db_ops

    def upsert(
        self,
        table_name: str,
        natural_key: list,
        tracked_fields: list,
        record: dict,
        effective_date: datetime = None,
    ) -> str:
        """Apply an SCD2 upsert for a single record.

        Looks up the open row (``valid_to IS NULL``) matching the natural key:

        * no open row -> insert ``record`` as the first version;
        * no tracked field differs -> leave the table untouched;
        * otherwise -> close the open row (``valid_to = effective_date``,
          ``is_current = FALSE``) and insert ``record`` as the new version.

        ``record`` is not modified; the bookkeeping columns ``valid_from``,
        ``valid_to`` and ``is_current`` are added to a copy before inserting.

        Args:
            table_name: Target table. It is interpolated into the SQL text, so
                it must be a trusted identifier.
            natural_key: Column names identifying the entity. Must be non-empty,
                and every name must be a key of ``record``.
            tracked_fields: Columns whose change triggers a new version.
            record: Column -> value mapping for the incoming version. Its keys
                are interpolated into the SQL text as column names (trusted
                input only); its values are passed as bind parameters.
            effective_date: Start of the new version and end of the old one.
                Defaults to ``datetime.now()``.

        Returns:
            One of ``'INSERT'``, ``'UPDATE'`` or ``'UNCHANGED'``.

        Raises:
            ValueError: If ``natural_key`` is empty.
            KeyError: If a natural-key column is missing from ``record``.
        """
        if not natural_key:
            # An empty key would render "WHERE  AND valid_to IS NULL".
            raise ValueError("natural_key must contain at least one column")
        effective_date = effective_date or datetime.now()

        # Only the natural-key values are bound for lookup/close, so the
        # caller's other columns can never shadow internal parameter names.
        key_params = {k: record[k] for k in natural_key}
        # NOTE(review): "col = NULL" is never true in SQL, so a None key value
        # never matches an existing row and always leads to an INSERT.
        where_clause = " AND ".join(f"{k} = %({k})s" for k in natural_key)

        # NOTE(review): nothing here commits or rolls back. The close + insert
        # pair is only atomic if the caller runs it inside one transaction.
        with self.db.conn.cursor() as cur:
            cur.execute(
                f"SELECT * FROM {table_name} "
                f"WHERE {where_clause} AND valid_to IS NULL",
                key_params,
            )
            existing = _row_to_dict(cur, cur.fetchone())

            if not existing:
                self._insert_version(cur, table_name, record, effective_date)
                return "INSERT"

            # Plain Python comparison: DB-returned values and record values
            # must be of comparable types (e.g. not Decimal vs. str).
            has_changes = any(
                existing.get(field) != record.get(field) for field in tracked_fields
            )
            if not has_changes:
                return "UNCHANGED"

            close_params = dict(key_params)
            close_params[self._EFFECTIVE_DATE_PARAM] = effective_date
            cur.execute(
                f"UPDATE {table_name} "
                f"SET valid_to = %({self._EFFECTIVE_DATE_PARAM})s, is_current = FALSE "
                f"WHERE {where_clause} AND valid_to IS NULL",
                close_params,
            )

            self._insert_version(cur, table_name, record, effective_date)
            return "UPDATE"

    @staticmethod
    def _insert_version(cur, table_name, record, effective_date):
        """Insert a copy of ``record`` as the open version starting at ``effective_date``."""
        row = dict(record)  # copy: never mutate the caller's dict
        row["valid_from"] = effective_date
        row["valid_to"] = None
        row["is_current"] = True
        columns = list(row)
        placeholders = ", ".join(f"%({c})s" for c in columns)
        cur.execute(
            f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})",
            row,
        )
|