Delen via


De APPLY CHANGES-API's: Het vastleggen van wijzigingsgegevens vereenvoudigen met Delta Live Tables

Delta Live Tables vereenvoudigt het vastleggen van wijzigingen van gegevens (CDC) met de APPLY CHANGES- en APPLY CHANGES FROM SNAPSHOT-API's. De interface die u gebruikt, is afhankelijk van de bron van wijzigingsgegevens:

  • Gebruik APPLY CHANGES dit om wijzigingen te verwerken vanuit een CDF (Change Data Feed).
  • Gebruik APPLY CHANGES FROM SNAPSHOT (openbare preview) om wijzigingen in momentopnamen van databases te verwerken.

Voorheen werd de instructie vaak gebruikt voor het MERGE INTO verwerken van CDC-records in Azure Databricks. MERGE INTO Kan echter onjuiste resultaten produceren vanwege niet-opeenvolgende records of complexe logica vereist om records opnieuw te ordenen.

De APPLY CHANGES-API wordt ondersteund in de SQL- en Python-interfaces van Delta Live Tables. De APPLY CHANGES FROM SNAPSHOT-API wordt ondersteund in de Python-interface van Delta Live Tables.

Zowel APPLY CHANGES als APPLY CHANGES FROM SNAPSHOT ondersteunen het bijwerken van tabellen met SCD-type 1 en type 2:

  • Gebruik SCD-type 1 om records rechtstreeks bij te werken. Geschiedenis wordt niet bewaard voor bijgewerkte records.
  • Gebruik SCD-type 2 om een geschiedenis van records te bewaren, op alle updates of op updates voor een opgegeven set kolommen.

Zie voor syntaxis en andere verwijzingen:

Notitie

In dit artikel wordt beschreven hoe u tabellen in uw Delta Live Tables-pijplijn bijwerkt op basis van wijzigingen in brongegevens. Zie Delta Lake-wijzigingenfeed gebruiken in Azure Databricksvoor meer informatie over het vastleggen en opvragen van gegevens op rijniveau voor Delta-tabellen.

Vereisten

Als u de CDC-API's wilt gebruiken, moet uw pijplijn geconfigureerd zijn om gebruik te maken van serverloze DLT-pijplijnen of van Delta Live Tables Pro of Advanced-edities.

Hoe wordt CDC geïmplementeerd met de APPLY CHANGES API?

Door automatisch niet-op-volgorde records af te handelen, zorgt de APPLY CHANGES-API in Delta Live Tables ervoor dat CDC-records correct worden verwerkt en is het niet nodig om complexe logica te ontwikkelen voor het verwerken van niet-op-volgorde records. U moet een kolom opgeven in de brongegevens waarop records moeten worden gesequentieerd, wat Delta Live Tables interpreteert als een monotonische weergave van de juiste volgorde van de brongegevens. Delta Live Tables verwerkt automatisch gegevens die niet in orde zijn. Voor SCD-wijzigingen in type 2 worden in Delta Live-tabellen de juiste volgordewaarden doorgegeven aan de kolommen __START_AT en __END_AT van de doeltabel. Er moet één afzonderlijke update per sleutel zijn bij elke sequentiërende waarde en NULL-sequentiërende waarden worden niet ondersteund.

Als u CDC-verwerking wilt uitvoeren met APPLY CHANGES, maakt u eerst een streamingtabel en gebruikt u vervolgens de instructie APPLY CHANGES INTO in SQL of de apply_changes()-functie in Python om de bron, sleutels en sequentiëren voor de wijzigingenfeed op te geven. Als u de doelstreamingtabel wilt maken, gebruikt u de CREATE OR REFRESH STREAMING TABLE-instructie in SQL of de create_streaming_table()-functie in Python. Zie de scd-type 1 en type 2 verwerkingsvoorbeelden .

Zie de delta livetabellen SQL-verwijzing of Python-verwijzingvoor syntaxisdetails.

Hoe wordt CDC geïmplementeerd met de APPLY CHANGES FROM SNAPSHOT API?

Belangrijk

De APPLY CHANGES FROM SNAPSHOT API bevindt zich in openbare preview.

APPLY CHANGES FROM SNAPSHOT is een declaratieve API die efficiënt wijzigingen in brongegevens bepaalt door een reeks momentopnamen in volgorde te vergelijken en vervolgens de verwerking uit te voeren die nodig is voor CDC-verwerking van de records in de momentopnamen. APPLY CHANGES FROM SNAPSHOT wordt alleen ondersteund door de Python-interface van Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT ondersteunt het opnemen van momentopnamen van meerdere brontypen:

  • Gebruik periodieke opname van momentopnamen om momentopnamen op te nemen uit een bestaande tabel of weergave. APPLY CHANGES FROM SNAPSHOT heeft een eenvoudige, gestroomlijnde interface voor het periodiek opnemen van momentopnamen van een bestaand databaseobject. Er wordt een nieuwe momentopname opgenomen bij elke pijplijnupdate en de opnametijd wordt gebruikt als de momentopnameversie. Wanneer een pijplijn in continue modus wordt uitgevoerd, worden meerdere momentopnamen opgenomen bij elke pijplijnupdate, met een interval dat wordt bepaald door de triggerinterval instelling voor de stroom die de verwerking van WIJZIGINGEN TOEPASSEN VANUIT MOMENTOPNAME bevat.
  • Gebruik historische momentopnameopname voor het verwerken van bestanden met databasemomentopnamen, zoals momentopnamen die zijn gegenereerd op basis van een Oracle- of MySQL-database of een datawarehouse.

Als u CDC-verwerking wilt uitvoeren vanaf elk brontype met APPLY CHANGES FROM SNAPSHOT, maakt u eerst een streamingtabel en gebruikt u vervolgens de functie apply_changes_from_snapshot() in Python om de momentopname, sleutels en andere argumenten op te geven die nodig zijn om de verwerking te implementeren. Zie de voorbeelden van periodieke opname van momentopnamen en historische momentopnamen.

De momentopnamen die aan de API worden doorgegeven, moeten in oplopende volgorde zijn op basis van versie. Als Delta Live Tables een momentopname detecteert die niet op volgorde is, wordt er een fout gegenereerd.

Voor syntaxisdetails, zie de Delta Live Tables Python-referentie.

Beperkingen

De kolom die wordt gebruikt voor sequentiëren, moet een sorteerbaar gegevenstype zijn.

Voorbeeld: SCD-type 1 en SCD-type 2 verwerken met CDF-brongegevens

De volgende secties bevatten voorbeelden van SCD-type 1 van Delta Live Tables en type 2-query's waarmee doeltabellen worden bijgewerkt op basis van brongebeurtenissen uit een wijzigingengegevensfeed die:

  1. Hiermee maakt u nieuwe gebruikersrecords.
  2. Hiermee verwijdert u een gebruikersrecord.
  3. Hiermee worden gebruikersrecords bijgewerkt. In het SCD-type 1-voorbeeld komen de laatste UPDATE bewerkingen te laat en worden verwijderd uit de doeltabel, wat de verwerking van out-of-order gebeurtenissen demonstreert.

In de volgende voorbeelden wordt ervan uitgegaan dat u bekend bent met het configureren en bijwerken van Pijplijnen voor Delta Live Tables. Zie Handleiding: Voer uw eerste Delta Live Tables-pijplijn uit.

Als u deze voorbeelden wilt uitvoeren, moet u beginnen met het maken van een voorbeeldgegevensset. Zie Genereer testgegevens.

Hier volgen de invoerrecords voor deze voorbeelden:

userId naam plaats schakelapparatuur optimaliseren sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lelie Cancun INSERT 2
123 Nul Nul DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Als u de opmerking bij de laatste rij in de voorbeeldgegevens ongedaan maakt, wordt de volgende record ingevoegd die aangeeft waar records moeten worden afgekapt:

userId naam plaats schakelapparatuur optimaliseren sequenceNum
Nul Nul Nul TRUNCATE 3

Notitie

Alle volgende voorbeelden bevatten opties om zowel als DELETETRUNCATE bewerkingen op te geven, maar elk is optioneel.

SCD-updates van type 1 verwerken

In het volgende voorbeeld ziet u hoe SCD type 1-updates worden verwerkt:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Nadat het SCD-type 1-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:

userId naam plaats
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lelie Cancun

Nadat u het SCD-voorbeeldtype 1 hebt uitgevoerd met de extra TRUNCATE record, worden records 124 en 126 afgekapt vanwege de TRUNCATE-bewerking op sequenceNum=3en bevat de doeltabel de volgende record:

userId naam plaats
125 Mercedes Guadalajara

SCD-updates van type 2 verwerken

In het volgende voorbeeld ziet u hoe scd-type 2-updates worden verwerkt:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Nadat het SCD-type 2-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:

userId naam plaats __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 Nul
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 Nul
126 Lelie Cancun 2 Nul

Een SCD-query van het type 2 kan ook een subset van uitvoerkolommen opgeven die moeten worden bijgehouden voor de geschiedenis in de doeltabel. Wijzigingen in andere kolommen worden direct bijgewerkt in plaats van dat er nieuwe geschiedenissen worden aangemaakt. Het volgende voorbeeld laat zien hoe de kolom city van tracking kan worden uitgesloten:

In het volgende voorbeeld ziet u hoe u geschiedenis bijhouden gebruikt met SCD-type 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Nadat u dit voorbeeld hebt uitgevoerd zonder de extra TRUNCATE record, bevat de doeltabel de volgende records:

userId naam plaats __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 Nul
125 Mercedes Guadalajara 2 Nul
126 Lelie Cancun 2 Nul

Testgegevens genereren

De onderstaande code wordt gegeven om een voorbeeldgegevensset te genereren voor gebruik in de voorbeeldquery's die in deze zelfstudie aanwezig zijn. Ervan uitgaande dat u over de juiste referenties beschikt om een nieuw schema te maken en een nieuwe tabel te maken, kunt u deze instructies uitvoeren met een notebook of Databricks SQL. De volgende code is niet bedoeld om te worden uitgevoerd als onderdeel van een Delta Live Tables-pijplijn:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Voorbeeld: Periodieke verwerking van momentopnamen

In het volgende voorbeeld ziet u de SCD-verwerking van het type 2 waarmee momentopnamen van een tabel worden opgenomen die zijn opgeslagen op mycatalog.myschema.mytable. De resultaten van de verwerking worden geschreven naar een tabel met de naam target.

mycatalog.myschema.mytable records op de tijdstempel 2024-01-01 00:00:00

Sleutel Weergegeven als
1 a1
2 a2

mycatalog.myschema.mytable records op de tijdstempel 2024-01-01 12:00:00

Sleutel Weergegeven als
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Nadat de momentopnamen zijn verwerkt, bevat de doeltabel de volgende records:

Sleutel Weergegeven als __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 Nul
3 a3 2024-01-01 12:00:00 Nul

Voorbeeld: Verwerking van historische momentopnamen

In het volgende voorbeeld ziet u de SCD-verwerking van het type 2 waarmee een doeltabel wordt bijgewerkt op basis van brongebeurtenissen van twee momentopnamen die zijn opgeslagen in een cloudopslagsysteem:

Momentopname op timestamp, opgeslagen in /<PATH>/filename1.csv

Sleutel TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Momentopname op timestamp + 5, opgeslagen in /<PATH>/filename2.csv

Sleutel TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

In het volgende codevoorbeeld ziet u hoe scd-type 2-updates worden verwerkt met deze momentopnamen:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Nadat de momentopnamen zijn verwerkt, bevat de doeltabel de volgende records:

Sleutel TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 Nul
3 a3 b3 2 Nul
4 a4 b4_new 1 Nul

gegevens toevoegen, wijzigen of verwijderen in een doelstreamingtabel

Als uw pijplijn tabellen publiceert naar Unity Catalog, kunt u DML-instructies (Data Manipulat Language) gebruiken, waaronder instructies voor invoegen, bijwerken, verwijderen en samenvoegen, om de doelstreamingtabellen te wijzigen die zijn gemaakt door APPLY CHANGES INTO instructies.

Notitie

  • DML-instructies die het tabelschema van een streamingtabel wijzigen, worden niet ondersteund. Zorg ervoor dat uw DML-instructies niet proberen het tabelschema te ontwikkelen.
  • DML-instructies die een streamingtabel bijwerken, kunnen alleen worden uitgevoerd in een gedeeld Unity Catalog-cluster of een SQL-warehouse met Databricks Runtime 13.3 LTS en hoger.
  • Omdat voor streaming alleen toevoeggegevensbronnen zijn vereist, stelt u de vlag SkipChangeCommits in bij het lezen van de bronstreamingtabel als voor de verwerking streamingtabel met wijzigingen (bijvoorbeeld door DML-instructies) moet worden gestreamd. Wanneer skipChangeCommits is ingesteld, worden transacties die records in de brontabel verwijderen of wijzigen genegeerd. Als voor uw verwerking geen streamingtabel is vereist, kunt u een gerealiseerde weergave (die niet beschikt over de beperking voor alleen toevoegen) gebruiken als doeltabel.

Omdat Delta Live Tables gebruikmaakt van een opgegeven SEQUENCE BY kolom en de juiste opeenvolgende waarden doorgeeft aan de __START_AT en __END_AT kolommen van de doeltabel (voor SCD-type 2), moet u ervoor zorgen dat DML-instructies geldige waarden voor deze kolommen gebruiken om de juiste volgorde van records te behouden. Zie Hoe wordt CDC geïmplementeerd met de APPLY CHANGES API?

Zie Gegevens toevoegen, wijzigen of verwijderen in een streamingtabelvoor meer informatie over het gebruik van DML-instructies met streamingtabellen.

In het volgende voorbeeld wordt een actieve record ingevoegd met een beginvolgorde van 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Een wijzigingsgegevensstroom lezen uit een APPLY CHANGES doeltabel

In Databricks Runtime 15.2 en hoger kunt u een gegevensfeed voor wijzigingen lezen uit een streamingtabel die het doel is van APPLY CHANGES of APPLY CHANGES FROM SNAPSHOT query's op dezelfde manier als u een wijzigingsgegevensfeed van andere Delta-tabellen leest. Het volgende is vereist om de wijzigingengegevensfeed te lezen uit een doelstreamingtabel:

  • De doelstreamingtabel moet worden gepubliceerd naar Unity Catalog. Zie Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen.
  • Als u de wijzigingengegevensfeed wilt lezen uit de doelstreamingtabel, moet u Databricks Runtime 15.2 of hoger gebruiken. Als u de wijzigingengegevensfeed in een andere Delta Live Tables-pijplijn wilt lezen, moet de pijplijn zijn geconfigureerd voor het gebruik van Databricks Runtime 15.2 of hoger.

U leest de wijzigingengegevensfeed uit een doelstreamingtabel die is gemaakt in een Delta Live Tables-pijplijn op dezelfde manier als het lezen van een wijzigingengegevensfeed uit andere Delta-tabellen. Zie Delta Lake-wijzigingsgegevensfeeds gebruiken in Azure Databricks voor meer informatie over het gebruik van de delta-functionaliteit voor wijzigingenfeeds, waaronder voorbeelden in Python en SQL.

Notitie

De record voor de wijzigingengegevensfeed bevat metagegevens waarmee het type wijzigingsevenement wordt geïdentificeerd. Wanneer een record wordt bijgewerkt in een tabel, bevatten de metagegevens voor de gekoppelde wijzigingsrecords doorgaans _change_type waarden die zijn ingesteld op update_preimage en update_postimage gebeurtenissen.

De _change_type-waarden verschillen echter als er updates in de doelstreamingtabel worden aangebracht, inclusief het wijzigen van primaire sleutelwaarden. Wanneer wijzigingen updates voor primaire sleutels bevatten, worden de _change_type metagegevensvelden ingesteld op insert en delete gebeurtenissen. Wijzigingen in primaire sleutels kunnen optreden wanneer handmatige updates worden aangebracht in een van de sleutelvelden met een UPDATE- of MERGE-instructie of, voor SCD-type 2-tabellen, wanneer het __start_at veld wordt aangepast aan een eerdere beginvolgordewaarde.

De APPLY CHANGES-query bepaalt de primaire-sleutelwaarden, die verschillen voor de verwerking van SCD-type 1 en SCD-type 2:

  • Voor SCD-verwerking van type 1 en de Python-interface van Delta Live Tables is de primaire sleutel de waarde van de parameter keys in de apply_changes()-functie. Voor de SQL-interface van Delta Live Tables is de primaire sleutel de kolommen die zijn gedefinieerd door de KEYS-component in de APPLY CHANGES INTO-instructie.
  • Voor SCD-type 2 is de primaire sleutel de keys parameter of KEYS component plus de retourwaarde van de coalesce(__START_AT, __END_AT) bewerking, waarbij __START_AT en __END_AT de bijbehorende kolommen uit de doelstreamingtabel zijn.

Gegevens ophalen over records die worden verwerkt door een CDC-query voor Delta Live Tables

Notitie

De volgende metrische gegevens worden alleen vastgelegd door APPLY CHANGES query's en niet door APPLY CHANGES FROM SNAPSHOT query's.

De volgende metrische gegevens worden vastgelegd door APPLY CHANGES query's:

  • num_upserted_rows: het aantal uitvoerrijen dat tijdens een update in de gegevensset wordt opgeslagen.
  • num_deleted_rows: het aantal bestaande uitvoerrijen dat tijdens een update uit de gegevensset is verwijderd.

De num_output_rows metrische gegevens, uitvoer voor niet-CDC-stromen, worden niet vastgelegd voor apply changes query's.

Welke gegevensobjecten worden gebruikt voor CDC-verwerking van Delta Live Tables?

Notitie

  • Deze gegevensstructuren zijn alleen van toepassing op APPLY CHANGES verwerking, niet op APPLY CHANGES FROM SNAPSHOT verwerking.
  • Deze gegevensstructuren zijn alleen van toepassing wanneer de doeltabel wordt gepubliceerd naar de Hive-metastore. Als een pijplijn naar Unity Catalog publiceert, zijn de interne backingtabellen niet toegankelijk voor gebruikers.

Wanneer u de doeltabel in de Hive-metastore declareert, worden er twee gegevensstructuren gemaakt.

  • Een weergave met de naam die is toegewezen aan de doeltabel.
  • Een interne back-uptabel die wordt gebruikt door Delta Live Tables voor het beheren van CDC-verwerking. Deze tabel krijgt een naam door __apply_changes_storage_ vooraf te laten gaan aan de naam van de doeltabel.

Als u bijvoorbeeld een doeltabel met de naam dlt_cdc_targetdeclareert, ziet u een weergave met de naam dlt_cdc_target en een tabel met de naam __apply_changes_storage_dlt_cdc_target in de metastore. Door een weergave te maken, kunnen Delta Live Tables de extra informatie (bijvoorbeeld tombstones en versies) filteren die nodig is voor het verwerken van out-of-ordergegevens. Als u de verwerkte gegevens wilt weergeven, voert u een query uit op de doelweergave. Omdat het schema van de __apply_changes_storage_-tabel kan veranderen om toekomstige functies of verbeteringen te ondersteunen, moet u geen query's uitvoeren op de tabel voor productiegebruik. Als u gegevens handmatig aan de tabel toevoegt, wordt ervan uitgegaan dat de records vóór andere wijzigingen komen omdat de versiekolommen ontbreken.