Caricare dati in modo incrementale da Data Warehouse a Lakehouse
In questa esercitazione viene mostrato come caricare in modo incrementale i dati da Data Warehouse a Lakehouse.
Panoramica
Il diagramma generale della soluzione è il seguente:
Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:
Selezionare la colonna del limite. Seleziona una colonna nella tabella dati di origine, che può essere usata per un’analisi dettagliata dei record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.
Prepara una tabella per archiviare l'ultimo valore limite nel data warehouse.
Creare una pipeline con il flusso di lavoro seguente:
La pipeline di questa soluzione contiene le attività seguenti:
- Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.
- Crea un'attività di copia per copiare le righe dalla tabella dati di origine con il valore limite della colonna maggiore del valore limite precedente e minore del nuovo valore limite. Copia quindi i dati dal data warehouse a Lakehouse come nuovo file.
- Crea un'attività stored procedure per aggiornare l’ultimo valore limite per l’esecuzione della pipeline successiva.
Prerequisiti
- Data warehouse. Uso del data warehouse come archivio dati di origine. Se non è disponibile, vedere Crea un Data warehouse per crearne uno.
- Lakehouse. Usare Lakehouse come archivio dati di destinazione. Se non è disponibile, vedere Crea un Lakehouse per crearne uno. Crea una cartella denominata IncrementalCopy per archiviare i dati copiati.
Preparazione dell’origine
Ecco alcune tabelle e stored procedure che devi preparare nel tuo data warehouse di origine prima di configurare la pipeline di copia incrementale.
1. Crea una tabella di origine dati nel data warehouse
Esegui il comando SQL seguente in Data Warehouse per creare una tabella denominata data_source_table come tabella dell'origine dati. In questa esercitazione, dovrai usarla come dati di esempio per eseguire la copia incrementale.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
I dati nella tabella dell'origine dati sono visualizzati di seguito:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
In questa esercitazione, si usa LastModifytime come colonna del valore limite.
2. Crea un'altra tabella nel data warehouse per archiviare l’ultimo valore limite
Esegui il comando SQL seguente nel data warehouse per creare una tabella denominata watermarktable in cui archiviare l’ultimo valore limite:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Impostare il valore predefinito dell’ultimo limite con il nome tabella della tabella dati di origine. In questa esercitazione, il nome della tabella è data_source_table, e il valore predefinito è
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Rivedi i dati nella tabella watermarktable.
Select * from watermarktable
Output:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Crea una stored procedure nel data warehouse
Esegui il comando seguente per creare una stored procedure nel data warehouse. Questa stored procedure viene usata per aggiornare l'ultimo valore limite dopo l'ultima esecuzione della pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configurare una pipeline per la copia incrementale
Passaggio 1: creare una pipeline
Accedere a Power BI.
Selezionare l'icona di Power BI nella parte in basso a sinistra dello schermo, quindi selezionare Data Factory per aprire la home page di Data Factory.
Accedere all'area di lavoro di Microsoft Fabric.
Selezionare Pipeline di dati e quindi immettere un nome di pipeline per creare una nuova pipeline.
Passaggio 2: aggiungi un'attività di ricerca per l'ultimo valore limite
In questo passaggio, si crea un'attività di ricerca per ottenere l'ultimo valore limite. Si otterrà il valore predefinito 1/1/2010 12:00:00 AM
impostato prima.
Seleziona Aggiungi attività della pipeline e seleziona Ricerca dall'elenco a discesa.
Nella scheda Generale rinomina questa attività in LookupOldWaterMarkActivity.
Nella scheda Impostazioni, esegui la configurazione seguente:
- Tipo di archivio dati: seleziona Area di lavoro.
- Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
- Data Warehouse: seleziona il data warehouse.
- Usa query: scegli Tabella.
- Tabella: scegli dbo.watermarktable.
- Solo prima riga: selezionata.
Passaggio 3: aggiungi un'attività di ricerca per il nuovo valore limite
In questo passaggio, si crea un'attività di ricerca per ottenere il nuovo valore limite. Sarà necessario usare una query per ottenere il nuovo valore limite dalla tabella dati di origine. Verrà ottenuto il valore massimo nella colonna LastModifytime in data_source_table.
Nella barra superiore, seleziona Ricerca nella scheda Attività per aggiungere la seconda attività di ricerca.
Nella scheda Generale, rinomina questa attività in LookupNewWaterMarkActivity.
Nella scheda Impostazioni, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
Data Warehouse: seleziona il data warehouse.
Usa query: scegli Query.
Query: immetti la query seguente per prendere l'ultima ora massima modifica come limite:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Solo prima riga: selezionata.
Passaggio 4: aggiungi l'attività di copia per copiare i dati incrementali
In questo passaggio si aggiunge un'attività di copia per copiare i dati incrementali tra l'ultimo limite e quello nuovo, da Data Warehouse a Lakehouse.
Seleziona Attività nella barra superiore e successivamente Copia dati ->Aggiungi all'area di lavoro per ottenere l'attività di copia.
Nella scheda Generale, rinomina questa attività in IncrementalCopyActivity.
Connettere entrambe le attività di Ricerca all'attività Copia trascinando il pulsante verde (operazione riuscita) in corrispondenza delle attività di Ricerca all'attività Copia. Rilasciare il pulsante del mouse quando il bordo dell'attività di copia diventa di colore verde.
Nella scheda Origine, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Tipo di archivio dati dell'area di lavoro: seleziona Data Warehouse.
Data Warehouse: seleziona il data warehouse.
Usa query: scegli Query.
Query: immetti la query seguente per copiare i dati incrementali tra l'ultimo limite e il nuovo limite.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Nella scheda Descrizione, esegui la configurazione seguente:
- Tipo di archivio dati: seleziona Area di lavoro.
- Tipo di archivio dati dell'area di lavoro: seleziona Lakehouse.
- Lakehouse: seleziona la tua Lakehouse.
- Cartella radice: scegli File.
- Percorso file: specifica la cartella in cui archiviare i dati copiati. Fai clic su Sfoglia per selezionare la cartella. Per il nome del file, apri Aggiungi contenuto dinamico e immetti
@CONCAT('Incremental-', pipeline().RunId, '.txt')
nella finestra aperta per creare nomi di file per il file di dati copiato in Lakehouse. - Formato file: seleziona il tipo di formato dei dati.
Passaggio 5: aggiungi un'attività stored procedure
In questo passaggio, si aggiunge un'attività stored procedure per aggiornare l'ultimo valore limite per l'esecuzione di pipeline successiva.
Seleziona Attività nella barra superiore e successivamente Stored procedure per aggiungere un'attività stored procedure.
Nella scheda Generale, rinomina questa attività in StoredProceduretoWriteWatermarkActivity.
Connetti l'output contrassegnato in verde (operazione riuscita) dell'attività di copia all'attività Stored procedure.
Nella scheda Impostazioni, esegui la configurazione seguente:
Tipo di archivio dati: seleziona Area di lavoro.
Data Warehouse: seleziona il data warehouse.
Nome stored procedure: specifica la stored procedure creata nel data warehouse: [dbo].[ usp_write_watermark].
Espandi Parametri di stored procedure. Per specificare i valori dei parametri della stored procedure, seleziona Importa e immetti i seguenti valori per i parametri:
Nome Type Valore LastModifiedtime Data/Ora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Passaggio 6: esegui la pipeline e monitora il risultato
Nella barra superiore, seleziona Esegui nella scheda Home. Seleziona quindi Salva ed esegui. La pipeline avvia l'esecuzione e potrai monitorare la pipeline nella scheda Output.
Vai a Lakehouse, dove troverai il file di dati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati.
Aggiunge altri dati per visualizzare i risultati della copia incrementale
Dopo aver completato la prima esecuzione della pipeline, prova ad aggiungere altri dati nella tabella di origine del data warehouse per vedere se questa pipeline può copiare i dati incrementali.
Passaggio 1: aggiungi altri dati all'origine
Inserisci nuovi dati nel data warehouse eseguendo la query seguente:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
I dati aggiornati per data_source_table sono:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Passaggio 2: attiva un'altra esecuzione della pipeline e monitora il risultato
Torna alla pagina della pipeline. Nella barra superiore, seleziona nuovamente Esegui nella scheda Home. La pipeline avvia l'esecuzione e puoi monitorare la pipeline in Output.
Vai a Lakehouse, dove troverai il nuovo file di dati copiati nella cartella specificata; seleziona quindi il file per visualizzare in anteprima i dati copiati. In questo file vengono visualizzati i dati incrementali.
Contenuto correlato
Passare quindi a ulteriori informazioni sulla copia da Archiviazione BLOB di Azure a Lakehouse.