Caricare i dati in modo incrementale da Database SQL di Azure ad Archiviazione BLOB di Azure con il portale di Azure
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
In questa esercitazione si crea un'istanza di Azure Data Factory con una pipeline che carica i dati differenziali di una tabella di Database SQL di Azure in archiviazione BLOB di Azure.
In questa esercitazione vengono completati i passaggi seguenti:
- Preparare l'archivio dati per l'archiviazione del valore limite.
- Creare una data factory.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare una pipeline.
- Esegui la pipeline.
- Monitorare l'esecuzione della pipeline.
- Verificare i risultati
- Aggiungere altri dati all'origine.
- Eseguire di nuovo la pipeline.
- Monitorare la seconda esecuzione della pipeline
- Esaminare i risultati della seconda esecuzione.
Panoramica
Il diagramma generale della soluzione è il seguente:
Di seguito sono descritti i passaggi fondamentali per la creazione di questa soluzione:
Selezionare la colonna del limite. Selezionare una colonna nell'archivio dati di origine, che può essere usata per analizzare approfonditamente i record nuovi o aggiornati per ogni esecuzione. I dati della colonna selezionata (ad esempio last_modify_time o ID) continuano in genere ad aumentare quando le righe vengono create o aggiornate. Il valore massimo di questa colonna viene usato come limite.
Preparare un archivio dati per l'archiviazione del valore limite. In questa esercitazione si archivia il valore limite in un database SQL.
Creare una pipeline con il flusso di lavoro seguente:
La pipeline di questa soluzione contiene le attività seguenti:
- Creare due attività di ricerca. Usare la prima attività di ricerca per recuperare l'ultimo valore limite. Usare la seconda attività di ricerca per recuperare il nuovo valore limite. Questi valori limite vengono passati all'attività di copia.
- Creare un'attività di copia che copi le righe dell'archivio dati di origine con il valore della colonna del limite maggiore del valore limite precedente e minore del nuovo valore limite. L'attività copierà quindi i dati delta dall'archivio dati di origine a un archivio BLOB come nuovo file.
- Creare un'attività stored procedure che aggiorni il valore limite per la pipeline che verrà eseguita la volta successiva.
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
- Database SQL di Azure. Usare il database come archivio dati di origine. Se non si ha un database in Database SQL di Azure, vedere la procedura per crearne uno descritta in Creare un database in Database SQL di Azure.
- Archiviazione di Azure. Usare l'archivio BLOB come archivio dati sink. Se non si ha un account di archiviazione, vedere Creare un account di archiviazione per informazioni su come crearne uno. Creare un contenitore denominato adftutorial.
Creare una tabella di origine dati nel database SQL
Aprire SQL Server Management Studio. In Esplora server fare clic con il pulsante destro del mouse sul database e scegliere Nuova query.
Eseguire questo comando SQL sul database SQL per creare una tabella denominata
data_source_table
come archivio dell'origine dati:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
In questa esercitazione si usa LastModifytime come colonna del valore limite. I dati nell'archivio dell'origine dati sono visualizzati nella tabella seguente:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Creare un'altra tabella nel database SQL per archiviare il valore del limite massimo
Eseguire questo comando SQL sul database SQL per creare una tabella denominata
watermarktable
in cui archiviare il valore limite:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Impostare il valore predefinito del limite massimo con il nome tabella dell'archivio dei dati di origine. In questa esercitazione, il nome della tabella è data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Esaminare i dati nella tabella
watermarktable
.Select * from watermarktable
Output:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Creare una stored procedure nel database SQL
Eseguire questo comando per creare una stored procedure nel database SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Creare una data factory
Avviare il Web browser Microsoft Edge o Google Chrome. L'interfaccia utente di Data Factory è attualmente supportata solo nei Web browser Microsoft Edge e Google Chrome.
Nel menu sinistro selezionare Crea una risorsa>Integrazione>Data factory:
Nella pagina Nuova data factory immettere ADFIncCopyTutorialDF per Nome.
Il nome dell'istanza di Azure Data Factory deve essere univoco globale. Se viene visualizzato un punto esclamativo rosso con l'errore seguente, modificare il nome della data factory, ad esempio nomeutenteADFIncCopyTutorialDF, e riprovare. Per informazioni sulle regole di denominazione per gli elementi di Data Factory, vedere l'articolo Data Factory - Regole di denominazione.
Il nome della data factory "ADFIncCopyTutorialDF" non è disponibile
Selezionare la sottoscrizione di Azure in cui creare la data factory.
Per il gruppo di risorse, eseguire una di queste operazioni:
Selezionare Usa esistentee scegliere un gruppo di risorse esistente dall'elenco a discesa.
Selezionare Crea nuovoe immettere un nome per il gruppo di risorse.
Per informazioni sui gruppi di risorse, vedere l'articolo relativo all'uso di gruppi di risorse per la gestione delle risorse di Azure.
Selezionare V2 per version.
Selezionare la località per la data factory. Nell'elenco a discesa vengono mostrate solo le località supportate. Gli archivi dati (Archiviazione di Azure, Database SQL di Azure, Istanza gestita di SQL di Azure e così via) e le risorse di calcolo (HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.
Cliccare su Crea.
Al termine della creazione verrà visualizzata la pagina Data factory, come illustrato nell'immagine.
Selezionare Apri nel riquadro Apri Azure Data Factory Studio per avviare l'interfaccia utente di Azure Data Factory in una scheda separata.
Creare una pipeline
In questa esercitazione si crea una pipeline con due attività di ricerca, un'attività di copia e un'attività di stored procedure concatenate in una pipeline.
Nella home page dell'interfaccia utente di Data Factory fare clic sul riquadro Orchestrate .On the home page of Data Factory UI, click the Orchestrate tile.
Nel pannello Generale in Proprietà specificare IncrementalCopyPipeline per Nome. Comprimere quindi il pannello facendo clic sull'icona Proprietà nell'angolo in alto a destra.
Verrà ora aggiunta la prima attività di ricerca per recuperare il valore limite precedente. Nella casella degli strumenti Attività espandere Generale e trascinare l'attività Cerca nell'area di progettazione della pipeline. Modificare il nome dell'attività in LookupOldWaterMarkActivity.
Passare alla scheda Impostazioni e fare clic su + Nuovo in corrispondenza di Source Dataset (Set di dati di origine). In questo passaggio viene creato un set di dati per rappresentare i dati in watermarktable. Questa tabella contiene il limite precedente che è stato usato nella precedente operazione di copia.
Nella finestra Nuovo set di dati selezionare Database SQL di Azure e fare clic su Continua. Verrà aperta una nuova finestra per il set di dati.
Nella finestra Imposta proprietà per il set di dati immettere WatermarkDataset per Nome.
Per Servizio collegato, selezionare Nuovo e quindi seguire questa procedura:
Immettere AzureSqlDatabaseLinkedService per Nome.
Selezionare il server per Nome server.
Selezionare il Nome del database dall'elenco a discesa.
Immettere il nome utente e la password.
Per testare la connessione al database SQL di Azure, fare clic su Test connessione.
Fare clic su Fine.
Verificare che per Servizio collegato sia selezionato AzureSqlDatabaseLinkedService.
Selezionare Fine.
Nella scheda Connessione selezionare [dbo].[watermarktable] per Tabella. Se si vuole visualizzare un'anteprima dei dati nella tabella, fare clic su Anteprima dati.
Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra. Nella finestra delle proprietà per l'attività Ricerca verificare che nel campo Source Dataset (Set di dati di origine) sia selezionato WatermarkDataset.
Nella casella degli strumenti Attività espandere Generale, trascinare un'altra attività Cerca nell'area di progettazione della pipeline e impostare il nome su LookupNewWaterMarkActivity nella scheda Generale della finestra delle proprietà. Questa attività di ricerca recupera il nuovo valore limite dalla tabella con i dati di origine da copiare nella destinazione.
Nella finestra delle proprietà per la seconda attività Cerca passare alla scheda Impostazioni e fare clic su Nuovo. Si creerà un set di dati per fare riferimento alla tabella di origine contenente il nuovo valore limite, ossia il valore massimo di LastModifyTime.
Nella finestra Nuovo set di dati selezionare Database SQL di Azure e fare clic su Continua.
Nella finestra Imposta proprietà immettere SourceDataset per Nome. Selezionare AzureSqlDatabaseLinkedService per Servizio collegato.
Selezionare [dbo].[data_source_table] per Tabella. Più avanti nell'esercitazione si specificherà una query su questo set di dati. La query avrà la precedenza rispetto alla tabella specificata in questo passaggio.
Selezionare Fine.
Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra. Nella finestra delle proprietà per l'attività Cerca verificare che nel campo Source Dataset (Set di dati di origine) sia selezionato SourceDataset.
Selezionare Query per il campo Use Query (Usa query) e immettere la query seguente, con cui si seleziona solo il valore massimo di LastModifytime da data_source_table. Assicurarsi di aver selezionato anche Solo prima riga.
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Nella casella degli strumenti Attività espandere Move & Transform (Sposta e trasforma), trascinare l'attività Copia dalla casella degli strumenti Attività e impostare il nome su IncrementalCopyActivity.
Connettere entrambe le attività di ricerca all'attività di copia trascinando il pulsante verde associato alle attività di ricerca sull'attività di copia. Rilasciare il pulsante del mouse quando il bordo dell'attività di copia diventa di colore blu.
Selezionare l'attività Copia e verificare che le relative proprietà vengano visualizzate nella finestra Proprietà.
Passare alla scheda Origine nella finestra Proprietà e seguire questa procedura:
Selezionare SourceDataset nel campo Source Dataset (Set di dati di origine).
Selezionare Query per il campo Use Query (Usa query).
Immettere la query SQL seguente nel campo Query.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Passare alla scheda Sink e fare clic su + Nuovo in corrispondenza del campo Sink Dataset (Set di dati sink).
In questa esercitazione, l'archivio di dati sink è un archivio BLOB di Azure. Selezionare quindi Archiviazione BLOB di Azure e fare clic su Continua nella finestra Nuovo set di dati.
Nella pagina Select Format (Seleziona formato) selezionare il tipo di formato dei dati e fare clic su Continua.
Nella finestra Imposta proprietà immettere SinkDataset per Nome. Per Servizio collegato selezionare + Nuovo. In questo passaggio si crea una connessione all'archivio BLOB di Azure, ossia un servizio collegato.
Nella finestra New Linked Service (Azure Blog Storage) (Nuovo servizio collegato - Archiviazione BLOB di Azure), procedere come segue:
- Immettere AzureStorageLinkedService per Nome.
- Selezionare il proprio account di archiviazione di Azure per Nome account di archiviazione.
- Verificare la connessione e quindi fare clic su Fine.
Nella finestra Imposta proprietà verificare che per Servizio collegato sia selezionato AzureStorageLinkedService. Quindi selezionare Fine.
Passare alla scheda Connessione di SinkDataset e seguire questa procedura:
- Per il campo Percorso file, immettere adftutorial/incrementalcopy. adftutorial è il nome del contenitore BLOB e incrementalcopy è il nome della cartella. Questo frammento di codice presuppone che nell'archivio BLOB sia presente un contenitore BLOB denominato adftutorial. Creare il contenitore se non esiste oppure impostare il nome di un contenitore esistente. Se non esiste, la cartella di output incrementalcopy viene creata automaticamente da Azure Data Factory. È anche possibile usare il pulsante Sfoglia per Percorso file per passare a una cartella in un contenitore BLOB.
- Per la parte File del campo Percorso file, fare clic su Aggiungi contenuto dinamico [AL+P] e quindi immettere
@CONCAT('Incremental-', pipeline().RunId, '.txt')
nella finestra che viene aperta. Quindi selezionare Fine. Il nome file viene generato in modo dinamico usando l'espressione. Ogni esecuzione della pipeline ha un ID univoco, che viene usato dall'attività di copia per generare il nome file.
Passare all'editor di pipeline facendo clic sulla scheda della pipeline in alto oppure sul nome della pipeline nella visualizzazione albero a sinistra.
Nella casella degli strumenti Attività espandere Generale e trascinare l'attività Stored procedure dalla casella degli strumenti Attività all'area di progettazione della pipeline. Connettere l'output contrassegnato in verde (come operazione riuscita) dell'attività Copia all'attività Stored procedure.
Selezionare l'attività Stored procedure nella finestra di progettazione della pipeline e modificarne il nome in StoredProceduretoWriteWatermarkActivity.
Passare alla scheda Account SQL e selezionare AzureSqlDatabaseLinkedService per Servizio collegato.
Passare alla scheda Stored procedure e seguire questa procedura:
In Nome stored procedure selezionare usp_write_watermark.
Per specificare i valori dei parametri della stored procedure, fare clic su Import parameter (Importa parametro) e immettere i valori seguenti per i parametri:
Nome Type Valore LastModifiedtime Data/Ora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Per convalidare le impostazioni della pipeline, fare clic su Convalida sulla barra degli strumenti. Verificare che non siano presenti errori di convalida. Per chiudere la finestra Report di convalida della pipeline, fare clic su >>.
Pubblicare le entità (servizi collegati, set di dati e pipeline) nel servizio Azure Data Factory selezionando il pulsante Pubblica tutti. Attendere fino alla visualizzazione del messaggio che informa che la pubblicazione è riuscita.
Attivare un'esecuzione della pipeline
Fare clic su Aggiungi trigger sulla barra degli strumenti e quindi su Trigger Now (Attiva adesso).
Nella finestra Pipeline Run (Esecuzione di pipeline) selezionare Fine.
Monitorare l'esecuzione della pipeline
Passare alla scheda Monitoraggio a sinistra. Verrà visualizzato lo stato dell'esecuzione della pipeline attivata da un trigger manuale. È possibile usare i collegamenti nella colonna NOME PIPELINE per visualizzare i dettagli dell'esecuzione ed eseguire di nuovo la pipeline.
Per visualizzare le esecuzioni di attività associate all'esecuzione della pipeline, selezionare il collegamento sotto la colonna NOME PIPELINE. Per i dettagli sulle esecuzioni di attività, selezionare il collegamento Dettagli (icona degli occhiali) sotto la colonna NOME ATTIVITÀ. Per tornare alla visualizzazione Esecuzioni della pipeline, selezionare Tutte le esecuzioni di pipeline in alto. Per aggiornare la visualizzazione, selezionare Aggiorna.
Esaminare i risultati
Connettersi all'account di archiviazione di Azure usando strumenti come Azure Storage Explorer. Verificare che sia stato creato un file di output nella cartella incrementalcopy del contenitore adftutorial.
Aprire il file di output. Come si può notare, tutti i dati sono stati copiati da data_source_table al file BLOB.
1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
Verificare il valore più recente da
watermarktable
. Si noterà che il valore limite è stato aggiornato.Select * from watermarktable
Ecco l'output:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-05 8:06:00.000 |
Aggiungere altri dati all'origine
Inserire nuovi dati nel database (archivio dell'origine dati).
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
I dati aggiornati nel database sono i seguenti:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Attivare un'altra esecuzione della pipeline
Passare alla scheda Modifica . Fare clic sulla pipeline nella visualizzazione albero se non è aperta nella finestra di progettazione.
Fare clic su Aggiungi trigger sulla barra degli strumenti e quindi su Trigger Now (Attiva adesso).
Monitorare la seconda esecuzione della pipeline
Passare alla scheda Monitoraggio a sinistra. Verrà visualizzato lo stato dell'esecuzione della pipeline attivata da un trigger manuale. È possibile usare i collegamenti nella colonna NOME PIPELINE per visualizzare i dettagli delle attività ed eseguire di nuovo la pipeline.
Per visualizzare le esecuzioni di attività associate all'esecuzione della pipeline, selezionare il collegamento sotto la colonna NOME PIPELINE. Per i dettagli sulle esecuzioni di attività, selezionare il collegamento Dettagli (icona degli occhiali) sotto la colonna NOME ATTIVITÀ. Per tornare alla visualizzazione Esecuzioni della pipeline, selezionare Tutte le esecuzioni di pipeline in alto. Per aggiornare la visualizzazione, selezionare Aggiorna.
Verificare il secondo output
Nell'archivio BLOB è stato creato un altro file. In questa esercitazione, il nome del nuovo file è
Incremental-<GUID>.txt
. Aprendo il file si noteranno due righe di record.6,newdata,2017-09-06 02:23:00.0000000 7,newdata,2017-09-07 09:01:00.0000000
Verificare il valore più recente da
watermarktable
. Si noterà che il valore limite è stato aggiornato di nuovo.Select * from watermarktable
Output di esempio:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-07 09:01:00.000 |
Contenuto correlato
In questa esercitazione sono stati eseguiti i passaggi seguenti:
- Preparare l'archivio dati per l'archiviazione del valore limite.
- Creare una data factory.
- Creare servizi collegati.
- Creare i set di dati di origine, sink e limite.
- Creare una pipeline.
- Esegui la pipeline.
- Monitorare l'esecuzione della pipeline.
- Verificare i risultati
- Aggiungere altri dati all'origine.
- Eseguire di nuovo la pipeline.
- Monitorare la seconda esecuzione della pipeline
- Esaminare i risultati della seconda esecuzione.
In questa esercitazione la pipeline ha copiato dati da una singola tabella di Database SQL ad Archiviazione BLOB. Passare all'esercitazione successiva per informazioni sulla copia di dati da più tabelle di un database di SQL Server a un database SQL.