Freigeben über


Weitere Informationen finden Sie unter APPLY CHANGES-API: Vereinfachtes CDC (Change Data Capture) in Delta Live Tables

Delta Live Tables vereinfacht die Change Data Capture (CDC) mit den APPLY CHANGES und APPLY CHANGES FROM SNAPSHOT-APIs. Die verwendete Schnittstelle hängt von der Quelle der Änderungsdaten ab:

  • Verwenden Sie APPLY CHANGES, um Änderungen aus einem Änderungsdatenfeed (CDF) zu verarbeiten.
  • Verwenden Sie APPLY CHANGES FROM SNAPSHOT (öffentliche Vorschau) zum Verarbeiten von Änderungen in Datenbankmomentaufnahmen.

Zuvor wurde die MERGE INTO-Anweisung üblicherweise für die Verarbeitung von CDC-Datensätzen auf Azure Databricks verwendet. MERGE INTO kann jedoch falsche Ergebnisse aufgrund von Out-of-Sequence-Datensätzen erzeugen oder komplexe Logik zum erneuten Anordnen von Datensätzen erfordern.

Die APPLY CHANGES-API wird in den SQL- und Python-Schnittstellen Delta Live Tables unterstützt. Die APPLY CHANGES FROM SNAPSHOT-API wird in der Python-Schnittstelle Delta Live Tables unterstützt.

Sowohl APPLY CHANGES als auch APPLY CHANGES FROM SNAPSHOT unterstützen das Aktualisieren von Tabellen mit SCD-Typ 1 und Typ 2:

  • Verwenden Sie SCD-Typ 1, um Datensätze direkt zu aktualisieren. Der Verlauf für aktualisierte Datensätze wird nicht aufbewahrt.
  • Verwenden Sie SCD-Typ 2, um einen Verlauf von Datensätzen beizubehalten, entweder für alle Updates, oder für Updates für einen angegebenen Satz von Spalten.

Syntax und andere Verweise finden Sie unter:

Hinweis

In diesem Artikel wird beschrieben, wie Tabellen in Ihrer Delta Live Tables-Pipeline basierend auf Änderungen in Quelldaten aktualisiert werden. Informationen zum Aufzeichnen und Abfragen von Änderungsinformationen auf Zeilenebene für Delta-Tabellen finden Sie unter Verwenden Sie den Delta Lake-Änderungs-Datenfeed in Azure Databricks.

Anforderungen

Um die CDC-APIs zu verwenden, muss Ihre Pipeline so konfiguriert sein, dass serverlose DLT-Pipelines oder die Delta Live Tables Pro- oder Advanced-Editionen verwendet werden.

Wie wird CDC mit der APPLY CHANGES-API implementiert?

Durch die automatische Verarbeitung von Out-of-Sequence-Datensätzen stellt die APPLY CHANGES-API in Delta-Live Tables die korrekte Verarbeitung von CDC-Datensätzen sicher und entfernt die Notwendigkeit, komplexe Logik für die Behandlung von Out-of-Sequence-Datensätzen zu entwickeln. Sie müssen eine Spalte in den Quelldaten angeben, für die Datensätze sequenziert werden sollen, die Delta Live Tables als monoton zunehmende Darstellung der richtigen Reihenfolge der Quelldaten interpretiert. Delta Live Tables verarbeitet automatisch Daten, die außerhalb der Reihenfolge ankommen. Für SCD-Typ 2-Änderungen verteilt Delta Live Tables die entsprechenden Sequenzierungswerte an dien Spalten __START_AT und __END_AT der Zieltabelle. Für jeden Sequenzierungswert sollte ein eindeutiges Update pro Schlüssel vorhanden sein, und NULL-Sequenzierungswerte werden nicht unterstützt.

Zum Ausführen der CDC-Verarbeitung mit APPLY CHANGES erstellen Sie zuerst eine Streamingtabelle und verwenden dann die APPLY CHANGES INTO-Anweisung in SQL oder die apply_changes()-Funktion in Python, um die Quelle, Schlüssel und Sequenzierung für den Änderungsfeed anzugeben. Verwenden Sie zum Erstellen der Zielstreamingtabelle die CREATE OR REFRESH STREAMING TABLE -Anweisung in SQL oder die create_streaming_table()-Funktion in Python. Weitere Informationen finden Sie in den Beispielen SCD Typ 1 und Typ 2 für die Verarbeitung.

Ausführliche Informationen zur Syntax finden Sie in der Delta Live Tables SQL-Referenz oder Python-Referenz.

Wie wird CDC mit der APPLY CHANGES FROM SNAPSHOT-API implementiert?

Wichtig

Die APPLY CHANGES FROM SNAPSHOT-API befindet sich in der öffentlichen Vorschau.

APPLY CHANGES FROM SNAPSHOT ist eine deklarative API, mit der Änderungen an Quelldaten effizient ermittelt werden, indem eine Reihe von Momentaufnahmen in Reihenfolge verglichen und dann die für die CDC-Verarbeitung der Datensätze in den Momentaufnahmen erforderliche Verarbeitung ausgeführt wird. APPLY CHANGES FROM SNAPSHOT wird nur von der Python-Schnittstelle Delta Live Tables unterstützt.

APPLY CHANGES FROM SNAPSHOT unterstützt das Aufnehmen von Momentaufnahmen aus mehreren Quelltypen:

  • Verwenden Sie die regelmäßige Erfassung von Momentaufnahmen, um Momentaufnahmen aus einer vorhandenen Tabelle oder Ansicht aufzunehmen. APPLY CHANGES FROM SNAPSHOT verfügt über eine einfache, optimierte Schnittstelle, die das regelmäßige Aufnehmen von Momentaufnahmen aus einem vorhandenen Datenbankobjekt unterstützt. Eine neue Momentaufnahme wird mit jedem Pipelineupdate aufgenommen, und die Aufnahmezeit wird als Momentaufnahmeversion verwendet. Wenn eine Pipeline im fortlaufenden Modus ausgeführt wird, werden mehrere Momentaufnahmen mit jeder Pipelineaktualisierung in einem Zeitraum aufgenommen, der durch die Triggerintervall-Einstellung für den Fluss bestimmt wird, der die APPLY CHANGES FROM SNAPSHOT-Verarbeitung enthält.
  • Verwenden Sie historische Momentaufnahmen zum Verarbeiten von Dateien, die Datenbankmomentaufnahmen enthalten, z. B. Momentaufnahmen, die aus einer Oracle- oder MySQL-Datenbank oder einem Data Warehouse generiert wurden.

Zum Ausführen der CDC-Verarbeitung von einem beliebigen Quelltyp mit APPLY CHANGES FROM SNAPSHOTerstellen Sie zuerst eine Streamingtabelle und verwenden dann die apply_changes_from_snapshot()-Funktion in Python, um die Momentaufnahme, Schlüssel und andere Argumente anzugeben, die zum Implementieren der Verarbeitung erforderlich sind. Sehen Sie sich die Beispiele für die regelmäßige Aufnahme von Momentaufnahmen und historische Aufnahme von Momentaufnahmen an.

Die Momentaufnahmen, die an die API übergeben werden, müssen in aufsteigender Reihenfolge nach Version sein. Wenn Delta Live Tables eine Momentaufnahme aus der Reihenfolge erkennt, wird ein Fehler ausgelöst.

Ausführliche Informationen zur Syntax finden Sie in der Python-Referenz zu Delta Live Tables.

Begrenzungen

Die für die Sequenzierung verwendete Spalte muss ein sortierbarer Datentyp sein.

Beispiel: SCD-Typ 1- und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten

Die folgenden Abschnitte enthalten Beispiele für Delta Live Tables SCD-Typ 1 und Typ 2-Abfragen, die Zieltabellen basierend auf Quellereignissen aus einem Änderungsdatenfeed aktualisieren:

  1. Erstellt neue Benutzerdatensätze.
  2. Löscht einen Benutzerdatensatz.
  3. Aktualisiert Benutzerdatensätze. In dem Beispiel vom SCD-Typ 1 kommen die letzten UPDATE-Vorgänge zu spät an und werden aus der Zieltabelle gelöscht, um die Behandlung von Ereignissen in nicht ordnungsgemäßer Reihenfolge zu veranschaulichen.

In den folgenden Beispielen wird davon ausgegangen, dass Sie mit der Konfiguration und Aktualisierung von Delta Live Tables-Pipelines vertraut sind. Siehe Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline.

Zum Ausführen dieser Beispiele müssen Sie zunächst ein Beispiel-Dataset erstellen. Siehe Generieren von Testdaten.

Im Folgenden sind die Eingabedatensätze für diese Beispiele aufgeführt:

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null NULL DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Wenn Sie die letzte Zeile in den Beispieldaten auskommentieren, wird der folgende Datensatz eingefügt, der angibt, wo die Datensätze abgeschnitten werden sollen:

userId name city operation sequenceNum
null NULL null TRUNCATE 3

Hinweis

Alle folgenden Beispiele enthalten Optionen zum Angeben von DELETE- und TRUNCATE-Vorgängen, die jedoch optional sind.

Aktualisierungen des SCD-Typs 1 verarbeiten

Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 1-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Nach dem Ausführen des Beispiels vom SCD-Typ 1 enthält die Zieltabelle die folgenden Datensätze:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Nach der Ausführung des SCD-Typ 1-Beispiels mit dem zusätzlichen TRUNCATE-Datensatz werden die Datensätze 124 und 126 aufgrund des TRUNCATE-Vorgangs bei sequenceNum=3 abgeschnitten, und die Zieltabelle enthält den folgenden Datensatz:

userId name city
125 Mercedes Guadalajara

Aktualisierungen des SCD-Typs 2 verarbeiten

Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Nach dem Ausführen des SCD Typ 2-Beispiels enthält die Zieltabelle die folgenden Datensätze:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 NULL

Eine SCD-Typ 2-Abfrage kann auch eine Teilmenge der Ausgabespalten angeben, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Änderungen an anderen Spalten werden aktualisiert, statt dass neue Verlaufsdatensätze generiert werden. Im folgenden Beispiel wird veranschaulicht, dass die Spalte city von der Nachverfolgung ausgeschlossen wird:

Im folgenden Beispiel wird die Verwendung des Verlaufs mit SCD-Typ 2 veranschaulicht:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Nach dem Ausführen dieses Beispiels ohne den zusätzlichen TRUNCATE-Datensatz enthält die Zieltabelle die folgenden Datensätze:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Generieren von Testdaten

Der folgende Code wird bereitgestellt, um ein Beispiel-Dataset für die Verwendung in den Beispielabfragen zu generieren, die in diesem Lernprogramm vorhanden sind. Wenn Sie über die richtigen Anmeldeinformationen verfügen, um ein neues Schema zu erstellen und eine neue Tabelle zu erstellen, können Sie diese Anweisungen entweder mit einem Notebook oder mit Databricks SQL ausführen. Der folgende Code sollte nicht als Teil einer Delta Live Tables-Pipeline ausgeführt werden:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Beispiel: Regelmäßige Momentaufnahmeverarbeitung

Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die Momentaufnahmen einer Tabelle erfasst, die bei mycatalog.myschema.mytable gespeichert wird. Die Ergebnisse der Verarbeitung werden in eine Tabelle mit dem Namen target geschrieben.

mycatalog.myschema.mytable erfasst zum Zeitstempel 2024-01-01 00:00:00

Schlüssel Wert
1 a1
2 a2

mycatalog.myschema.mytable erfasst zum Zeitstempel 2024-01-01 12:00:00

Schlüssel Wert
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:

Schlüssel Wert __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 NULL
3 a3 2024-01-01 12:00:00 NULL

Beispiel: Verarbeitung von historischen Momentaufnahmen

Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die eine Zieltabelle basierend auf Quellereignissen aus zwei Momentaufnahmen aktualisiert, die in einem Cloudspeichersystem gespeichert sind:

Momentaufnahme bei timestamp, gespeichert in /<PATH>/filename1.csv

Schlüssel TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Momentaufnahme bei timestamp + 5, gespeichert in /<PATH>/filename2.csv

Schlüssel TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates mit diesen Momentaufnahmen veranschaulicht:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:

Schlüssel TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 NULL
3 a3 b3 2 NULL
4 a4 b4_new 1 NULL

Hinzufügen, Ändern oder Löschen von Daten in einer Zielstreamingtabelle

Wenn Ihre Pipeline Tabellen im Unity Catalog veröffentlicht, können Sie Datenbearbeitungssprache (DML)-Anweisungen verwenden, einschließlich Einfüge-, Aktualisierungs-, Lösch- und Zusammenführungsanweisungen, um die von APPLY CHANGES INTO-Anweisungen erstellten Zielstreamingtabellen zu ändern.

Hinweis

  • DML-Anweisungen, die das Tabellenschema einer Streamingtabelle ändern, werden nicht unterstützt. Stellen Sie sicher, dass Ihre DML-Anweisungen nicht versuchen, das Tabellenschema weiterzuentwickeln.
  • DML-Anweisungen, die eine Streamingtabelle aktualisieren, können nur in einem freigegebenen Unity Catalog-Cluster oder einem SQL-Warehouse mit Databricks Runtime 13.3 LTS und höher ausgeführt werden.
  • Da für das Streaming Datenquellen im Nur-Anfügen-Modus benötigt werden, setzen Sie das Flag skipChangeCommits beim Lesen der Streaming-Quelltabelle, wenn Ihre Verarbeitung Streaming aus einer Streaming-Quelltabelle mit Änderungen (z. B. durch DML-Anweisungen) erfordert. Wenn skipChangeCommits festgelegt ist, werden Transaktionen, die Datensätze in der Quelltabelle löschen oder ändern, ignoriert. Wenn Ihre Verarbeitung keine Streaming-Tabelle erfordert, können Sie eine materialisierte Ansicht (die nicht die Einschränkung „nur Anhängen“ hat) als Zieltabelle verwenden.

Da Delta Live Tables eine angegebene SEQUENCE BY-Spalte verwendet und geeignete Sequenzierungswerte an die __START_AT- und __END_AT-Spalten der Zieltabelle (für SCD-Typ 2) verteilt, müssen Sie sicherstellen, dass DML-Anweisungen gültige Werte für diese Spalten verwenden, um die richtige Reihenfolge von Datensätzen aufrechtzuerhalten. Lesen Sie Wie wird CDC mit der APPLY CHANGES-API implementiert?.

Weitere Informationen zum Verwenden von DML-Anweisungen mit Streamingtabellen finden Sie unter Hinzufügen, Ändern oder Löschen von Daten in einer Streamingtabelle.

Im folgenden Beispiel wird ein aktiver Datensatz mit einer Startsequenz von 5 eingefügt:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Lesen eines Änderungsdatenfeeds aus einer APPLY CHANGES-Zieltabelle

In Databricks Runtime 15.2 und höher können Sie einen Änderungsdatenfeed aus einer Streamingtabelle, die das Ziel von APPLY CHANGES- oder APPLY CHANGES FROM SNAPSHOT-Abfragen ist, auf die gleiche Weise lesen, wie Sie einen Änderungsdatenfeed aus anderen Delta-Tabellen lesen. Nachfolgend finden Sie Informationen zum Lesen des Änderungsdatenfeeds aus einer Zielstreamingtabelle:

  • Die Zielstreamingtabelle muss in Unity Catalog veröffentlicht werden. Weitere Informationen finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.
  • Um den Änderungsdatenfeed aus der Zielstreamingtabelle zu lesen, müssen Sie Databricks Runtime 15.2 oder höher verwenden. Um den Änderungsdatenfeed in einer anderen Delta Live Tables-Pipeline zu lesen, muss die Pipeline für die Verwendung von Databricks Runtime 15.2 oder höher konfiguriert werden.

Das Lesen des Änderungsdatenfeeds aus einer Zielstreamingtabelle, die in einer Delta Live Tables-Pipeline erstellt wurde, funktioniert genauso wie das Lesen eines Änderungsdatenfeeds aus anderen Delta-Tabellen. Weitere Informationen zur Verwendung der Delta-Änderungsdatenfeedfunktionen, einschließlich Beispielen in Python und SQL, finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.

Hinweis

Der Datensatz für Änderungsdatenfeed enthält Metadaten, die den Typ des Änderungsereignisses identifizieren. Wenn ein Datensatz in einer Tabelle aktualisiert wird, enthalten die Metadaten für die zugehörigen Änderungsdatensätze in der Regel _change_type-Werte, die auf update_preimage- und update_postimage-Ereignisse festgelegt sind.

Die _change_type-Werte unterscheiden sich jedoch, wenn Aktualisierungen an der Zielstreamingtabelle vorgenommen werden, die Änderungen der Primärschlüsselwerte umfassen. Wenn Änderungen Aktualisierungen an Primärschlüsseln enthalten, werden die _change_type-Metadatenfelder auf insert- und delete-Ereignisse festgelegt. Änderungen an Primärschlüsseln können auftreten, wenn manuelle Aktualisierungen an einem der Schlüsselfelder mit einer UPDATE- oder MERGE-Anweisung oder für SCD-Typ 2-Tabellen vorgenommen werden, wenn sich das __start_at-Feld ändert, um einen früheren Startsequenzwert widerzuspiegeln.

Die APPLY CHANGES-Abfrage bestimmt die Primärschlüsselwerte, die sich für die SCD-Typ 1- und SCD-Typ 2-Verarbeitung unterscheiden:

  • Bei der SCD-Typ 1-Verarbeitung und der Delta Live Tables-Schnittstelle für Python ist der Primärschlüssel der Wert des keys-Parameters in der apply_changes()-Funktion. Für die Delta Live Tables-Schnittstelle für SQL ist der Primärschlüssel die Spalten, die durch die KEYS-Klausel in der APPLY CHANGES INTO-Anweisung definiert werden.
  • Bei SCD-Typ 2 ist der Primärschlüssel der keys-Parameter oder die KEYS-Klausel sowie der Rückgabewert des coalesce(__START_AT, __END_AT)-Vorgangs, wobei __START_AT und __END_AT die entsprechenden Spalten aus der Zielstreamingtabelle sind.

Abrufen von Daten zu Datensätzen, die von einer Delta Live Tables-CDC-Abfrage verarbeitet werden

Hinweis

Die folgenden Metriken werden nur von APPLY CHANGES-Abfragen und nicht von APPLY CHANGES FROM SNAPSHOT-Abfragen erfasst.

Die folgenden Metriken werden von APPLY CHANGES-Abfragen erfasst:

  • num_upserted_rows: Die Anzahl der Ausgabezeilen, für die während einer Aktualisierung ein Upsertvorgang im Dataset ausgeführt wird.
  • num_deleted_rows: Die Anzahl vorhandener Ausgabezeilen, die während einer Aktualisierung aus dem Dataset gelöscht wurden.

Die Metrik num_output_rows, die für Nicht-CDC-Flows ausgegeben wird, wird nicht für apply changes-Abfragen erfasst.

Welche Datenobjekte werden für die CDC-Verarbeitung von Delta Live Tables verwendet?

Hinweis: Die folgenden Datenstrukturen gelten nur für APPLY CHANGES-Verarbeitung, nicht für APPLY CHANGES FROM SNAPSHOT-Verarbeitung.

Wenn Sie die Zieltabelle im Hive-Metastore deklarieren, werden zwei Datenstrukturen erstellt:

  • Eine Ansicht mit dem Namen, der der Zieltabelle zugewiesen ist.
  • Eine interne Sicherungstabelle, die von Delta Live Tables zum Verwalten der CDC-Verarbeitung verwendet wird. Diese Tabelle wird benannt, indem dem Namen der Zieltabelle __apply_changes_storage_ vorangestellt wird.

Wenn Sie beispielsweise eine Zieltabelle mit dem Namen dlt_cdc_targetdeklarieren, wird eine Ansicht mit dem Namen dlt_cdc_target und eine Tabelle mit dem Namen __apply_changes_storage_dlt_cdc_target im Metastore angezeigt. Durch das Erstellen einer Ansicht können Delta Live Tables die zusätzlichen Informationen (z. B. Tombstones und Versionen) herausfiltern, die für die Verarbeitung von nicht ordnungsgemäßen Daten erforderlich sind. Um die verarbeiteten Daten anzuzeigen, fragen Sie die Zielansicht ab. Da sich das Schema der __apply_changes_storage_-Tabelle möglicherweise ändert, um zukünftige Features oder Verbesserungen zu unterstützen, sollten Sie die Tabelle nicht für die Produktionsverwendung abfragen. Wenn Sie der Tabelle Daten manuell hinzufügen, werden die Datensätze angenommen, bevor andere Änderungen vorgenommen werden, da die Versionsspalten fehlen.

Wenn eine Pipeline im Unity Catalog veröffentlicht wird, sind die internen Sicherungstabellen für Benutzer nicht zugänglich.