Läs in data inkrementellt från Data Warehouse till Lakehouse
I den här självstudien får du lära dig hur du inkrementellt läser in data från Data Warehouse till Lakehouse.
Översikt
Här är det avancerade diagrammet:
Här är några viktiga steg för att skapa den här lösningen:
Markera vattenstämpelkolumnen. Välj en kolumn i källdatatabellen, som kan användas för att segmentera nya eller uppdaterade poster för varje körning. Vanligtvis ökar data i den markerade kolumnen (till exempel last_modify_time elle ID) när rader skapas eller uppdateras. Det maximala värdet i den här kolumnen används som vattenstämpel.
Förbered en tabell för att lagra det sista vattenstämpelvärdet i ditt informationslager.
Skapa en pipeline med följande arbetsflöde:
Pipelinen i den här lösningen har följande aktiviteter:
- Skapa två sökningsaktiviteter. Använd den första lookup-aktiviteten för att hämta det sista vattenstämpelvärdet. Använd den andra lookup-aktiviteten för att hämta det nya vattenstämpelvärdet. Vattenstämpelvärdena skickas till kopieringsaktiviteten.
- Skapa en kopieringsaktivitet som kopierar rader från källdatatabellen med värdet för vattenstämpelkolumnen som är större än det gamla vattenstämpelvärdet och mindre än det nya vattenstämpelvärdet. Sedan kopieras data från datalagret till Lakehouse som en ny fil.
- Skapa en lagrad proceduraktivitet som uppdaterar det sista vattenstämpelvärdet för nästa pipelinekörning.
Förutsättningar
- Informationslager. Du använder informationslagret som källdatalager. Om du inte har det kan du läsa Skapa ett informationslager för steg för att skapa ett.
- Lakehouse. Du använder Lakehouse som måldatalager. Om du inte har det kan du läsa Skapa en Lakehouse för steg för att skapa en. Skapa en mapp med namnet IncrementalCopy för att lagra kopierade data.
Förbereda källan
Här följer några tabeller och en lagrad procedur som du behöver förbereda i ditt källdatalager innan du konfigurerar den inkrementella kopieringspipelinen.
1. Skapa en datakällatabell i informationslagret
Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet data_source_table som datakälltabell. I den här självstudien använder du den som exempeldata för att göra den inkrementella kopian.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Data i datakällans tabell visas nedan:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
I den här självstudien använder du LastModifytime som vattenstämpelkolumn.
2. Skapa en annan tabell i informationslagret för att lagra det sista vattenstämpelvärdet
Kör följande SQL-kommando i informationslagret för att skapa en tabell med namnet watermarktable för att lagra det sista vattenstämpelvärdet:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Ange standardvärdet för den senaste vattenstämpeln med tabellnamnet för källdatatabellen. I den här självstudien är tabellnamnet data_source_table och standardvärdet är
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Granska data i tabellens vattenstämpeltabell.
Select * from watermarktable
Utdata:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Skapa en lagrad procedur i informationslagret
Kör följande kommando för att skapa en lagrad procedur i informationslagret. Den här lagrade proceduren används för att uppdatera det senaste vattenstämpelvärdet efter den senaste pipelinekörningen.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Konfigurera en pipeline för inkrementell kopiering
Steg 1: Skapa en pipeline
Gå till Power BI.
Välj Power BI-ikonen längst ned till vänster på skärmen och välj sedan Data factory för att öppna startsidan för Data Factory.
Gå till din Microsoft Fabric-arbetsyta.
Välj Datapipeline och ange sedan ett pipelinenamn för att skapa en ny pipeline.
Steg 2: Lägg till en uppslagsaktivitet för den senaste vattenstämpeln
I det här steget skapar du en uppslagsaktivitet för att hämta det sista vattenstämpelvärdet. Standardvärdet 1/1/2010 12:00:00 AM
som angetts innan hämtas.
Välj Lägg till pipelineaktivitet och välj Uppslag i listrutan.
Under fliken Allmänt byter du namn på aktiviteten till LookupOldWaterMarkActivity.
Utför följande konfiguration under fliken Inställningar :
- Datalagertyp: Välj Arbetsyta.
- Datalagertyp för arbetsyta: Välj Informationslager.
- Informationslager: Välj ditt informationslager.
- Använd fråga: Välj Tabell.
- Tabell: Välj dbo.watermarktable.
- Endast första raden: Markerad.
Steg 3: Lägg till en uppslagsaktivitet för den nya vattenstämpeln
I det här steget skapar du en uppslagsaktivitet för att hämta det nya vattenstämpelvärdet. Du använder en fråga för att hämta den nya vattenstämpeln från källdatatabellen. Det maximala värdet i kolumnen LastModifytime i data_source_table hämtas.
I det övre fältet väljer du Uppslag under fliken Aktiviteter för att lägga till den andra uppslagsaktiviteten.
Under fliken Allmänt byter du namn på aktiviteten till LookupNewWaterMarkActivity.
Utför följande konfiguration under fliken Inställningar :
Datalagertyp: Välj Arbetsyta.
Datalagertyp för arbetsyta: Välj Informationslager.
Informationslager: Välj ditt informationslager.
Använd fråga: Välj Fråga.
Fråga: Ange följande fråga för att välja den maximala senast ändrade tiden som den nya vattenstämpeln:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Endast första raden: Markerad.
Steg 4: Lägg till kopieringsaktiviteten för att kopiera inkrementella data
I det här steget lägger du till en kopieringsaktivitet för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln från Data Warehouse till Lakehouse.
Välj Aktiviteter i det övre fältet och välj Kopiera data –> Lägg till på arbetsytan för att hämta kopieringsaktiviteten.
Under fliken Allmänt byter du namn på den här aktiviteten till IncrementalCopyActivity.
Anslut båda sökningsaktiviteterna till kopieringsaktiviteten genom att dra den gröna knappen (Vid lyckat resultat) som är kopplad till uppslagsaktiviteterna till kopieringsaktiviteten. Släpp musknappen när du ser kantfärgen för kopieringsaktiviteten ändras till grön.
Utför följande konfiguration under fliken Källa :
Datalagertyp: Välj Arbetsyta.
Datalagertyp för arbetsyta: Välj Informationslager.
Informationslager: Välj ditt informationslager.
Använd fråga: Välj Fråga.
Fråga: Ange följande fråga för att kopiera inkrementella data mellan den senaste vattenstämpeln och den nya vattenstämpeln.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Utför följande konfiguration under fliken Mål :
- Datalagertyp: Välj Arbetsyta.
- Datalagertyp för arbetsyta: Välj Lakehouse.
- Lakehouse: Välj din Lakehouse.
- Rotmapp: Välj Filer.
- Filsökväg: Ange den mapp som du vill lagra dina kopierade data. Välj Bläddra för att välja din mapp. För filnamnet öppnar du Lägg till dynamiskt innehåll och anger
@CONCAT('Incremental-', pipeline().RunId, '.txt')
i det öppnade fönstret för att skapa filnamn för den kopierade datafilen i Lakehouse. - Filformat: Välj formattypen för dina data.
Steg 5:Lägga till en lagrad proceduraktivitet
I det här steget lägger du till en lagrad proceduraktivitet för att uppdatera det senaste vattenstämpelvärdet för nästa pipelinekörning.
Välj Aktiviteter i det övre fältet och välj Lagrad procedur för att lägga till en lagrad proceduraktivitet.
Under fliken Allmänt byter du namn på aktiviteten till StoredProceduretoWriteWatermarkActivity.
Anslut de gröna (vid lyckade) utdata för kopieringsaktiviteten till aktiviteten för lagrad procedur.
Utför följande konfiguration under fliken Inställningar :
Datalagertyp: Välj Arbetsyta.
Informationslager: Välj ditt informationslager.
Namn på lagrad procedur: Ange den lagrade procedur som du skapade i informationslagret: [dbo].[ usp_write_watermark].
Expandera parametrar för lagrad procedur. Om du vill ange värden för parametrarna för lagrad procedur väljer du Importera och anger följande värden för parametrarna:
Namn Typ Värde LastModifiedtime Datum/tid @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Steg 6:Kör pipelinen och övervaka resultatet
I det övre fältet väljer du Kör under fliken Start . Välj sedan Spara och kör. Pipelinen börjar köras och du kan övervaka pipelinen under fliken Utdata .
Gå till Lakehouse, du hittar datafilen under den mapp som du angav och du kan välja filen för att förhandsgranska kopierade data.
Lägg till mer data för att se resultatet av inkrementell kopiering
När du har slutfört den första pipelinekörningen ska vi försöka lägga till mer data i datalagrets källtabell för att se om den här pipelinen kan kopiera dina inkrementella data.
Steg 1: Lägg till mer data i källan
Infoga nya data i informationslagret genom att köra följande fråga:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Uppdaterade data för data_source_table är:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Steg 2:Utlös en annan pipelinekörning och övervaka resultatet
Gå tillbaka till pipelinesidan. I det övre fältet väljer du Kör under fliken Start igen. Pipelinen börjar köras och du kan övervaka pipelinen under Utdata.
Gå till Lakehouse, du hittar den nya kopierade datafilen under den mapp som du angav och du kan välja filen för att förhandsgranska kopierade data. Du ser att dina inkrementella data visas i den här filen.
Relaterat innehåll
Gå sedan vidare för att lära dig mer om att kopiera från Azure Blob Storage till Lakehouse.