Incrementeel gegevens laden van Data Warehouse naar Lakehouse
In deze zelfstudie leert u hoe u incrementeel gegevens laadt van Data Warehouse naar Lakehouse.
Overzicht
Dit is het diagram van een oplossing op hoog niveau:
Dit zijn de belangrijke stappen voor het maken van deze oplossing:
Selecteer de grenswaardekolom. Selecteer één kolom in de brongegevenstabel, die kan worden gebruikt om de nieuwe of bijgewerkte records voor elke uitvoering te segmenteren. Normaal gesproken nemen de gegevens in deze geselecteerde kolom (bijvoorbeeld, last_modify_time of id) toe wanneer de rijen worden gemaakt of bijgewerkt. De maximale waarde in deze kolom wordt gebruikt als grenswaarde.
Bereid een tabel voor om de laatste grenswaarde in uw datawarehouse op te slaan.
Maak een pijplijn met de volgende werkstroom:
De pijplijn in deze oplossing heeft de volgende activiteiten:
- Maak twee opzoekactiviteiten. Gebruik de eerste opzoekactiviteit om de laatste grenswaarde op te halen. Gebruik de tweede opzoekactiviteit om de nieuwe grenswaarde op te halen. Deze grenswaarden worden doorgegeven aan de kopieeractiviteit.
- Maak een kopieeractiviteit waarmee rijen uit de brongegevenstabel worden gekopieerd met de waarde van de watermerkkolom die groter is dan de oude grenswaarde en kleiner is dan de nieuwe grenswaarde. Vervolgens worden de gegevens van het datawarehouse naar Lakehouse gekopieerd als een nieuw bestand.
- Maak een opgeslagen procedureactiviteit waarmee de laatste grenswaarde voor de volgende pijplijnuitvoering wordt bijgewerkt.
Vereisten
- Datawarehouse. U gebruikt het datawarehouse als het brongegevensarchief. Als u deze niet hebt, raadpleegt u Een datawarehouse maken voor stappen om er een te maken.
- Lakehouse. U gebruikt Lakehouse als doelgegevensarchief. Als u deze niet hebt, raadpleegt u Een Lakehouse maken voor stappen om er een te maken. Maak een map met de naam IncrementalCopy om de gekopieerde gegevens op te slaan.
Uw bron voorbereiden
Hier volgen enkele tabellen en opgeslagen procedures die u moet voorbereiden in uw brondatawarehouse voordat u de incrementele kopieerpijplijn configureert.
1. Een gegevensbrontabel maken in uw datawarehouse
Voer de volgende SQL-opdracht uit in uw Data Warehouse om een tabel met de naam data_source_table te maken als de gegevensbrontabel. In deze zelfstudie gebruikt u deze als voorbeeldgegevens om de incrementele kopie uit te voeren.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
De gegevens in de gegevensbrontabel worden hieronder weergegeven:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
In deze zelfstudie gebruikt u LastModifytime als de watermerkkolom.
2. Maak een andere tabel in uw datawarehouse om de laatste grenswaarde op te slaan
Voer de volgende SQL-opdracht uit in uw Data Warehouse om een tabel met de naam watermerktabel te maken om de laatste grenswaarde op te slaan:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Stel de standaardwaarde van het laatste watermerk in met de tabelnaam van de brongegevenstabel. In deze zelfstudie wordt de tabelnaam data_source_table en de standaardwaarde is
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Controleer de gegevens in de watermerktabel van de tabel.
Select * from watermarktable
Uitvoer:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Maak een opgeslagen procedure in uw datawarehouse
Voer de volgende opdracht uit om een opgeslagen procedure te maken in uw datawarehouse. Deze opgeslagen procedure wordt gebruikt om de laatste grenswaarde bij te werken na de laatste pijplijnuitvoering.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Een pijplijn configureren voor incrementele kopie
Stap 1: Een pijplijn maken
Navigeer naar Power BI.
Selecteer het Power BI-pictogram in de linkerbenedenhoek van het scherm en selecteer vervolgens Data factory om de startpagina van Data Factory te openen.
Navigeer naar uw Microsoft Fabric-werkruimte.
Selecteer Gegevenspijplijn en voer vervolgens een pijplijnnaam in om een nieuwe pijplijn te maken.
Stap 2: Een opzoekactiviteit toevoegen voor het laatste watermerk
In deze stap maakt u een opzoekactiviteit om de laatste grenswaarde op te halen. De eerder ingestelde standaardwaarde 1/1/2010 12:00:00 AM
wordt verkregen.
Selecteer Pijplijnactiviteit toevoegen en selecteer Opzoeken in de vervolgkeuzelijst.
Wijzig de naam van deze activiteit op het tabblad Algemeen in LookupOldWaterMarkActivity.
Voer op het tabblad Instellingen de volgende configuratie uit:
- Gegevensarchieftype: Werkruimte selecteren.
- Type werkruimtegegevensarchief: Selecteer Data Warehouse.
- Datawarehouse: Selecteer uw datawarehouse.
- Query gebruiken: Tabel kiezen.
- Tabel: kies dbo.watermarktable.
- Alleen eerste rij: Geselecteerd.
Stap 3: Een opzoekactiviteit toevoegen voor het nieuwe watermerk
In deze stap maakt u een opzoekactiviteit om de nieuwe grenswaarde op te halen. U gebruikt een query om het nieuwe watermerk op te halen uit de brongegevenstabel. De maximumwaarde in kolom LastModifytime in tabel data_source_table wordt verkregen.
Selecteer op de bovenste balk Opzoeken onder het tabblad Activiteiten om de tweede opzoekactiviteit toe te voegen.
Wijzig op het tabblad Algemeen de naam van deze activiteit in LookupNewWaterMarkActivity.
Voer op het tabblad Instellingen de volgende configuratie uit:
Gegevensarchieftype: Werkruimte selecteren.
Type werkruimtegegevensarchief: Selecteer Data Warehouse.
Datawarehouse: Selecteer uw datawarehouse.
Query gebruiken: Kies Query.
Query: Voer de volgende query in om het maximale tijdstip van de laatste wijziging te kiezen als het nieuwe watermerk:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Alleen eerste rij: Geselecteerd.
Stap 4: De kopieeractiviteit toevoegen om incrementele gegevens te kopiëren
In deze stap voegt u een kopieeractiviteit toe om de incrementele gegevens tussen het laatste watermerk en het nieuwe watermerk van Data Warehouse naar Lakehouse te kopiëren.
Selecteer Activiteiten op de bovenste balk en selecteer Gegevens kopiëren -> Toevoegen aan canvas om de kopieeractiviteit op te halen.
Wijzig de naam van deze activiteit op het tabblad Algemeen in IncrementalCopyActivity.
Verbind beide opzoekactiviteiten met de kopieeractiviteit door de groene knop (Bij geslaagd) te slepen die is gekoppeld aan de opzoekactiviteiten naar de kopieeractiviteit. Laat de muisknop los wanneer u de randkleur van de kopieeractiviteit groen ziet.
Voer op het tabblad Bron de volgende configuratie uit:
Gegevensarchieftype: Werkruimte selecteren.
Type werkruimtegegevensarchief: Selecteer Data Warehouse.
Datawarehouse: Selecteer uw datawarehouse.
Query gebruiken: Kies Query.
Query: Voer de volgende query in om incrementele gegevens te kopiëren tussen het laatste watermerk en het nieuwe watermerk.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Voer op het tabblad Doel de volgende configuratie uit:
- Gegevensarchieftype: Werkruimte selecteren.
- Type werkruimtegegevensarchief: Selecteer Lakehouse.
- Lakehouse: Selecteer uw Lakehouse.
- Hoofdmap: Kies Bestanden.
-
Bestandspad: Geef de map op waarin u de gekopieerde gegevens wilt opslaan. Selecteer Bladeren om uw map te selecteren. Open Dynamische inhoud toevoegen voor de bestandsnaam en voer
@CONCAT('Incremental-', pipeline().RunId, '.txt')
in het geopende venster in om bestandsnamen te maken voor het gekopieerde gegevensbestand in Lakehouse. - Bestandsindeling: Selecteer het indelingstype van uw gegevens.
Stap 5: Een opgeslagen procedureactiviteit toevoegen
In deze stap voegt u een opgeslagen procedureactiviteit toe om de laatste grenswaarde voor de volgende pijplijnuitvoering bij te werken.
Selecteer Activiteiten op de bovenste balk en selecteer Opgeslagen procedure om een opgeslagen procedureactiviteit toe te voegen.
Wijzig de naam van deze activiteit op het tabblad Algemeen in StoredProceduretoWriteWatermarkActivity.
Verbind de groene uitvoer van de kopieeractiviteit (bij succes) met de opgeslagen procedureactiviteit.
Voer op het tabblad Instellingen de volgende configuratie uit:
Gegevensarchieftype: Werkruimte selecteren.
Datawarehouse: Selecteer uw datawarehouse.
Naam van opgeslagen procedure: geef de opgeslagen procedure op die u hebt gemaakt in uw datawarehouse: [dbo].[ usp_write_watermark].
Vouw opgeslagen procedureparameters uit. Als u waarden voor de opgeslagen procedureparameters wilt opgeven, selecteert u Importeren en voert u de volgende waarden in voor de parameters:
Name Type Weergegeven als LastModifiedtime Datum en tijd @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Stap 6: Voer de pijplijn uit en bewaak het resultaat
Selecteer Uitvoeren op het tabblad Start op de bovenste balk. Selecteer Vervolgens Opslaan en uitvoeren. De pijplijn wordt uitgevoerd en u kunt de pijplijn bewaken op het tabblad Uitvoer .
Ga naar uw Lakehouse, u vindt dat het gegevensbestand zich bevindt onder de map die u hebt opgegeven en u kunt het bestand selecteren om een voorbeeld van de gekopieerde gegevens te bekijken.
Meer gegevens toevoegen om de incrementele kopieerresultaten weer te geven
Nadat u de eerste pijplijnuitvoering hebt voltooid, gaan we proberen om meer gegevens toe te voegen in uw datawarehouse-brontabel om te zien of deze pijplijn uw incrementele gegevens kan kopiëren.
Stap 1: Meer gegevens toevoegen aan bron
Voeg nieuwe gegevens in uw datawarehouse in door de volgende query uit te voeren:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
De bijgewerkte gegevens voor data_source_table zijn:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Stap 2: Een andere pijplijnuitvoering activeren en het resultaat bewaken
Ga terug naar uw pijplijnpagina. Selecteer op de bovenste balk opnieuw Uitvoeren onder het tabblad Start . De pijplijn wordt uitgevoerd en u kunt de pijplijn bewaken onder Uitvoer.
Ga naar uw Lakehouse, u vindt dat het nieuwe gekopieerde gegevensbestand zich bevindt onder de map die u hebt opgegeven en u kunt het bestand selecteren om een voorbeeld van de gekopieerde gegevens te bekijken. U ziet dat uw incrementele gegevens worden weergegeven in dit bestand.
Gerelateerde inhoud
Ga vervolgens verder voor meer informatie over kopiëren van Azure Blob Storage naar Lakehouse.