Delen via


Delta Lake-wijzigingenfeed gebruiken in Azure Databricks

Met de gegevensfeed voor wijzigingen kan Azure Databricks wijzigingen op rijniveau bijhouden tussen versies van een Delta-tabel. Wanneer deze optie is ingeschakeld voor een Delta-tabel, worden tijdens de runtime gebeurtenissen vastgelegd voor alle gegevens die in de tabel zijn geschreven. Dit omvat de rijgegevens, samen met metagegevens die aangeven of de opgegeven rij is ingevoegd, verwijderd of bijgewerkt.

Belangrijk

Wijzigingen in de gegevensfeed werken samen met de tabelgeschiedenis om wijzigingsinformatie te bieden. Omdat het klonen van een Delta-tabel een afzonderlijke geschiedenis maakt, komt de wijzigingsgegevensfeed op gekloonde tabellen niet overeen met die van de oorspronkelijke tabel.

Wijzigingsgegevens incrementeel verwerken

Databricks raadt aan wijzigingenfeeds te gebruiken in combinatie met Structured Streaming om wijzigingen uit Delta-tabellen incrementeel te verwerken. U moet Structured Streaming voor Azure Databricks gebruiken om automatisch versies bij te houden voor de wijzigingengegevensfeed van uw tabel.

Notitie

DLT biedt functionaliteit voor eenvoudige doorgifte van wijzigingsgegevens en het opslaan van resultaten als SCD -type 1 of type 2-tabellen. Zie De APPLY CHANGES-API's: Het vastleggen van wijzigingen vereenvoudigen met DLT-.

Om de wijzigingsgegevensstroom uit een tabel te lezen, moet u de wijzigingsgegevensstroom voor die tabel inschakelen. Zie Schakel de wijzigingsgegevensfeed in.

Stel de optie readChangeFeed in voor true het configureren van een stream voor een tabel om de wijzigingengegevensfeed te lezen, zoals wordt weergegeven in het volgende syntaxisvoorbeeld:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Standaard retourneert de stream de meest recente momentopname van de tabel wanneer de stream voor het eerst wordt gestart als een INSERT en toekomstige wijzigingen als wijzigingsgegevens.

Wijzig gegevensdoorvoeringen als onderdeel van de Delta Lake-transactie en wordt tegelijkertijd beschikbaar voor de nieuwe gegevensdoorvoeringen in de tabel.

U kunt desgewenst een beginversie opgeven. Zie Moet ik een beginversie opgeven?.

Wijzigingenfeed biedt ook ondersteuning voor batchuitvoering. Hiervoor moet een beginversie worden opgegeven. Zie Wijzigingen lezen in batchqueries.

Opties zoals frequentielimieten (maxFilesPerTrigger, maxBytesPerTrigger) en excludeRegex worden ook ondersteund bij het lezen van wijzigingsgegevens.

Snelheidsbeperking kan atomisch zijn voor andere versies dan de versie van de beginmomentopname. Dat wil zeggen, de volledige commitversie zal beperkt worden of de volledige commit wordt teruggestuurd.

Moet ik een beginversie opgeven?

U kunt desgewenst een beginversie opgeven als u wijzigingen wilt negeren die zijn opgetreden vóór een bepaalde versie. U kunt een versie opgeven met behulp van een tijdstempel of het versie-id-nummer dat is vastgelegd in het Delta-transactielogboek.

Notitie

Een beginversie is vereist voor batchleesbewerkingen en veel batchpatronen kunnen profiteren van het instellen van een optionele eindversie.

Wanneer u Structured Streaming-workloads configureert met betrekking tot wijzigingengegevensfeed, is het belangrijk om te begrijpen hoe het opgeven van een beginversie van invloed is op de verwerking.

Veel streamingworkloads, met name nieuwe pijplijnen voor gegevensverwerking, profiteren van het standaardgedrag. Met het standaardgedrag wordt de eerste batch verwerkt wanneer de stream eerst alle bestaande records in de tabel registreert als INSERT bewerkingen in de wijzigingengegevensfeed.

Als uw doeltabel al alle records met de juiste wijzigingen tot een bepaald punt bevat, geeft u een beginversie op om te voorkomen dat de status van de brontabel als INSERT gebeurtenissen wordt verwerkt.

De volgende voorbeeldsyntaxis die wordt hersteld na een streamingfout waarbij het controlepunt is beschadigd. In dit voorbeeld wordt uitgegaan van de volgende voorwaarden:

  1. De wijzigingsgegevensfeed is bij het aanmaken van de brontabel ingeschakeld.
  2. De downstream-doeltabel heeft alle wijzigingen tot en met versie 75 verwerkt.
  3. Versiegeschiedenis voor de brontabel is beschikbaar voor versies 70 en hoger.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

In dit voorbeeld moet u ook een nieuwe controlepuntlocatie opgeven.

Belangrijk

Als u een beginversie opgeeft, kan de stream niet worden gestart vanaf een nieuw controlepunt als de beginversie niet meer aanwezig is in de tabelgeschiedenis. Delta Lake schoont historische versies automatisch op, wat betekent dat alle opgegeven beginversies uiteindelijk worden verwijderd.

Zie Kan ik de gegevensfeed voor wijzigingen gebruiken om de hele geschiedenis van een tabel opnieuw af te spelen?.

Wijzigingen in batchquery's lezen

U kunt de syntaxis van batchquery's gebruiken om alle wijzigingen te lezen die beginnen met een bepaalde versie of om wijzigingen binnen een opgegeven reeks versies te lezen.

U geeft een versie op als een geheel getal en een tijdstempel als een tekenreeks in de notatie yyyy-MM-dd[ HH:mm:ss[.SSS]].

De begin- en eindversies zijn inclusief in de query's. Als u de wijzigingen van een bepaalde beginversie naar de nieuwste versie van de tabel wilt lezen, geeft u alleen de beginversie op.

Als u een lagere versie of tijdstempel opgeeft die ouder is dan een versie die wijzigingengebeurtenissen heeft geregistreerd( dat wil zeggen wanneer de wijzigingenfeed is ingeschakeld), wordt er een fout gegenereerd die aangeeft dat de wijzigingenfeed niet is ingeschakeld.

In de volgende syntaxisvoorbeelden ziet u hoe u opties voor de begin- en eindversie gebruikt met batchleesbewerkingen:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Notitie

Standaard wordt de fout timestampGreaterThanLatestCommit gegenereerd als een gebruiker een versie of tijdstempel opgeeft die de laatste commit in een tabel overschrijdt. In Databricks Runtime 11.3 LTS en hoger kan de gegevensfeed het geval van de versie buiten bereik verwerken als de gebruiker de volgende configuratie instelt op true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Als u een beginversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere begintijdstempel dan de laatste doorvoering in een tabel, wordt er een leeg leesresultaat geretourneerd wanneer de voorgaande configuratie is ingeschakeld.

Als u een eindversie opgeeft die groter is dan de laatste doorvoering in een tabel of een nieuwere eindtijdstempel dan de laatste doorvoering in een tabel, worden alle wijzigingen tussen de beginversie en de laatste doorvoering geretourneerd wanneer de voorgaande configuratie is ingeschakeld in de batchleesmodus.

Wat is het schema voor de wijzigingsgegevensstroom?

Wanneer u uit de wijzigingengegevensfeed voor een tabel leest, wordt het schema voor de nieuwste tabelversie gebruikt.

Notitie

De meeste schemawijzigings- en evolutiebewerkingen worden volledig ondersteund. Een tabel waarvoor kolomtoewijzing is ingeschakeld, biedt geen ondersteuning voor alle gebruikssituaties en vertoont verschillend gedrag. Zie Beperkingen voor gegevensfeeds bij tabellen met ingeschakelde kolomtoewijzing wijzigen.

Naast de gegevenskolommen uit het schema van de Delta-tabel bevat wijzigingenfeed metagegevenskolommen waarmee het type wijzigingsevenement wordt geïdentificeerd:

Kolomnaam Type Waarden
_change_type Snaar insert, update_preimage, update_postimagedelete(1)
_commit_version Lang Het Delta-logboek of de tabelversie met de wijziging.
_commit_timestamp Tijdstempel Het tijdstempel dat gekoppeld is aan het moment waarop de commit is gemaakt.

(1)preimage is de waarde vóór de update, postimage is de waarde na de update.

Notitie

U kunt de gegevensfeed voor een tabel niet inschakelen als het schema kolommen bevat met dezelfde namen als deze toegevoegde kolommen. Wijzig de naam van kolommen in de tabel om dit conflict op te lossen voordat u de wijzigingenfeed probeert in te schakelen.

Wijzigingenfeed inschakelen

U kunt de wijzigingengegevensfeed alleen lezen voor ingeschakelde tabellen. U moet de optie voor de gegevensfeed voor wijzigingen expliciet inschakelen met behulp van een van de volgende methoden:

  • Nieuwe tabel: Stel de tabeleigenschap delta.enableChangeDataFeed = true in de CREATE TABLE opdracht in.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Bestaande tabel: Stel de tabeleigenschap delta.enableChangeDataFeed = true in de ALTER TABLE opdracht in.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alle nieuwe tabellen:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Belangrijk

Alleen wijzigingen die zijn aangebracht nadat u de wijzigingenfeed hebt ingeschakeld, worden vastgelegd. Eerdere wijzigingen in een tabel worden niet vastgelegd.

Gegevensopslag wijzigen

Het inschakelen van wijzigingengegevensfeed veroorzaakt een kleine toename van de opslagkosten voor een tabel. Wijzigingsgegevensrecords worden gegenereerd wanneer de query wordt uitgevoerd en zijn over het algemeen veel kleiner dan de totale grootte van herschreven bestanden.

Azure Databricks registreert mutatiegegevens voor de UPDATE, DELETE en MERGE bewerkingen in de map _change_data onder de tabel-directory. Sommige bewerkingen, zoals bewerkingen met alleen-invoegen en verwijderingen van volledige partities, genereren geen gegevens in de _change_data map, omdat Azure Databricks de wijzigingenfeed efficiënt kan berekenen vanuit het transactielogboek.

Alle leesbewerkingen voor gegevensbestanden in de _change_data map moeten ondersteunde Delta Lake-API's doorlopen.

De bestanden in de _change_data map volgen het bewaarbeleid van de tabel. Wijzigingen in gegevensfeedgegevens worden verwijderd wanneer de VACUUM opdracht wordt uitgevoerd.

Kan ik de gegevensfeed wijzigen om de hele geschiedenis van een tabel opnieuw af te spelen?

Wijzigingenstroom is niet bedoeld als een permanente registratie van alle wijzigingen in een tabel. Wijzigingen in de gegevensfeed worden alleen vastgelegd nadat deze zijn ingeschakeld.

Met wijzigingen in de gegevensfeed en Delta Lake kunt u altijd een volledige momentopname van een brontabel reconstrueren. Dit betekent dat u een nieuwe streamingbewerking kunt starten op basis van een tabel waarvoor wijzigingen in de gegevensfeed zijn ingeschakeld en de huidige versie van die tabel en alle wijzigingen die zich daarna voordoen, kunt vastleggen.

U moet records in de wijzigingenfeed behandelen als tijdelijk en alleen toegankelijk voor een opgegeven bewaarvenster. Het Delta-transactielogboek verwijdert tabelversies en de bijbehorende wijzigingenfeedversies met regelmatige tussenpozen. Wanneer een versie uit het transactielogboek wordt verwijderd, kunt u de wijzigingengegevensfeed voor die versie niet meer lezen.

Als voor uw use-case een permanente geschiedenis van alle wijzigingen in een tabel moet worden bijgehouden, moet u incrementele logica gebruiken om records van de wijzigingengegevensfeed naar een nieuwe tabel te schrijven. In het volgende codevoorbeeld wordt het gebruik gedemonstreerd. trigger.AvailableNowHierbij wordt gebruikgemaakt van de incrementele verwerking van Structured Streaming, maar worden beschikbare gegevens verwerkt als een batchworkload. U kunt deze werklast asynchroon plannen met uw belangrijkste verwerkingspijplijnen om een back-up van de wijzigingsstroom te maken voor auditdoeleinden of volledige herafspeelbaarheid.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Beperkingen voor gegevensfeeds wijzigen voor tabellen waarvoor kolomtoewijzing is ingeschakeld

Als kolomtoewijzing is ingeschakeld voor een Delta-tabel, kunt u kolommen in de tabel verwijderen of de naam ervan wijzigen zonder dat u gegevensbestanden voor bestaande gegevens hoeft te herschrijven. Als kolomtoewijzing is ingeschakeld, heeft de wijzigingsgegevensstroom beperkingen na het uitvoeren van niet-additieve schemawijzigingen, zoals het hernoemen of verwijderen van een kolom, het wijzigen van het gegevenstype of veranderingen in nullwaardigheid.

Belangrijk

  • U kunt de wijzigingenfeed niet lezen voor een transactie of bereik waarin een niet-additieve schemawijziging plaatsvindt met behulp van batch-semantiek.
  • In Databricks Runtime 12.2 LTS en lager bieden tabellen met ingeschakelde kolomtoewijzing die niet-additieve schemawijzigingen hebben ondergaan geen ondersteuning voor streaming-leesbewerkingen op gegevensfeeds van wijzigingen. Zie Streaming met kolomtoewijzing en schemawijzigingen.
  • In Databricks Runtime 11.3 LTS en oudere versies kunt u de wijzigingsgegevensfeed niet lezen voor tabellen waarvoor kolomtoewijzing is ingeschakeld en waarvan de kolomnamen zijn gewijzigd of verwijderd.

In Databricks Runtime 12.2 LTS en hoger kunt u batchleesbewerkingen uitvoeren op wijzigingengegevensstroom voor tabellen waarvoor kolomtoewijzing is ingeschakeld en die schemawijzigingen zonder toevoegingen hebben ondergaan. In plaats van het schema van de nieuwste versie van de tabel te gebruiken, gebruiken leesbewerkingen het schema van de eindversie van de tabel die is opgegeven in de query. Query's mislukken nog steeds als het opgegeven versiebereik een niet-additieve schemawijziging omvat.