Indlæs data trinvist fra Data Warehouse til Lakehouse
I dette selvstudium lærer du, hvordan du trinvist indlæser data fra Data Warehouse til Lakehouse.
Oversigt
Her er løsningsdiagrammet på højt niveau:
Her er de vigtige trin til at oprette denne løsning:
Vælg kolonnen vandmærke. Vælg én kolonne i kildedatatabellen, som kan bruges til at opdele de nye eller opdaterede poster for hver kørsel. Normalt bliver dataene i denne valgte kolonne (f.eks. last_modify_time eller id) stadig større, når rækker oprettes eller opdateres. Den maksimale værdi i denne kolonne bruges som et vandmærke.
Forbered en tabel til at gemme den sidste vandmærkeværdi i dit Data Warehouse.
Opret en pipeline med følgende arbejdsproces:
Pipelinen i denne løsning har følgende aktiviteter:
- Opret to opslagsaktiviteter. Brug den første opslagsaktivitet til at hente den sidste vandmærkeværdi. Brug den anden opslagsaktivitet til at hente den nye vandmærkeværdi. Disse vandmærkeværdier overføres til kopiaktiviteten.
- Opret en kopiaktivitet, der kopierer rækker fra kildedatatabellen med værdien af vandmærkekolonnen større end den gamle vandmærkeværdi og mindre end den nye vandmærkeværdi. Derefter kopieres dataene fra Data Warehouse til Lakehouse som en ny fil.
- Opret en gemt procedureaktivitet, der opdaterer den sidste vandmærkeværdi for den næste pipelinekørsel.
Forudsætninger
- Data Warehouse. Du kan bruge Data Warehouse som kildedatalager. Hvis du ikke har det, skal du se Opret et data warehouse for at få trin til at oprette et.
- Lakehouse. Du bruger Lakehouse som destinationsdatalager. Hvis du ikke har den, skal du se Opret en Lakehouse for at få trin til at oprette en. Opret en mappe med navnet IncrementalCopy for at gemme de kopierede data.
Forbereder din kilde
Her er nogle tabeller og en lagret procedure, som du skal forberede i datakildens Data Warehouse, før du konfigurerer den trinvise kopipipeline.
1. Opret en datakildetabel i dit Data Warehouse
Kør følgende SQL-kommando i Data Warehouse for at oprette en tabel med navnet data_source_table som datakildetabellen. I dette selvstudium skal du bruge den som eksempeldata til at udføre den trinvise kopi.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Dataene i datakildetabellen vises nedenfor:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
I dette selvstudium skal du bruge LastModifytime som kolonnen vandmærke.
2. Opret en anden tabel i Data Warehouse for at gemme den sidste vandmærkeværdi
Kør følgende SQL-kommando i dit Data Warehouse for at oprette en tabel med navnet watermarktable for at gemme den sidste vandmærkeværdi:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Angiv standardværdien for det sidste vandmærke med tabelnavnet på kildedatatabellen. I dette selvstudium er tabelnavnet data_source_table, og standardværdien er
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Gennemse dataene i tabellens vandmærketabel.
Select * from watermarktable
Output:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Opret en lagret procedure i dit data warehouse
Kør følgende kommando for at oprette en lagret procedure i dit Data Warehouse. Denne lagrede procedure bruges til at hjælpe med at opdatere den sidste vandmærkeværdi efter sidste pipelinekørsel.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Konfigurer en pipeline til trinvis kopiering
Trin 1: Opret en pipeline
Gå til Power BI.
Vælg Power BI-ikonet nederst til venstre på skærmen, og vælg derefter Datafabrik for at åbne startsiden for Data Factory.
Gå til dit Microsoft Fabric-arbejdsområde.
Vælg Datapipeline , og angiv derefter et pipelinenavn for at oprette en ny pipeline.
Trin 2: Tilføj en opslagsaktivitet for det sidste vandmærke
I dette trin skal du oprette en opslagsaktivitet for at få den sidste vandmærkeværdi. Standardværdien 1/1/2010 12:00:00 AM
, der er angivet før, hentes.
Vælg Tilføj pipelineaktivitet , og vælg Opslag på rullelisten.
Under fanen Generelt skal du omdøbe denne aktivitet til LookupOldWaterMarkActivity.
Udfør følgende konfiguration under fanen Indstillinger :
- Datalagertype: Vælg arbejdsområde.
- Datalagertype for arbejdsområde: Vælg Data Warehouse.
- Data Warehouse: Vælg dit data warehouse.
- Brug forespørgsel: Vælg tabel.
- Tabel: Vælg dbo.watermarktable.
- Kun første række: Markeret.
Trin 3: Tilføj en opslagsaktivitet for det nye vandmærke
I dette trin skal du oprette en opslagsaktivitet for at hente den nye vandmærkeværdi. Du kan bruge en forespørgsel til at hente det nye vandmærke fra din kildedatatabel. Den maksimale værdi i kolonnen LastModifytime i data_source_table hentes.
På den øverste linje skal du vælge Opslag under fanen Aktiviteter for at tilføje den anden opslagsaktivitet.
Under fanen Generelt skal du omdøbe denne aktivitet til LookupNewWaterMarkActivity.
Udfør følgende konfiguration under fanen Indstillinger :
Datalagertype: Vælg arbejdsområde.
Datalagertype for arbejdsområde: Vælg Data Warehouse.
Data Warehouse: Vælg dit data warehouse.
Brug forespørgsel: Vælg forespørgsel.
Forespørgsel: Angiv følgende forespørgsel for at vælge det maksimale tidspunkt for seneste ændring som det nye vandmærke:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Kun første række: Markeret.
Trin 4: Tilføj kopiaktiviteten for at kopiere trinvise data
I dette trin skal du tilføje en kopiaktivitet for at kopiere de trinvise data mellem det sidste vandmærke og det nye vandmærke fra Data Warehouse til Lakehouse.
Vælg Aktiviteter på den øverste linje, og vælg Kopiér data –> Føj til lærredet for at hente kopiaktiviteten.
Under fanen Generelt skal du omdøbe denne aktivitet til IncrementalCopyActivity.
Forbind begge opslagsaktiviteter til kopiaktiviteten ved at trække den grønne knap (ved vellykket handling), der er knyttet til opslagsaktiviteterne, til kopiaktiviteten. Slip museknappen, når kantfarven for kopiaktiviteten ændres til grøn.
Udfør følgende konfiguration under fanen Kilde :
Datalagertype: Vælg arbejdsområde.
Datalagertype for arbejdsområde: Vælg Data Warehouse.
Data Warehouse: Vælg dit data warehouse.
Brug forespørgsel: Vælg forespørgsel.
Forespørgsel: Angiv følgende forespørgsel for at kopiere trinvise data mellem sidste vandmærke og nyt vandmærke.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Udfør følgende konfiguration under fanen Destination :
- Datalagertype: Vælg arbejdsområde.
- Datalagertype for arbejdsområde: Vælg Lakehouse.
- Lakehouse: Vælg din Lakehouse.
- Rodmappe: Vælg filer.
-
Filsti: Angiv den mappe, du vil gemme de kopierede data i. Vælg Gennemse for at vælge din mappe. For filnavnet skal du åbne Tilføj dynamisk indhold og angive
@CONCAT('Incremental-', pipeline().RunId, '.txt')
i det åbne vindue for at oprette filnavne til den kopierede datafil i Lakehouse. - Filformat: Vælg dataformattypen.
Trin 5:Tilføj en gemt procedureaktivitet
I dette trin skal du tilføje en gemt procedureaktivitet for at opdatere den sidste vandmærkeværdi for den næste pipelinekørsel.
Vælg Aktiviteter på den øverste linje, og vælg Lagret procedure for at tilføje en gemt procedureaktivitet.
Under fanen Generelt skal du omdøbe denne aktivitet til StoredProceduretoWriteWatermarkActivity.
Forbind det grønne output (ved vellykket) af kopiaktiviteten til den gemte procedureaktivitet.
Udfør følgende konfiguration under fanen Indstillinger :
Datalagertype: Vælg arbejdsområde.
Data Warehouse: Vælg dit data warehouse.
Navn på lagret procedure: Angiv den lagrede procedure, du har oprettet i dit Data Warehouse: [dbo].[ usp_write_watermark].
Udvid Parametre for lagret procedure. Hvis du vil angive værdier for parametrene for den lagrede procedure, skal du vælge Importér og angive følgende værdier for parametrene:
Name Skriv Værdi LastModifiedtime Dato/klokkeslæt @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Trin 6:Kør pipelinen, og overvåg resultatet
Vælg Kør under fanen Hjem på den øverste linje. Vælg derefter Gem og kør. Pipelinen starter kørslen, og du kan overvåge pipelinen under fanen Output .
Gå til lakehouse, find datafilen under den mappe, du har angivet, og du kan vælge filen for at få vist de kopierede data.
Tilføj flere data for at se resultaterne af den trinvise kopi
Når du er færdig med den første pipelinekørsel, kan vi prøve at tilføje flere data i datakildetabellen i Data Warehouse for at se, om denne pipeline kan kopiere dine trinvise data.
Trin 1: Føj flere data til kilden
Indsæt nye data i dit Data Warehouse ved at køre følgende forespørgsel:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
De opdaterede data for data_source_table er:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Trin 2:Udløs en anden pipeline, og overvåg resultatet
Gå tilbage til din pipelineside. På den øverste linje skal du vælge Kør under fanen Hjem igen. Pipelinen starter kørslen, og du kan overvåge pipelinen under Output.
Gå til lakehouse, find den nye kopierede datafil under den mappe, du har angivet, og du kan vælge filen for at få vist de kopierede data. Du kan se, at dine trinvise data vises i denne fil.
Relateret indhold
Gå derefter videre for at få mere at vide om kopiér fra Azure Blob Storage til Lakehouse.