Delta table streaming-lees- en schrijfbewerkingen
Delta Lake is diep geïntegreerd met Spark Structured Streaming via readStream
en writeStream
. Delta Lake overwint veel van de beperkingen die doorgaans worden geassocieerd met streamingsystemen en bestanden, waaronder:
- Het samenvoegen van kleine bestanden die worden geproduceerd door opname met lage latentie.
- Het 'exactly-once' verwerken met meer dan één stroom (of gelijktijdige batchtaken).
- Efficiënt detecteren welke bestanden nieuw zijn bij het gebruik van bestanden als bron voor een stream.
Notitie
In dit artikel wordt beschreven hoe u Delta Lake tables gebruikt als streamingbronnen en sinks. Zie Gegevens laden met behulp van streaming tables in Databricks SQL voor meer informatie over het laden van gegevens met behulp van streaming-tables in Databricks SQL.
Zie Stream-static joins voor meer informatie over stream-statische joins met Delta Lake.
Delta-table als bron
Structured Streaming leest delta-tablesincrementeel. Hoewel een streamingquery actief is voor een Delta-table, worden nieuwe records idempotent verwerkt als nieuwe table versies aan de bron-tableworden toegewijd.
In de volgende codevoorbeelden ziet u hoe u een streaming-leesbewerking configureert met behulp van de table naam of bestandspad.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Belangrijk
Als de schema voor een Delta-table wordt gewijzigd nadat een streaming-leesbewerking begint met de table, mislukt de query. Voor de meeste schema wijzigingen kunt u de stream opnieuw starten om schema mismatch op te lossen en om verder te gaan met verwerken.
In Databricks Runtime 12.2 LTS en hieronder kun je niet streamen vanuit een Delta-table met ingeschakelde column-mapping die een niet-additieve schema-evolutie heeft ondergaan, zoals het hernoemen of verwijderen van een columns. Zie Streaming met column toewijzing en schema wijzigingenvoor meer informatie.
Limit invoersnelheid
De volgende opties zijn beschikbaar voor het beheren van microbatches:
-
maxFilesPerTrigger
: hoeveel nieuwe bestanden in elke microbatch moeten worden overwogen. De standaardwaarde is 1000. -
maxBytesPerTrigger
: Hoeveel gegevens worden verwerkt in elke microbatch. Met deze optie wordt een 'voorlopig maximum' ingesteld, wat betekent dat een batch ongeveer deze hoeveelheid gegevens verwerkt en meer dan de limit kan verwerken om de streamingquery vooruit te laten gaan in gevallen waarin de kleinste invoereenheid groter is dan deze limit. Dit is niet standaard set.
Als u maxBytesPerTrigger
gebruikt in combinatie met maxFilesPerTrigger
, verwerkt de microbatch gegevens totdat de maxFilesPerTrigger
of maxBytesPerTrigger
limit is bereikt.
Notitie
In gevallen waarin de bron-table transacties worden opgeschoond vanwege de logRetentionDuration
configuratie en de streamingquery probeert deze versies te verwerken, kan de query standaard geen gegevensverlies voorkomen. U kunt de optie set naar failOnDataLoss
gebruiken om verloren gegevens false
te negeren en door te gaan met verwerken.
Een CDC-feed (Delta Lake Change Data Capture) streamen
Delta Lake wijzigingen in de gegevensfeed registreert wijzigingen in een Delta-table, inclusief updates en verwijderingen. Wanneer deze functie is ingeschakeld, kunt u streamen vanuit een wijzigingengegevensfeed en logica schrijven om invoegingen, updates en verwijderingen te verwerken in downstream-tables. Hoewel de uitvoer van wijzigingsgegevensstromen enigszins verschilt van de Delta-table die het beschrijft, biedt dit een oplossing voor het verspreiden van incrementele wijzigingen naar downstream-tables in een medaillonarchitectuur.
Belangrijk
In Databricks Runtime 12.2 LTS en lager kunt u niet streamen vanuit de change data feed voor een Delta-table waarvoor column-toewijzing is ingeschakeld en die niet-additieve schema-evolutie heeft ondergaan, zoals het hernoemen of verwijderen van columns. Zie Streaming met column toewijzing en schema wijzigingen.
Updates en verwijderingen negeren
Structured Streaming verwerkt geen invoer die geen toevoeging is en geeft een uitzondering als er wijzigingen optreden in de table die als bron wordt gebruikt. Er zijn twee belangrijke strategieën voor het verwerken van wijzigingen die niet automatisch downstream kunnen worden doorgegeven:
- U kunt de uitvoer en het controlepunt verwijderen en de stream opnieuw starten vanaf het begin.
- U kunt een van de volgende twee opties set:
-
ignoreDeletes
: transacties negeren die gegevens op partition grenzen verwijderen. -
skipChangeCommits
: transacties negeren die bestaande records verwijderen of wijzigen.skipChangeCommits
ondergaatignoreDeletes
.
-
Notitie
In Databricks Runtime 12.2 LTS en hoger skipChangeCommits
wordt de vorige instelling ignoreChanges
afgeschaft. In Databricks Runtime 11.3 LTS en lager ignoreChanges
is dit de enige ondersteunde optie.
De semantiek voor ignoreChanges
verschilt sterk van skipChangeCommits
. Als ignoreChanges
ingeschakeld, worden herschreven gegevensbestanden in de bron-table opnieuw verzonden na een bewerking voor het wijzigen van gegevens, zoals UPDATE
, MERGE INTO
, DELETE
(binnen partities) of OVERWRITE
. Ongewijzigde rijen worden vaak samen met nieuwe rijen verzonden, dus downstreamgebruikers moeten dubbele waarden kunnen verwerken. Verwijderingen worden niet downstream doorgegeven.
ignoreChanges
ondergaat ignoreDeletes
.
skipChangeCommits
negeert bewerkingen voor het wijzigen van bestanden volledig. Gegevensbestanden die in de bron table worden herschreven vanwege een bewerking voor het wijzigen van gegevens, zoals UPDATE
, MERGE INTO
, DELETE
en OVERWRITE
worden volledig genegeerd. Als u wijzigingen in upstream-bron tableswilt weergeven, moet u afzonderlijke logica implementeren om deze wijzigingen door te geven.
Workloads die zijn geconfigureerd met ignoreChanges
continue werking met behulp van bekende semantiek, maar Databricks raadt het gebruik skipChangeCommits
aan voor alle nieuwe workloads. Voor het migreren van workloads die worden gebruikt ignoreChanges
om herstructureringslogica te skipChangeCommits
vereisen.
Opmerking
Stel dat u een tableuser_events
hebt met date
, user_email
en action
columns die door date
is gepartitioneerd. U streamt uit de user_events
table en u moet er gegevens uit verwijderen vanwege de AVG.
Wanneer u bij partition grenzen verwijdert (dat wil gezegd, de WHERE
zich op een partitioncolumnbevindt), worden de bestanden al gesegmenteerd op waarde, zodat de verwijdering deze bestanden alleen uit de metagegevens verwijdert. Wanneer u een volledige partition met gegevens verwijdert, kunt u het volgende gebruiken:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Als u gegevens in meerdere partities verwijdert (in dit voorbeeld filteren op user_email
), gebruikt u de volgende syntaxis:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Als u een user_email
update met de instructie UPDATE
, wordt het bestand met de user_email
in kwestie herschreven. Gebruik skipChangeCommits
dit om de gewijzigde gegevensbestanden te negeren.
Beginpositie opgeven
U kunt de volgende opties gebruiken om het beginpunt van de Delta Lake-streamingbron op te geven zonder de volledige tablete verwerken.
startingVersion
: De Delta Lake-versie om van te beginnen. Databricks raadt aan deze optie voor de meeste workloads weg te laten. Indien niet set, begint de stream bij de meest recente beschikbare versie, inclusief een volledige momentopname van de table op dat moment.Indien opgegeven, leest de stream alle wijzigingen in de Delta-table vanaf de opgegeven versie (inclusief). Als de opgegeven versie niet meer beschikbaar is, kan de stream niet worden gestart. U kunt de doorvoerversies verkrijgen uit de
version
column van de DESCRIBE HISTORY opdrachtuitvoer.Als u alleen de meest recente wijzigingen wilt retourneren, geeft u op
latest
.startingTimestamp
: het tijdstempel waaruit moet worden gestart. Alle table wijzigingen die zijn doorgevoerd op of na de tijdstempel (inclusief de tijdstempel zelf) worden gelezen door de streamlezer. Als de opgegeven tijdstempel voorafgaat aan alle table doorvoeringen, begint de streaming-leesbewerking met de vroegste beschikbare tijdstempel. Een van de volgende:- Een tijdstempeltekenreeks. Bijvoorbeeld:
"2019-01-01T00:00:00.000Z"
. - Een datumtekenreeks. Bijvoorbeeld:
"2019-01-01"
.
- Een tijdstempeltekenreeks. Bijvoorbeeld:
U kunt beide opties niet tegelijkertijd uitvoeren (set). Ze worden alleen van kracht wanneer een nieuwe streamingquery wordt gestart. Als een streamingquery is gestart en de voortgang is vastgelegd in het controlepunt, worden deze opties genegeerd.
Belangrijk
Hoewel u de streamingbron vanuit een opgegeven versie of tijdstempel kunt starten, is de schema van de streamingbron altijd de meest recente schema van de Delta-table. U moet ervoor zorgen dat er na de opgegeven versie of tijdstempel geen incompatibele schema-wijziging voor de Delta table is. Anders kan de streamingbron onjuiste resultaten retourneren bij het lezen van de gegevens met een onjuiste schema.
Opmerking
Stel dat u een tableuser_events
hebt. Als u wijzigingen wilt lezen sinds versie 5, gebruikt u:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Als u wijzigingen wilt lezen sinds 2018-10-18, gebruikt u:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Eerste momentopname verwerken zonder dat gegevens worden verwijderd
Notitie
Deze functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger. Deze functie is beschikbaar als openbare preview.
Wanneer u een Delta-table als een stroombron gebruikt, verwerkt de query eerst alle gegevens die aanwezig zijn in de table. Delta table in deze versie wordt de eerste momentopname genoemd. Standaard worden de gegevensbestanden van delta tableverwerkt op basis van welk bestand het laatst is gewijzigd. De laatste wijzigingstijd vertegenwoordigt echter niet noodzakelijkerwijs de volgorde van de gebeurtenistijd van de record.
In een stateful streaming-query met een gedefinieerde watermarkkan het verwerken van bestanden op basis van wijzigingstijd ertoe leiden dat records in de verkeerde volgorde worden verwerkt. Dit kan ertoe leiden dat records worden verwijderd als latere gebeurtenissen door de watermark.
U kunt het probleem met gegevensuitval voorkomen door de volgende optie in te schakelen:
- withEventTimeOrder: Hiermee wordt aangegeven of de eerste momentopname moet worden verwerkt met gebeurtenistijdvolgorde.
Als gebeurtenistijdvolgorde is ingeschakeld, wordt het tijdsbereik van de eerste momentopnamegegevens onderverdeeld in tijdbuckets. Elke microbatch verwerkt een bucket door gegevens binnen het tijdsbereik te filteren. De configuratieopties maxFilesPerTrigger en maxBytesPerTrigger zijn nog steeds van toepassing om de microbatchgrootte te beheren, maar alleen op een geschatte manier vanwege de aard van de verwerking.
In de onderstaande afbeelding ziet u dit proces:
Belangrijke informatie over deze functie:
- Het probleem met het verwijderen van gegevens treedt alleen op wanneer de eerste Delta-momentopname van een stateful streaming-query wordt verwerkt in de standaardvolgorde.
- U kunt niet wijzigen
withEventTimeOrder
zodra de streamquery is gestart terwijl de eerste momentopname nog steeds wordt verwerkt. Als u opnieuw wilt opstarten metwithEventTimeOrder
gewijzigd, moet u het controlepunt verwijderen. - Als u een streamquery uitvoert waarvoorEventTimeOrder is ingeschakeld, kunt u deze niet downgraden naar een DBR-versie die deze functie pas ondersteunt als de eerste momentopnameverwerking is voltooid. Als u een downgrade wilt uitvoeren, kunt u wachten tot de eerste momentopname is voltooid of het controlepunt verwijderen en de query opnieuw starten.
- Deze functie wordt niet ondersteund in de volgende ongebruikelijke scenario's:
- De gebeurtenistijd column is een gegenereerde column en er zijn niet-projectietransformaties tussen de Delta-bron en watermark.
- Er is een watermark met meer dan één Delta-bron in de stroomquery.
- Als gebeurtenistijdvolgorde is ingeschakeld, kunnen de prestaties van de initiële verwerking van de Delta-momentopname langzamer zijn.
- Elke microbatch scant de eerste momentopname om gegevens binnen het bijbehorende tijdsbereik van de gebeurtenis te filteren. Voor een snellere filteractie is het raadzaam om een Delta-bron te gebruiken column als gebeurtenistijd, zodat gegevens kunnen worden overgeslagen (controleer Gegevens overslaan op Delta Lake wanneer deze van toepassing is). Daarnaast kan table partitionering langs de gebeurtenistijd column de verwerking verder versnellen. U kunt de Spark-gebruikersinterface controleren om te zien hoeveel Delta-bestanden worden gescand op een specifieke microbatch.
Opmerking
Stel dat u een tableuser_events
hebt met een event_time
column. Uw streamingquery is een aggregatiequery. Als u er zeker van wilt zijn dat er geen gegevens worden wegvallen tijdens de eerste momentopnameverwerking, kunt u het volgende gebruiken:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Notitie
U kunt dit ook inschakelen met Spark-configuratie op het cluster dat van toepassing is op alle streamingquery's: spark.databricks.delta.withEventTimeOrder.enabled true
Delta-table als opvang
U kunt ook gegevens naar een Delta-table schrijven met behulp van Structured Streaming. Met het transactielogboek kan Delta Lake exact één keer verwerken, zelfs wanneer er andere streams of batchquery's gelijktijdig worden uitgevoerd op de table.
Notitie
De functie Delta Lake VACUUM
verwijdert alle bestanden die niet worden beheerd door Delta Lake, maar slaat alle mappen over die beginnen met _
. U kunt controlepunten veilig opslaan naast andere gegevens en metagegevens voor een Delta-table met behulp van een mapstructuur zoals <table-name>/_checkpoints
.
Metrische gegevens
U vindt het aantal bytes en het aantal bestanden dat nog moet worden verwerkt in een streamingqueryproces als de numBytesOutstanding
metrische numFilesOutstanding
gegevens. Aanvullende metrische gegevens zijn onder andere:
-
numNewListedFiles
: Het aantal Delta Lake-bestanden dat is vermeld om de achterstand voor deze batch te berekenen.-
backlogEndOffset
: de table versie die wordt gebruikt om de achterstand te berekenen.
-
Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Toevoegmodus
Streams worden standaard uitgevoerd in de toevoegmodus, waarmee nieuwe records worden toegevoegd aan de table.
Gebruik de methode toTable
bij het streamen naar tables, zoals in het volgende voorbeeld:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Volledige modus
U kunt ook Structured Streaming gebruiken om de gehele table te vervangen met elke batch. Een voorbeeld van een use case is het berekenen van een samenvatting met behulp van aggregatie:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
In het voorgaande voorbeeld wordt continu een table bijgewerkt die het geaggregeerde aantal gebeurtenissen per klant bevat.
Voor toepassingen met meer soepele latentievereisten kunt u rekenresources besparen met eenmalige triggers. Gebruik deze om samenvattingsaggregatie te updatetables volgens een bepaald schema, waarbij alleen nieuwe gegevens worden verwerkt die sinds de laatste updatezijn aangekomen.
Upsert van streamingquery's met behulp van foreachBatch
U kunt een combinatie van merge
en foreachBatch
gebruiken om complexe upserts van een streamingquery naar een Delta-tablete schrijven. Zie foreachBatch gebruiken om naar willekeurige gegevens-sinks te schrijven.
Dit patroon heeft veel toepassingen, waaronder de volgende:
- schrijf streaming-aggregaten in modus Update: Dit is veel efficiënter dan de Complete Mode.
-
Een stroom databasewijzigingen schrijven naar een Delta table: de merge-query voor het schrijven van veranderingsgegevens kan in
foreachBatch
worden gebruikt om continu een stroom wijzigingen toe te passen op een Delta table. -
Een gegevensstroom naar Delta table schrijven met ontdubbeling: de insert-only merge-query voor ontdubbeling kan in
foreachBatch
worden gebruikt om continu gegevens (met duplicaten) naar een Delta-table te schrijven met automatische ontdubbeling.
Notitie
- Zorg ervoor dat uw
merge
instructieforeachBatch
idempotent is als het opnieuw opstarten van de streamingquery de bewerking meerdere keren op dezelfde batch met gegevens kan toepassen. - Wanneer
merge
deze wordt gebruiktforeachBatch
, kan de invoergegevenssnelheid van de streamingquery (gerapporteerd viaStreamingQueryProgress
en zichtbaar in de grafiek van de notebooksnelheid) worden gerapporteerd als een veelvoud van de werkelijke snelheid waarmee gegevens worden gegenereerd bij de bron. Dit komt doordatmerge
de invoergegevens meerdere keren leest, waardoor de metrische invoergegevens worden vermenigvuldigd. Als dit een knelpunt is, kunt u de batch DataFrame vóórmerge
in de cache opslaan en de cache vervolgens namerge
uit de cache halen.
In het volgende voorbeeld ziet u hoe u SQL kunt gebruiken om foreachBatch
deze taak uit te voeren:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
U kunt er ook voor kiezen om de Delta Lake-API's te gebruiken om streaming-upserts uit te voeren, zoals in het volgende voorbeeld:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Idempotent table schrijft in foreachBatch
Notitie
Databricks raadt u aan een afzonderlijke streaming-schrijfbewerking te configureren voor elke sink die u wilt update. Het gebruik van foreachBatch
om te schrijven naar meerdere tables serialiseert schrijfbewerkingen, waardoor parallelle uitvoering wordt verminderd en de algehele latentie toeneemt.
Delta tables ondersteunt de volgende DataFrameWriter
opties om schrijfbewerkingen naar meerdere tables te maken binnen foreachBatch
idempotent:
-
txnAppId
: Een unieke tekenreeks die u kunt doorgeven aan elke DataFrame-schrijfbewerking. U kunt bijvoorbeeld de StreamingQuery-id gebruiken alstxnAppId
. -
txnVersion
: Een monotonisch toenemend getal dat fungeert als transactieversie.
Delta Lake gebruikt de combinatie van txnAppId
en txnVersion
om dubbele schrijfbewerkingen te identificeren en te negeren.
Als een batch-schrijfbewerking wordt onderbroken door een fout, gebruikt het opnieuw uitvoeren van de batch dezelfde toepassing en batch-id om de runtime te helpen dubbele schrijfbewerkingen correct te identificeren en te negeren. Toepassings-id (txnAppId
) kan elke door de gebruiker gegenereerde unieke tekenreeks zijn en hoeft niet gerelateerd te zijn aan de stream-id. Zie foreachBatch gebruiken om naar willekeurige gegevens-sinks te schrijven.
Waarschuwing
Als u het streamingcontrolepunt verwijdert en de query opnieuw start met een nieuw controlepunt, moet u een ander txnAppId
item opgeven. Nieuwe controlepunten beginnen met een batch-id van 0
. Delta Lake maakt gebruik van de batch-id en txnAppId
als een unieke sleutel en slaat batches met reeds geziene valuesover.
In het volgende codevoorbeeld ziet u dit patroon:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}