Sdílet prostřednictvím


Začínáme: Příprava dat na dodržování předpisů GDPR

Obecné nařízení o ochraně osobních údajů (GDPR) a Zákon o ochraně osobních údajů v Kalifornii (CCPA) jsou nařízení o ochraně osobních údajů a zabezpečení dat, která vyžadují, aby společnosti trvale a zcela odstranily všechny identifikovatelné osobní údaje (PII) shromážděné o zákazníkovi na jejich výslovné žádosti. Žádosti o odstranění se také označují jako "právo na zapomenutí" (RTBF) nebo "právo na vymazání dat", musí být žádosti o odstranění provedeny během zadaného období (například do jednoho kalendářního měsíce).

Tento článek vás provede implementací RTBF na data uložená v Databricks. Příklad, který je součástí tohoto článku, modeluje datové sady pro společnost elektronického obchodování a ukazuje, jak odstranit data ve zdrojových tabulkách a rozšířit tyto změny do podřízených tabulek.

Podrobný plán pro implementaci "práva být zapomenuto"

Následující diagram znázorňuje, jak implementovat právo na zapomenutí.

diagram, který ukazuje, jak implementovat dodržování předpisů GDPR.

odstranění bodu pomocí Delta Lake

Delta Lake zrychluje odstraňování konkrétních záznamů ve velkých datových jezerech s transakcemi ACID, což umožňuje vyhledat a odebrat osobní údaje (PII) v reakci na žádosti podle GDPR nebo CCPA.

Delta Lake uchovává historii tabulek a zpřístupňuje ji pro dotazy k určitému bodu v čase a vrácení zpět. Funkce VACUUM odebere datové soubory, na které už tabulka Delta neodkazuje a které jsou starší než zadaná prahová hodnota uchovávání, trvale odstraní data. Další informace o výchozích nastaveních a doporučeních najdete v tématu Práce s historií tabulek Delta Lake.

Poznámka:

U tabulek s povolenými vektory odstranění musíte také spustit REORG TABLE ... APPLY (PURGE), aby se trvale odstranily podkladové záznamy. Viz Použijte změny na datové soubory Parquet.

Odstranění dat v upstreamových zdrojích

GDPR a CCPA se vztahují na všechna data, včetně dat ve zdrojích mimo Delta Lake, jako jsou Kafka, soubory a databáze. Kromě odstranění dat v Databricks musíte také pamatovat na odstranění dat v upstreamových zdrojích, jako jsou fronty a cloudové úložiště.

Úplné odstranění je vhodnější než obfuskace.

Musíte si vybrat mezi odstraněním dat a jejich obfuzením. Obfuskace lze implementovat pomocí pseudonymizace, maskování dat atd. Nejbezpečnější možností je však úplné vymazání, protože v praxi odstranění rizika opětovného identifikace často vyžaduje úplné odstranění údajů PII.

Odstranění dat v bronzové vrstvě a následné šíření odstranění do stříbrných a zlatých vrstev

Doporučujeme, abyste dodržování předpisů GDPR a CCPA zahájili odstraněním dat v bronzové vrstvě. Toto odstranění by mělo být řízeno naplánovanou úlohou, která dotazuje na řídicí tabulku s žádostmi o odstranění. Po odstranění dat z bronzové vrstvy je možné změny rozšířit do stříbrných a zlatých vrstev.

Pravidelné udržování tabulek pro odebrání dat z historických souborů

Delta Lake ve výchozím nastavení uchovává historii tabulek, včetně odstraněných záznamů, po dobu 30 dnů a zpřístupňuje ji pro časovou cestu a vrácení zpět. I když se ale předchozí verze dat odeberou, data se stále uchovávají v cloudovém úložišti. Proto byste měli pravidelně udržovat tabulky a zobrazení, abyste odebrali předchozí verze dat. Doporučeným způsobem je prediktivní optimalizace spravovaných tabulek v katalogu Unity, která inteligentně udržuje streamované tabulky i materializovaná zobrazení. DLT automaticky provádí úlohy údržby do 24 hodin po aktualizaci streamovaných tabulek a materializovaných zobrazení.

Pokud nepoužíváte prediktivní optimalizaci nebo DLT, měli byste na tabulkách Delta spustit příkaz VACUUM, který trvale odebere předchozí verze dat. Ve výchozím nastavení se tím zkrátí možnosti časového cestování na 7 dní, což je konfigurovatelné nastavenía odebere se z cloudového úložiště také historické verze dat.

Odstranění dat PII z bronzové vrstvy

V závislosti na návrhu vašeho lakehousu byste mohli být schopni odpojit propojení mezi PII a daty uživatelů bez PII. Pokud například používáte jiný než přirozený klíč, například user_id místo přirozeného klíče, jako je e-mail, můžete odstranit data PII, která ponechá data bez PII.

Zbytek tohoto článku zpracovává RTBF tím, že zcela odstraní záznamy uživatelů ze všech bronzových tabulek. Data můžete odstranit spuštěním příkazu DELETE, jak je znázorněno v následujícím kódu:

spark.sql("DELETE FROM bronze.users WHERE user_id = 5")

Při odstraňování velkého počtu záznamů současně doporučujeme použít příkaz MERGE. Následující kód předpokládá, že máte řídicí tabulku s názvem gdpr_control_table, která obsahuje sloupec user_id. Do této tabulky vložíte záznam pro každého uživatele, který požádal o "právo, aby se zapomnělo" do této tabulky.

Příkaz MERGE určuje podmínku pro odpovídající řádky. V tomto příkladu se záznamy z target_table shodují se záznamy v gdpr_control_table podle user_id. Pokud existuje shoda (například user_id v target_table i gdpr_control_table), řádek v target_table se odstraní. Po úspěšném provedení tohoto příkazu MERGE aktualizujte řídicí tabulku, abyste potvrdili, že žádost byla zpracována.

spark.sql("""
  MERGE INTO target
  USING (
    SELECT user_id
    FROM gdpr_control_table
  ) AS source
  ON target.user_id = source.user_id
  WHEN MATCHED THEN DELETE
""")

Propagovat změny z bronzové do stříbrné a zlaté vrstvy

Po odstranění dat v bronzové vrstvě je nutné změny rozšířit do tabulek ve stříbrných a zlatých vrstvách.

Materializovaná zobrazení: Automatické řešení odstranění

Materializovaná zobrazení automaticky zpracovávají odstranění ve zdrojích. Proto nemusíte dělat nic zvláštního, abyste zajistili, že materializované zobrazení neobsahuje data odstraněná ze zdroje. Je nutné aktualizovat materializované zobrazení a spustit údržbu, aby se zajistilo, že se odstranění kompletně zpracuje.

Materializované zobrazení vždy vrátí správný výsledek, protože používá přírůstkové výpočty, pokud je levnější než úplné výpočty, ale nikdy za cenu správnosti. Jinými slovy, odstranění dat ze zdroje by mohlo způsobit úplné překomputování materializovaného zobrazení.

diagram, který znázorňuje, jak automaticky zpracovávat odstranění

Streamované tabulky: Smazání dat a čtení streamovacího zdroje pomocí funkce skipChangeCommits

Streamované tabulky mohou zpracovávat pouze přidávaná data. To znamená, že streamované tabulky očekávají, že se ve zdroji streamování zobrazí jenom nové řádky dat. Jakákoli jiná operace, například aktualizace nebo odstranění jakéhokoli záznamu ze zdrojové tabulky používané pro streamování, není podporovaná a přeruší stream.

diagram, který ukazuje, jak zpracovávat odstranění v streamovaných tabulkách

Vzhledem k tomu, že streamování zpracovává pouze nová data, musíte zpracovávat změny dat sami. Doporučenou metodou je: (1) odstranit data ve zdroji streamování, (2) odstranit data z tabulky streamování a poté (3) aktualizovat načítání streamování tak, aby používalo skipChangeCommits. Tato značka dává najevo Databricks, že streamovaná tabulka by měla přeskočit vše kromě vložení, jako jsou aktualizace nebo odstranění.

Diagram znázorňující metodu dodržování předpisů GDPR, která používá skipChangeCommits.

Případně můžete (1) odstranit data ze zdroje, (2) je odstranit ze streamované tabulky a pak (3) plně aktualizovat streamovací tabulku. Když plně aktualizujete streamovací tabulku, vymaže stav streamování tabulky a znovu zpracuje všechna data. Jakýkoli nadřazený zdroj dat, který přesahuje dobu uchovávání dat (například Kafka téma, které po 7 dnech automaticky maže data), se znovu nezpracuje, což by mohlo způsobit ztrátu dat. Tuto možnost doporučujeme pro streamované tabulky pouze ve scénáři, kdy jsou historická data k dispozici a jejich zpracování nebudou nákladné.

Diagram znázorňující metodu dodržování předpisů GDPR, která provádí úplnou aktualizaci tabulky streamování

Tabulky Delta: Správa odstranění pomocí readChangeFeed

Běžné tabulky Delta neobsahují žádné zvláštní zpracování odstranění ze vstupního zdroje. Místo toho je potřeba napsat vlastní kód pro přenos mazání do nich (například spark.readStream.option("readChangeFeed", true).table("source_table")).

Příklad: Dodržování předpisů GDPR a kalifornského zákona na ochranu soukromí spotřebitelů (CCPA) pro společnost elektronického obchodování

Následující diagram znázorňuje medailonovou architekturu pro e-commerce společnost, ve které je potřeba implementovat dodržování předpisů GDPR & a CCPA. I když se data uživatele odstraní, můžete chtít spočítat aktivity v podřízených agregacích.

Diagram, který znázorňuje příklad dodržování předpisů GDPR a CCPA pro e-commerce společnost.

  • bronzová vrstva
    • users – dimenze uživatele Obsahuje PII (například e-mailovou adresu).
    • clickstream – klikněte na události. Obsahuje PII (například IP adresu).
    • gdpr_requests – řídicí tabulka obsahující ID uživatelů, na která se vztahuje "právo na zapomenutí".
  • Stříbrná vrstva
    • clicks_hourly – celkový počet kliknutí za hodinu Pokud odstraníte uživatele, stále chcete spočítat jejich kliknutí.
    • clicks_by_user – celkový počet kliknutí na uživatele Pokud odstraníte uživatele, nechcete spočítat jejich kliknutí.
  • zlatá vrstva
    • revenue_by_user – celkové útraty jednotlivých uživatelů

Krok 1: Naplnění tabulek ukázkovými daty

Následující kód vytvoří tyto dvě tabulky:

  • source_users obsahuje dimenzionální data o uživatelích. Tato tabulka obsahuje sloupec PII s názvem email.
  • source_clicks obsahuje data událostí o aktivitách prováděných uživateli. Obsahuje sloupec PII s názvem ip_address.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DateType

catalog = "users"
schema = "name"

# Create table containing sample users
users_schema = StructType([
   StructField('user_id', IntegerType(), False),
   StructField('username', StringType(), True),
   StructField('email', StringType(), True),
   StructField('registration_date', StringType(), True),
   StructField('user_preferences', MapType(StringType(), StringType()), True)
])

users_data = [
   (1, 'alice', 'alice@example.com', '2021-01-01', {'theme': 'dark', 'language': 'en'}),
   (2, 'bob', 'bob@example.com', '2021-02-15', {'theme': 'light', 'language': 'fr'}),
   (3, 'charlie', 'charlie@example.com', '2021-03-10', {'theme': 'dark', 'language': 'es'}),
   (4, 'david', 'david@example.com', '2021-04-20', {'theme': 'light', 'language': 'de'}),
   (5, 'eve', 'eve@example.com', '2021-05-25', {'theme': 'dark', 'language': 'it'})
]

users_df = spark.createDataFrame(users_data, schema=users_schema)
users_df.write..mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_users")

# Create table containing clickstream (i.e. user activities)
from pyspark.sql.types import TimestampType

clicks_schema = StructType([
   StructField('click_id', IntegerType(), False),
   StructField('user_id', IntegerType(), True),
   StructField('url_clicked', StringType(), True),
   StructField('click_timestamp', StringType(), True),
   StructField('device_type', StringType(), True),
   StructField('ip_address', StringType(), True)
])

clicks_data = [
   (1001, 1, 'https://example.com/home', '2021-06-01T12:00:00', 'mobile', '192.168.1.1'),
   (1002, 1, 'https://example.com/about', '2021-06-01T12:05:00', 'desktop', '192.168.1.1'),
   (1003, 2, 'https://example.com/contact', '2021-06-02T14:00:00', 'tablet', '192.168.1.2'),
   (1004, 3, 'https://example.com/products', '2021-06-03T16:30:00', 'mobile', '192.168.1.3'),
   (1005, 4, 'https://example.com/services', '2021-06-04T10:15:00', 'desktop', '192.168.1.4'),
   (1006, 5, 'https://example.com/blog', '2021-06-05T09:45:00', 'tablet', '192.168.1.5')
]

clicks_df = spark.createDataFrame(clicks_data, schema=clicks_schema)
clicks_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.source_clicks")

Krok 2: Vytvoření kanálu, který zpracovává data PII

Následující kód vytvoří bronzovou, stříbrnou a zlatou vrstvu architektury medailonu uvedené výše.

import dlt
from pyspark.sql.functions import col, concat_ws, count, countDistinct, avg, when, expr

catalog = "users"
schema = "name"

# ----------------------------
# Bronze Layer - Raw Data Ingestion
# ----------------------------

@dlt.table(
   name=f"{catalog}.{schema}.users_bronze",
   comment='Raw users data loaded from source'
)
def users_bronze():
   return (
     spark.readStream.table(f"{catalog}.{schema}.source_users")
   )

@dlt.table(
   name=f"{catalog}.{schema}.clicks_bronze",
   comment='Raw clicks data loaded from source'
)
def clicks_bronze():
   return (
       spark.readStream.table(f"{catalog}.{schema}.source_clicks")
   )

# ----------------------------
# Silver Layer - Data Cleaning and Enrichment
# ----------------------------

@dlt.table(
   name=f"{catalog}.{schema}.users_silver",
   comment='Cleaned and standardized users data'
)
@dlt.expect_or_drop('valid_email', "email IS NOT NULL")
def users_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.users_bronze")
           .withColumn('registration_date', col('registration_date').cast('timestamp'))
           .dropDuplicates(['user_id', 'registration_date'])
           .select('user_id', 'username', 'email', 'registration_date', 'user_preferences')
   )

@dlt.table(
   name=f"{catalog}.{schema}.clicks_silver",
   comment='Cleaned and standardized clicks data'
)
@dlt.expect_or_drop('valid_click_timestamp', "click_timestamp IS NOT NULL")
def clicks_silver():
   return (
       spark.readStream
           .table(f"{catalog}.{schema}.clicks_bronze")
           .withColumn('click_timestamp', col('click_timestamp').cast('timestamp'))
           .withWatermark('click_timestamp', '10 minutes')
           .dropDuplicates(['click_id'])
           .select('click_id', 'user_id', 'url_clicked', 'click_timestamp', 'device_type', 'ip_address')
   )

@dlt.table(
   name=f"{catalog}.{schema}.user_clicks_silver",
   comment='Joined users and clicks data on user_id'
)
def user_clicks_silver():
   # Read users_silver as a static DataFrame
   users = spark.read.table(f"{catalog}.{schema}.users_silver")

   # Read clicks_silver as a streaming DataFrame
   clicks = spark.readStream \
       .table('clicks_silver')

   # Perform the join
   joined_df = clicks.join(users, on='user_id', how='inner')

   return joined_df

# ----------------------------
# Gold Layer - Aggregated and Business-Level Data
# ----------------------------

@dlt.table(
   name=f"{catalog}.{schema}.user_behavior_gold",
   comment='Aggregated user behavior metrics'
)
def user_behavior_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_clicks_silver")
   return (
       df.groupBy('user_id')
         .agg(
             count('click_id').alias('total_clicks'),
             countDistinct('url_clicked').alias('unique_urls')
         )
   )

@dlt.table(
   name=f"{catalog}.{schema}.marketing_insights_gold",
   comment='User segments for marketing insights'
)
def marketing_insights_gold():
   df = spark.read.table(f"{catalog}.{schema}.user_behavior_gold")
   return (
       df.withColumn(
           'engagement_segment',
           when(col('total_clicks') >= 100, 'High Engagement')
           .when((col('total_clicks') >= 50) & (col('total_clicks') < 100), 'Medium Engagement')
           .otherwise('Low Engagement')
       )
   )

Krok 3: Odstranění dat ve zdrojových tabulkách

V tomto kroku deletujete data ve všech tabulkách, ve kterých se nachází PII.

catalog = "users"
schema = "name"

def apply_gdpr_delete(user_id):
 tables_with_pii = ["clicks_bronze", "users_bronze", "clicks_silver", "users_silver", "user_clicks_silver"]

 for table in tables_with_pii:
   print(f"Deleting user_id {user_id} from table {table}")
   spark.sql(f"""
     DELETE FROM {catalog}.{schema}.{table}
     WHERE user_id = {user_id}
   """)

Krok 4: Přidání skipChangeCommits do definic ovlivněných streamovaných tabulek

V tomto kroku musíte říct DLT, aby přeskočil řádky, které nejsou určené pro přidání. Přidejte možnost skipChangeCommits do následujících metod. Nemusíte aktualizovat definice materializovaných zobrazení, protože automaticky zpracovávají aktualizace a odstranění:

  • users_bronze
  • users_silver
  • clicks_bronze
  • clicks_silver
  • user_clicks_silver

Následující kód ukazuje, jak aktualizovat metodu users_bronze:

def users_bronze():
   return (
     spark.readStream.option('skipChangeCommits', 'true').table(f"{catalog}.{schema}.source_users")
   )

Po opětovném spuštění pipeline proběhne úspěšná aktualizace.