Inkrementelles Laden von Daten aus Data Warehouse in Lakehouse
In diesem Tutorial erfahren Sie, wie Sie Daten inkrementell aus Data Warehouse in ein Lakehouse laden.
Überblick
Allgemeines Lösungsdiagramm:
Hier sind die wesentlichen Schritte beim Erstellen dieser Lösung aufgeführt:
Select the watermark column (Wählen Sie die Grenzwert-Spalte aus) . Wählen Sie eine Spalte in der Quelldatentabelle aus, die verwendet werden kann, um die neuen oder aktualisierten Datensätze für jede Ausführung in Segmente aufzuteilen. Normalerweise steigen die Daten in dieser ausgewählten Spalte (z.B. Last_modify_time oder ID), wenn Zeilen erstellt oder aktualisiert werden. Der maximale Wert in dieser Spalte wird als Grenzwert verwendet.
Bereiten Sie eine Tabelle vor, um den letzten Grenzwert in Ihrem Data Warehouse zu speichern.
Erstellen einer Pipeline mit dem folgenden Workflow:
Die Pipeline in dieser Lösung enthält die folgenden Aktivitäten:
- Erstellen Sie zwei Lookup-Aktivitäten. Verwenden Sie die erste Lookup-Aktivität, um den letzten Grenzwert abzurufen. Verwenden Sie die zweite Lookup-Aktivität, um den neuen Grenzwert abzurufen. Diese Grenzwerte werden an die Kopieraktivität übergeben.
- Erstellen Sie eine Copy-Aktivität, die Zeilen aus der Quelldatentabelle kopiert, wobei der Wert der Grenzwertspalte größer als der alte Grenzwert und kleiner als der neue Grenzwert ist. Anschließend werden die Daten aus dem Data Warehouse als neue Datei in das Lakehouse kopiert.
- Erstellen Sie eine gespeicherte Prozeduraktivität, die den Grenzwert für die nächste Pipelineausführung aktualisiert.
Voraussetzungen
- Data Warehouse: Sie verwenden das Data Warehouse als Quelldatenspeicher. Wenn Sie noch keines haben, lesen Sie Erstellen eines Data Warehouse. Dort finden Sie Schritte zur Erstellung.
- Lakehouse: Sie verwenden ein Lakehouse als Zieldatenspeicher. Wenn Sie noch keines haben, lesen Sie Erstellen eines Lakehouse. Dort finden Sie Schritte zur Erstellung. Erstellen Sie einen Ordner namens IncrementalCopy, um die kopierten Daten zu speichern.
Vorbereiten der Quelle
Im Folgenden finden Sie einige Tabellen und gespeicherte Prozeduren, die Sie in Ihrem Quell-Data Warehouse vorbereiten müssen, bevor Sie die inkrementelle Kopierpipeline konfigurieren.
1. Erstellen einer Datenquellentabelle in Ihrem Data Warehouse
Führen Sie den folgenden SQL-Befehl in Ihrem Data Warehouse aus, um eine Tabelle namens data_source_table als Datenquellentabelle zu erstellen. In diesem Tutorial verwenden Sie sie als Beispieldaten für die inkrementelle Kopie.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Die Daten in der Datenquellentabelle werden unten angezeigt:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
In diesem Tutorial verwenden Sie LastModifytime als die Grenzwertspalte.
2. Erstellen einer anderen Tabelle in Ihrem Data Warehouse zum Speichern des letzten Grenzwerts
Führen Sie den folgenden SQL-Befehl in Ihrem Data Warehouse aus, um eine Tabelle namens watermarktable zu erstellen und den letzten Grenzwert zu speichern:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Legen Sie den Standardwert für den letzten Grenzwert mit dem Tabellennamen der Quelldatentabelle fest. In diesem Tutorial lautet der Tabellenname data_source_table, und der Standardwert ist
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Überprüfen Sie die Daten in der Tabelle watermarktable.
Select * from watermarktable
Ausgabe:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Erstellen einer gespeicherten Prozedur in Ihrem Data Warehouse
Führen Sie den folgenden Befehl zum Erstellen einer gespeicherten Prozedur in Ihrem Data Warehouse aus. Diese gespeicherte Prozedur wird verwendet, um den letzten Grenzwert nach der letzten Pipelineausführung zu aktualisieren.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Konfigurieren einer Pipeline für inkrementelles Kopieren
Schritt 1: Erstellen einer Pipeline
Navigieren Sie zu Power BI.
Wählen Sie unten links auf dem Bildschirm das Power BI-Symbol und dann Data Factory aus, um die Homepage von Data Factory zu öffnen.
Navigieren Sie zu Ihrem Microsoft Fabric-Arbeitsbereich.
Wählen Sie Datenpipeline aus, und geben Sie dann einen Pipelinenamen ein, um eine neue Pipeline zu erstellen.
Schritt 2: Hinzufügen einer Lookupaktivität für den letzten Grenzwert
In diesem Schritt erstellen Sie eine Lookupaktivität, um den letzten Grenzwert abzurufen. Der zuvor festgelegte Standardwert 1/1/2010 12:00:00 AM
wird abgerufen.
Wählen Sie Pipelineaktivität hinzufügen aus, und wählen Sie in der Dropdownliste die Option Lookup aus.
Benennen Sie diese Aktivität auf der Registerkarte Allgemein in LookupOldWaterMarkActivity um.
Führen Sie auf der Registerkarte Einstellungen die folgende Konfiguration aus:
- Datenspeichertyp: Wählen Sie Arbeitsbereich aus.
- Datenspeichertyp des Arbeitsbereichs: Wählen Sie Data Warehouse aus.
- Data Warehouse: Wählen Sie Ihr Data Warehouse aus.
- Abfrage verwenden: Wählen Sie Tabelle aus.
- Tabelle: Wählen Sie dbo.watermarktable aus.
- Nur erste Zeile: Ausgewählt.
Schritt 3: Hinzufügen einer Lookupaktivität für den neuen Grenzwert
In diesem Schritt erstellen Sie eine Lookupaktivität, um den neuen Grenzwert abzurufen. Sie verwenden eine Abfrage, um den neuen Grenzwert aus Ihrer Quelldatentabelle abzurufen. Der Maximalwert in der LastModifytime-Spalte in data_source_table wird abgerufen.
Wählen Sie auf der oberen Leiste unter der Registerkarte Aktivitäten die Option Lookup aus, und fügen Sie die zweite Lookaktivität hinzu.
Benennen Sie diese Aktivität auf der Registerkarte Allgemein in LookupNewWaterMarkActivity um.
Führen Sie auf der Registerkarte Einstellungen die folgende Konfiguration aus:
Datenspeichertyp: Wählen Sie Arbeitsbereich aus.
Datenspeichertyp des Arbeitsbereichs: Wählen Sie Data Warehouse aus.
Data Warehouse: Wählen Sie Ihr Data Warehouse aus.
Abfrage verwenden: Wählen Sie Abfrage aus.
Abfrage: Geben Sie die folgende Abfrage ein, um den maximalen Zeitpunkt der letzten Änderung als neuen Grenzwert auszuwählen:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Nur erste Zeile: Ausgewählt.
Schritt 4: Hinzufügen der Kopieraktivität zum Kopieren inkrementeller Daten
In diesem Schritt fügen Sie eine Kopieraktivität hinzu, um die inkrementellen Daten zwischen dem letzten Grenzwert und dem neuen Grenzwert aus Data Warehouse in das Lakehouse zu kopieren.
Wählen Sie in der oberen Leiste Aktivitäten aus, und wählen Sie Daten kopieren –>In Canvas hinzufügen aus, um die Kopieraktivität abzurufen.
Benennen Sie diese Aktivität auf der Registerkarte Allgemein in IncrementalCopyActivity um.
Verbinden Sie beide Lookupaktivitäten mit der Kopieraktivität, indem Sie die grüne Schaltfläche (bei Erfolg), die den Lookupaktivitäten zugeordnet ist, zur Kopieraktivität ziehen. Lassen Sie die Maustaste los, wenn Sie sehen, dass sich die Rahmenfarbe der Kopieraktivität in grün ändert.
Führen Sie auf der Registerkarte Quelle die folgende Konfiguration aus:
Datenspeichertyp: Wählen Sie Arbeitsbereich aus.
Datenspeichertyp des Arbeitsbereichs: Wählen Sie Data Warehouse aus.
Data Warehouse: Wählen Sie Ihr Data Warehouse aus.
Abfrage verwenden: Wählen Sie Abfrage aus.
Abfrage: Geben Sie die folgende Abfrage ein, um inkrementelle Daten zwischen dem letzten Grenzwert und dem neuen Grenzwert zu kopieren.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Führen Sie auf der Registerkarte Ziel die folgende Konfiguration aus:
- Datenspeichertyp: Wählen Sie Arbeitsbereich aus.
- Datenspeichertyp des Arbeitsbereichs: Wählen Sie Lakehouse aus.
- Lakehouse: Wählen Sie Ihr Lakehouse aus.
- Stammordner: Wählen Sie Dateien aus.
- Dateipfad: Geben Sie den Ordner an, in dem die kopierten Daten gespeichert werden sollen. Wählen Sie Durchsuchen aus, um Ihren Ordner auszuwählen. Öffnen Sie für den Dateinamen Dynamischen Inhalt hinzufügen, und geben Sie
@CONCAT('Incremental-', pipeline().RunId, '.txt')
im geöffneten Fenster ein, um Dateinamen für Ihre kopierte Datendatei im Lakehouse zu erstellen. - Dateiformat: Wählen Sie den Formattyp Ihrer Daten aus.
Schritt 5: Hinzufügen einer Aktivität der gespeicherten Prozedur
In diesem Schritt fügen Sie eine Aktivität der gespeicherten Prozedur hinzu, um den letzten Grenzwert für die nächste Pipelineausführung zu aktualisieren.
Wählen Sie auf der oberen Leiste Aktivitäten und dann Gespeicherte Prozedur aus, um eine Aktivität der gespeicherten Prozeduren hinzuzufügen.
Benennen Sie diese Aktivität auf der Registerkarte Allgemein in StoredProceduretoWriteWatermarkActivity um.
Verbinden Sie die grüne Ausgabe (bei Erfolg) der Kopieraktivität mit der Aktivität der gespeicherten Prozedur.
Führen Sie auf der Registerkarte Einstellungen die folgende Konfiguration aus:
Datenspeichertyp: Wählen Sie Arbeitsbereich aus.
Data Warehouse: Wählen Sie Ihr Data Warehouse aus.
Name der gespeicherten Prozedur: Geben Sie die gespeicherte Prozedur an, die Sie in Ihrem Data Warehouse erstellt haben: [dbo].[ usp_write_watermark].
Erweitern Sie Parameter gespeicherter Prozeduren. Wählen Sie zum Angeben von Werten für die Parameter der gespeicherten Prozedur die Option Import aus, und geben Sie für die Parameter die folgenden Werte ein:
Name type Wert LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Schritt 6: Ausführen der Pipeline und Überwachen des Ergebnisses
Wählen Sie auf der oberen Leiste auf der Registerkarte Startseite die Option Ausführen aus. Wählen Sie dann Speichern und ausführen aus. Die Pipeline startet die Ausführung, und Sie können die Pipeline auf der Registerkarte Ausgabe überwachen.
Wechseln Sie zu Ihrem Lakehouse. Die Datendatei befindet sich in dem von Ihnen angegebenen Ordner, und Sie können die Datei auswählen, um eine Vorschau der kopierten Daten anzuzeigen.
Fügen Sie weitere Daten hinzu, um die Ergebnisse des inkrementellen Kopierens anzuzeigen.
Nachdem Sie die erste Pipelineausführung abgeschlossen haben, versuchen wir, Ihrer Data Warehouse-Quelltabelle weitere Daten hinzuzufügen, um festzustellen, ob diese Pipeline Ihre inkrementellen Daten kopieren kann.
Schritt 1: Hinzufügen weiterer Daten zur Quelle
Fügen Sie neue Daten in Ihr Data Warehouse ein, indem Sie die folgende Abfrage ausführen:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Die aktualisierten Daten für data_source_table sind die folgenden:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Schritt 2: Auslösen einer weiteren Pipelineausführung und Überwachen des Ergebnisses
Wechseln Sie zurück zur Pipelineseite. Wählen Sie auf der oberen Leiste auf der Registerkarte Startseite erneut die Option Ausführen aus. Die Pipeline startet die Ausführung, und Sie können die Pipeline unter Ausgabe überwachen.
Wechseln Sie zu Ihrem Lakehouse. Die neu kopierte Datendatei befindet sich in dem von Ihnen angegebenen Ordner, und Sie können die Datei auswählen, um eine Vorschau der kopierten Daten anzuzeigen. Ihre inkrementellen Daten werden in dieser Datei angezeigt.
Zugehöriger Inhalt
Als Nächstes erfahren Sie mehr über das Kopieren von Azure Blob Storage in das Lakehouse.