Files
feiqiu-ETL/etl_billiards/scd/scd2_handler.py
2025-11-30 07:19:05 +08:00

90 lines
2.7 KiB
Python

# -*- coding: utf-8 -*-
"""SCD2 (Slowly Changing Dimension Type 2) 处理逻辑"""
from datetime import datetime
def _row_to_dict(cursor, row):
    """Convert a fetched row into a ``{column_name: value}`` dict.

    Column names are taken from the first element of each entry in
    ``cursor.description``; values are read from ``row`` by position.

    Args:
        cursor: Cursor whose ``description`` describes the row's columns.
        row: Positionally indexable row, or ``None`` when nothing was fetched.

    Returns:
        A dict keyed by column name, or ``None`` when ``row`` is ``None``.
    """
    if row is not None:
        mapped = {}
        for position, column_meta in enumerate(cursor.description):
            mapped[column_meta[0]] = row[position]
        return mapped
    return None
class SCD2Handler:
    """Maintain SCD2 (Slowly Changing Dimension Type 2) history rows.

    The current version of an entity is the row whose ``valid_to`` is NULL.
    A change closes that row and inserts a new one, so history is preserved.
    """

    # Bind-parameter name for the close timestamp. It is deliberately unlikely
    # to collide with a real column name in ``record``.
    _CLOSE_TS_PARAM = "_scd2_closed_at"

    def __init__(self, db_ops):
        """Store the database wrapper.

        Args:
            db_ops: Object exposing a connection as ``conn``; ``conn.cursor()``
                must be usable as a context manager.
        """
        self.db = db_ops

    def upsert(
        self,
        table_name: str,
        natural_key: list,
        tracked_fields: list,
        record: dict,
        effective_date: datetime = None,
    ) -> str:
        """Apply one incoming record to an SCD2 table.

        The row matching ``natural_key`` with ``valid_to IS NULL`` is treated
        as the current version. If none exists the record is inserted. If one
        exists and any tracked field differs, it is closed and a new version
        is inserted. No commit is issued here.

        ``record`` is never modified; the SCD2 bookkeeping columns
        (``valid_from``, ``valid_to``, ``is_current``) are added to a copy.

        NOTE: ``table_name`` and the column names (keys of ``record`` and
        entries of ``natural_key``) are interpolated into the SQL text as
        identifiers. They must come from trusted configuration, never from
        user input. Values are always passed as bound parameters.

        Args:
            table_name: Target table name.
            natural_key: Column names identifying the entity; each must be a
                key of ``record``.
            tracked_fields: Column names whose change triggers a new version.
            record: Column name -> value mapping for the incoming row.
            effective_date: Timestamp used as the new row's ``valid_from`` and
                the closed row's ``valid_to``. Defaults to ``datetime.now()``.

        Returns:
            ``'INSERT'``, ``'UPDATE'`` or ``'UNCHANGED'``.

        Raises:
            ValueError: If ``natural_key`` is empty.
        """
        if not natural_key:
            # An empty key would render "WHERE  AND valid_to IS NULL".
            raise ValueError("natural_key must contain at least one column")

        effective_date = effective_date or datetime.now()
        key_filter = " AND ".join(f"{k} = %({k})s" for k in natural_key)

        # Work on a copy so the caller's dict never picks up SCD2 columns.
        new_version = dict(record)
        new_version["valid_from"] = effective_date
        new_version["valid_to"] = None
        new_version["is_current"] = True

        sql_select = f"""
            SELECT * FROM {table_name}
            WHERE {key_filter}
            AND valid_to IS NULL
        """

        with self.db.conn.cursor() as cur:
            cur.execute(sql_select, record)
            existing = _row_to_dict(cur, cur.fetchone())

            if not existing:
                self._insert_version(cur, table_name, new_version)
                return "INSERT"

            has_changes = any(
                existing.get(field) != record.get(field)
                for field in tracked_fields
            )
            if not has_changes:
                return "UNCHANGED"

            # Close the current version before inserting its replacement.
            sql_close = f"""
                UPDATE {table_name}
                SET valid_to = %({self._CLOSE_TS_PARAM})s,
                    is_current = FALSE
                WHERE {key_filter}
                AND valid_to IS NULL
            """
            close_params = dict(record)
            close_params[self._CLOSE_TS_PARAM] = effective_date
            cur.execute(sql_close, close_params)

            self._insert_version(cur, table_name, new_version)
            return "UPDATE"

    @staticmethod
    def _insert_version(cursor, table_name, row):
        """Insert ``row`` (column name -> value) into ``table_name``."""
        fields = list(row)
        placeholders = ", ".join(f"%({f})s" for f in fields)
        sql_insert = f"""
            INSERT INTO {table_name} ({', '.join(fields)})
            VALUES ({placeholders})
        """
        cursor.execute(sql_insert, row)