# -*- coding: utf-8 -*- """SCD2 (Slowly Changing Dimension Type 2) 处理逻辑""" from datetime import datetime def _row_to_dict(cursor, row): if row is None: return None columns = [desc[0] for desc in cursor.description] return {col: row[idx] for idx, col in enumerate(columns)} class SCD2Handler: """SCD2历史记录处理""" def __init__(self, db_ops): self.db = db_ops def upsert( self, table_name: str, natural_key: list, tracked_fields: list, record: dict, effective_date: datetime = None, ) -> str: """ 处理SCD2更新 Returns: 操作类型: 'INSERT', 'UPDATE', 'UNCHANGED' """ effective_date = effective_date or datetime.now() where_clause = " AND ".join([f"{k} = %({k})s" for k in natural_key]) sql_select = f""" SELECT * FROM {table_name} WHERE {where_clause} AND valid_to IS NULL """ with self.db.conn.cursor() as current: current.execute(sql_select, record) existing = _row_to_dict(current, current.fetchone()) if not existing: record["valid_from"] = effective_date record["valid_to"] = None record["is_current"] = True fields = list(record.keys()) placeholders = ", ".join([f"%({f})s" for f in fields]) sql_insert = f""" INSERT INTO {table_name} ({', '.join(fields)}) VALUES ({placeholders}) """ current.execute(sql_insert, record) return "INSERT" has_changes = any(existing.get(field) != record.get(field) for field in tracked_fields) if not has_changes: return "UNCHANGED" update_where = " AND ".join([f"{k} = %({k})s" for k in natural_key]) sql_close = f""" UPDATE {table_name} SET valid_to = %(effective_date)s, is_current = FALSE WHERE {update_where} AND valid_to IS NULL """ record["effective_date"] = effective_date current.execute(sql_close, record) record["valid_from"] = effective_date record["valid_to"] = None record["is_current"] = True fields = list(record.keys()) if "effective_date" in fields: fields.remove("effective_date") placeholders = ", ".join([f"%({f})s" for f in fields]) sql_insert = f""" INSERT INTO {table_name} ({', '.join(fields)}) VALUES ({placeholders}) """ current.execute(sql_insert, record) return "UPDATE"