Delen via


Incrementeel gegevens laden van Data Warehouse naar Lakehouse

In deze zelfstudie leert u hoe u incrementeel gegevens laadt van Data Warehouse naar Lakehouse.

Overzicht

Dit is het diagram van een oplossing op hoog niveau:

Diagram van incrementeel laden van gegevenslogica.

Dit zijn de belangrijke stappen voor het maken van deze oplossing:

  1. Selecteer de grenswaardekolom. Selecteer één kolom in de brongegevenstabel, die kan worden gebruikt om de nieuwe of bijgewerkte records voor elke uitvoering te segmenteren. Normaal gesproken nemen de gegevens in deze geselecteerde kolom (bijvoorbeeld, last_modify_time of id) toe wanneer de rijen worden gemaakt of bijgewerkt. De maximale waarde in deze kolom wordt gebruikt als grenswaarde.

  2. Bereid een tabel voor om de laatste grenswaarde in uw datawarehouse op te slaan.

  3. Maak een pijplijn met de volgende werkstroom:

    De pijplijn in deze oplossing heeft de volgende activiteiten:

    • Maak twee opzoekactiviteiten. Gebruik de eerste opzoekactiviteit om de laatste grenswaarde op te halen. Gebruik de tweede opzoekactiviteit om de nieuwe grenswaarde op te halen. Deze grenswaarden worden doorgegeven aan de kopieeractiviteit.
    • Maak een kopieeractiviteit waarmee rijen uit de brongegevenstabel worden gekopieerd met de waarde van de watermerkkolom die groter is dan de oude grenswaarde en kleiner is dan de nieuwe grenswaarde. Vervolgens worden de gegevens van het datawarehouse naar Lakehouse gekopieerd als een nieuw bestand.
    • Maak een opgeslagen procedureactiviteit waarmee de laatste grenswaarde voor de volgende pijplijnuitvoering wordt bijgewerkt.

Vereisten

  • Datawarehouse. U gebruikt het datawarehouse als het brongegevensarchief. Als u deze niet hebt, raadpleegt u Een datawarehouse maken voor stappen om er een te maken.
  • Lakehouse. U gebruikt Lakehouse als doelgegevensarchief. Als u deze niet hebt, raadpleegt u Een Lakehouse maken voor stappen om er een te maken. Maak een map met de naam IncrementalCopy om de gekopieerde gegevens op te slaan.

Uw bron voorbereiden

Hier volgen enkele tabellen en opgeslagen procedures die u moet voorbereiden in uw brondatawarehouse voordat u de incrementele kopieerpijplijn configureert.

1. Een gegevensbrontabel maken in uw datawarehouse

Voer de volgende SQL-opdracht uit in uw Data Warehouse om een tabel met de naam data_source_table te maken als de gegevensbrontabel. In deze zelfstudie gebruikt u deze als voorbeeldgegevens om de incrementele kopie uit te voeren.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

De gegevens in de gegevensbrontabel worden hieronder weergegeven:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

In deze zelfstudie gebruikt u LastModifytime als de watermerkkolom.

2. Maak een andere tabel in uw datawarehouse om de laatste grenswaarde op te slaan

  1. Voer de volgende SQL-opdracht uit in uw Data Warehouse om een tabel met de naam watermerktabel te maken om de laatste grenswaarde op te slaan:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Stel de standaardwaarde van het laatste watermerk in met de tabelnaam van de brongegevenstabel. In deze zelfstudie wordt de tabelnaam data_source_table en de standaardwaarde is 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Controleer de gegevens in de watermerktabel van de tabel.

    Select * from watermarktable
    

    Uitvoer:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Maak een opgeslagen procedure in uw datawarehouse

Voer de volgende opdracht uit om een opgeslagen procedure te maken in uw datawarehouse. Deze opgeslagen procedure wordt gebruikt om de laatste grenswaarde bij te werken na de laatste pijplijnuitvoering.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Een pijplijn configureren voor incrementele kopie

Stap 1: Een pijplijn maken

  1. Navigeer naar Power BI.

  2. Selecteer het Power BI-pictogram in de linkerbenedenhoek van het scherm en selecteer vervolgens Data factory om de startpagina van Data Factory te openen.

  3. Navigeer naar uw Microsoft Fabric-werkruimte.

  4. Selecteer Gegevenspijplijn en voer vervolgens een pijplijnnaam in om een nieuwe pijplijn te maken.

    Schermopname van de knop Nieuwe gegevenspijplijn in de zojuist gemaakte werkruimte.

    Schermopname van de naam van het maken van een nieuwe pijplijn.

Stap 2: Een opzoekactiviteit toevoegen voor het laatste watermerk

In deze stap maakt u een opzoekactiviteit om de laatste grenswaarde op te halen. De eerder ingestelde standaardwaarde 1/1/2010 12:00:00 AM wordt verkregen.

  1. Selecteer Pijplijnactiviteit toevoegen en selecteer Opzoeken in de vervolgkeuzelijst.

  2. Wijzig de naam van deze activiteit op het tabblad Algemeen in LookupOldWaterMarkActivity.

  3. Voer op het tabblad Instellingen de volgende configuratie uit:

    • Gegevensarchieftype: Werkruimte selecteren.
    • Type werkruimtegegevensarchief: Selecteer Data Warehouse.
    • Datawarehouse: Selecteer uw datawarehouse.
    • Query gebruiken: Tabel kiezen.
    • Tabel: kies dbo.watermarktable.
    • Alleen eerste rij: Geselecteerd.

    Schermopname van het oude watermerk opzoeken.

Stap 3: Een opzoekactiviteit toevoegen voor het nieuwe watermerk

In deze stap maakt u een opzoekactiviteit om de nieuwe grenswaarde op te halen. U gebruikt een query om het nieuwe watermerk op te halen uit de brongegevenstabel. De maximumwaarde in kolom LastModifytime in tabel data_source_table wordt verkregen.

  1. Selecteer op de bovenste balk Opzoeken onder het tabblad Activiteiten om de tweede opzoekactiviteit toe te voegen.

  2. Wijzig op het tabblad Algemeen de naam van deze activiteit in LookupNewWaterMarkActivity.

  3. Voer op het tabblad Instellingen de volgende configuratie uit:

    • Gegevensarchieftype: Werkruimte selecteren.

    • Type werkruimtegegevensarchief: Selecteer Data Warehouse.

    • Datawarehouse: Selecteer uw datawarehouse.

    • Query gebruiken: Kies Query.

    • Query: Voer de volgende query in om het maximale tijdstip van de laatste wijziging te kiezen als het nieuwe watermerk:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Alleen eerste rij: Geselecteerd.

    Schermopname van het nieuwe watermerk opzoeken.

Stap 4: De kopieeractiviteit toevoegen om incrementele gegevens te kopiëren

In deze stap voegt u een kopieeractiviteit toe om de incrementele gegevens tussen het laatste watermerk en het nieuwe watermerk van Data Warehouse naar Lakehouse te kopiëren.

  1. Selecteer Activiteiten op de bovenste balk en selecteer Gegevens kopiëren -> Toevoegen aan canvas om de kopieeractiviteit op te halen.

  2. Wijzig de naam van deze activiteit op het tabblad Algemeen in IncrementalCopyActivity.

  3. Verbind beide opzoekactiviteiten met de kopieeractiviteit door de groene knop (Bij geslaagd) te slepen die is gekoppeld aan de opzoekactiviteiten naar de kopieeractiviteit. Laat de muisknop los wanneer u de randkleur van de kopieeractiviteit groen ziet.

    Schermopname van het verbinden van opzoek- en kopieeractiviteiten.

  4. Voer op het tabblad Bron de volgende configuratie uit:

    • Gegevensarchieftype: Werkruimte selecteren.

    • Type werkruimtegegevensarchief: Selecteer Data Warehouse.

    • Datawarehouse: Selecteer uw datawarehouse.

    • Query gebruiken: Kies Query.

    • Query: Voer de volgende query in om incrementele gegevens te kopiëren tussen het laatste watermerk en het nieuwe watermerk.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Schermopname van de configuratie van de kopieerbron.

  5. Voer op het tabblad Doel de volgende configuratie uit:

    • Gegevensarchieftype: Werkruimte selecteren.
    • Type werkruimtegegevensarchief: Selecteer Lakehouse.
    • Lakehouse: Selecteer uw Lakehouse.
    • Hoofdmap: Kies Bestanden.
    • Bestandspad: Geef de map op waarin u de gekopieerde gegevens wilt opslaan. Selecteer Bladeren om uw map te selecteren. Open Dynamische inhoud toevoegen voor de bestandsnaam en voer @CONCAT('Incremental-', pipeline().RunId, '.txt') in het geopende venster in om bestandsnamen te maken voor het gekopieerde gegevensbestand in Lakehouse.
    • Bestandsindeling: Selecteer het indelingstype van uw gegevens.

    Schermopname van de configuratie van het kopieerdoel.

Stap 5: Een opgeslagen procedureactiviteit toevoegen

In deze stap voegt u een opgeslagen procedureactiviteit toe om de laatste grenswaarde voor de volgende pijplijnuitvoering bij te werken.

  1. Selecteer Activiteiten op de bovenste balk en selecteer Opgeslagen procedure om een opgeslagen procedureactiviteit toe te voegen.

  2. Wijzig de naam van deze activiteit op het tabblad Algemeen in StoredProceduretoWriteWatermarkActivity.

  3. Verbind de groene uitvoer van de kopieeractiviteit (bij succes) met de opgeslagen procedureactiviteit.

  4. Voer op het tabblad Instellingen de volgende configuratie uit:

    • Gegevensarchieftype: Werkruimte selecteren.

    • Datawarehouse: Selecteer uw datawarehouse.

    • Naam van opgeslagen procedure: geef de opgeslagen procedure op die u hebt gemaakt in uw datawarehouse: [dbo].[ usp_write_watermark].

    • Vouw opgeslagen procedureparameters uit. Als u waarden voor de opgeslagen procedureparameters wilt opgeven, selecteert u Importeren en voert u de volgende waarden in voor de parameters:

      Name Type Weergegeven als
      LastModifiedtime Datum en tijd @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Schermopname van de configuratie van de opgeslagen procedureactiviteit.

Stap 6: Voer de pijplijn uit en bewaak het resultaat

Selecteer Uitvoeren op het tabblad Start op de bovenste balk. Selecteer Vervolgens Opslaan en uitvoeren. De pijplijn wordt uitgevoerd en u kunt de pijplijn bewaken op het tabblad Uitvoer .

Schermopname van resultaten van pijplijnuitvoering.

Ga naar uw Lakehouse, u vindt dat het gegevensbestand zich bevindt onder de map die u hebt opgegeven en u kunt het bestand selecteren om een voorbeeld van de gekopieerde gegevens te bekijken.

Schermopname van lakehouse-gegevens voor de eerste pijplijnuitvoering.

Schermopname van lakehouse-gegevensvoorbeeld voor de eerste pijplijnuitvoering.

Meer gegevens toevoegen om de incrementele kopieerresultaten weer te geven

Nadat u de eerste pijplijnuitvoering hebt voltooid, gaan we proberen om meer gegevens toe te voegen in uw datawarehouse-brontabel om te zien of deze pijplijn uw incrementele gegevens kan kopiëren.

Stap 1: Meer gegevens toevoegen aan bron

Voeg nieuwe gegevens in uw datawarehouse in door de volgende query uit te voeren:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

De bijgewerkte gegevens voor data_source_table zijn:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Stap 2: Een andere pijplijnuitvoering activeren en het resultaat bewaken

Ga terug naar uw pijplijnpagina. Selecteer op de bovenste balk opnieuw Uitvoeren onder het tabblad Start . De pijplijn wordt uitgevoerd en u kunt de pijplijn bewaken onder Uitvoer.

Ga naar uw Lakehouse, u vindt dat het nieuwe gekopieerde gegevensbestand zich bevindt onder de map die u hebt opgegeven en u kunt het bestand selecteren om een voorbeeld van de gekopieerde gegevens te bekijken. U ziet dat uw incrementele gegevens worden weergegeven in dit bestand.

Schermopname van lakehouse-gegevens voor de tweede pijplijnuitvoering.

Schermopname van lakehouse-gegevensvoorbeeld voor de tweede pijplijnuitvoering.

Ga vervolgens verder voor meer informatie over kopiëren van Azure Blob Storage naar Lakehouse.