Condividi tramite


Trasformazione sink nel flusso di dati di mapping

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!

I flussi di dati sono disponibili nelle pipeline sia di Azure Data Factory che di Azure Synapse. Questo articolo si applica ai flussi di dati per mapping. Se non si ha esperienza con le trasformazioni, vedere l'articolo introduttivo Trasformare i dati con un flusso di dati per mapping.

Dopo aver completato la trasformazione dei dati, scriverli in un archivio di destinazione usando la trasformazione sink. Ogni flusso di dati richiede almeno una trasformazione sink, ma è possibile scrivere in tutti i sink necessari per completare il flusso di trasformazione. Per scrivere in sink aggiuntivi, creare nuovi flussi tramite nuovi rami e divisioni condizionali.

Ogni trasformazione sink è associata esattamente a un oggetto set di dati o a un servizio collegato. La trasformazione Sink determina la forma e la posizione dei dati in cui scrivere.

Set di dati inline

Quando si crea una trasformazione sink, scegliere se le informazioni sul sink vengono definite all'interno di un oggetto set di dati o all'interno della trasformazione sink. La maggior parte dei formati è disponibile solo in uno o nell'altro. Per informazioni su come usare un connettore specifico, vedere il documento del connettore appropriato.

Quando un formato è supportato sia inline che in un oggetto set di dati, esistono vantaggi per entrambi. Gli oggetti set di dati sono entità riutilizzabili che possono essere usate in altri flussi di dati e attività, ad esempio Copy. Queste entità riutilizzabili sono particolarmente utili quando si usa uno schema con protezione avanzata. I set di dati non sono basati su Spark. In alcuni casi, potrebbe essere necessario eseguire l'override di determinate impostazioni o proiezione dello schema nella trasformazione sink.

I set di dati inline sono consigliati quando si usano schemi flessibili, istanze di sink uno-off o sink con parametri. Se il sink è fortemente parametrizzato, i set di dati inline consentono di non creare un oggetto "fittizio". I set di dati inline sono basati su Spark e le relative proprietà sono native del flusso di dati.

Per usare un set di dati inline, selezionare il formato desiderato nel selettore Tipo sink. Anziché selezionare un set di dati sink, selezionare il servizio collegato a cui connettersi.

Screenshot che mostra l'opzione Inline selezionata.

Database dell'area di lavoro (solo aree di lavoro di Synapse)

Quando si usano flussi di dati nelle aree di lavoro di Azure Synapse, è possibile eseguire il sink dei dati direttamente in un tipo di database all'interno dell'area di lavoro di Synapse. In questo modo sarà possibile ridurre la necessità di aggiungere servizi o set di dati collegati per tali database. I database creati tramite i modelli di database di Azure Synapse sono accessibili anche quando si seleziona Database dell'area di lavoro.

Nota

Il connettore del database dell'area di lavoro di Azure Synapse è attualmente in anteprima pubblica e può funzionare solo con i database Spark Lake in questo momento

Screenshot che mostra il database dell'area di lavoro selezionato.

Tipi di sink supportati

Il flusso di dati di mapping segue un approccio di estrazione, caricamento e trasformazione (ELT) e funziona con i set di dati di staging tutti in Azure. Attualmente, i set di dati seguenti possono essere usati in una trasformazione sink.

Connector Formato Set di dati/inline
Archiviazione BLOB di Azure Avro
Testo delimitato
Delta
JSON
ORC
Parquet
✓/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Azure Cosmos DB for NoSQL ✓/-
Azure Data Lake Storage Gen1 Avro
Testo delimitato
JSON
ORC
Parquet
✓/-
✓/-
✓/-
✓/✓
✓/-
Azure Data Lake Storage Gen2 Avro
Common Data Model
Testo delimitato
Delta
JSON
ORC
Parquet
✓/✓
-/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Database di Azure per MySQL ✓/✓
Database di Azure per PostgreSQL ✓/✓
Esplora dati di Azure ✓/✓
Database SQL di Azure ✓/✓
Istanza gestita di database SQL di Azure ✓/-
Azure Synapse Analytics ✓/-
Dataverse ✓/✓
Dynamics 365 ✓/✓
Dynamics CRM ✓/✓
Fabric Lakehouse ✓/✓
SFTP Avro
Testo delimitato
JSON
ORC
Parquet
✓/✓
✓/✓
✓/✓
✓/✓
✓/✓
Snowflake ✓/✓
SQL Server ✓/✓

Le impostazioni specifiche di questi connettori si trovano nella scheda Impostazioni . Gli esempi di script del flusso di dati e informazioni su queste impostazioni si trovano nella documentazione del connettore.

Il servizio ha accesso a più di 90 connettori nativi. Per scrivere dati in tali altre origini dal flusso di dati, usare l'attività di copia per caricare tali dati da un sink supportato.

Impostazioni sink

Dopo aver aggiunto un sink, configurare tramite la scheda Sink . Qui è possibile selezionare o creare il set di dati in cui scrive il sink. I valori di sviluppo per i parametri del set di dati possono essere configurati nelle impostazioni di debug. La modalità di debug deve essere attivata.

Il video seguente illustra diverse opzioni di sink per i tipi di file delimitati da testo.

Screenshot che mostra le impostazioni sink.

Deriva dello schema: la deriva dello schema è la possibilità del servizio di gestire in modo nativo schemi flessibili nei flussi di dati senza dover definire in modo esplicito le modifiche alle colonne. Abilitare Consenti deriva dello schema di scrivere colonne aggiuntive oltre a quanto definito nello schema dei dati sink.

Convalida schema: se è selezionato lo schema di convalida, il flusso di dati avrà esito negativo se una colonna nella proiezione sink non viene trovata nell'archivio sink o se i tipi di dati non corrispondono. Usare questa impostazione per imporre che lo schema sink soddisfi il contratto della proiezione definita. È utile negli scenari di sink del database per segnalare che i nomi o i tipi di colonna sono stati modificati.

Sink della cache

Un sink di cache è quando un flusso di dati scrive i dati nella cache spark anziché in un archivio dati. Nei flussi di dati di mapping è possibile fare riferimento a questi dati all'interno dello stesso flusso più volte usando una ricerca nella cache. Ciò è utile quando si desidera fare riferimento ai dati come parte di un'espressione, ma non si vuole unire in modo esplicito le colonne. Esempi comuni in cui un sink della cache può essere utile per cercare un valore massimo in un archivio dati e trovare codici di errore corrispondenti a un database di messaggi di errore.

Per scrivere in un sink della cache, aggiungere una trasformazione sink e selezionare Cache come tipo di sink. A differenza di altri tipi di sink, non è necessario selezionare un set di dati o un servizio collegato perché non si sta scrivendo in un archivio esterno.

Selezionare il sink della cache

Nelle impostazioni del sink è possibile specificare facoltativamente le colonne chiave del sink della cache. Questi vengono usati come condizioni di corrispondenza quando si usa la lookup() funzione in una ricerca nella cache. Se si specificano colonne chiave, non è possibile usare la outputs() funzione in una ricerca nella cache. Per altre informazioni sulla sintassi di ricerca della cache, vedere Ricerche memorizzate nella cache.

Colonne chiave sink della cache

Ad esempio, se si specifica una singola colonna chiave di in un sink della column1 cache denominato cacheExample, la chiamata cacheExample#lookup() avrebbe un parametro che specifica quale riga nel sink della cache deve corrispondere. La funzione restituisce una singola colonna complessa con sottocolumni per ogni colonna mappata.

Nota

Un sink della cache deve trovarsi in un flusso di dati completamente indipendente da qualsiasi trasformazione che vi fa riferimento tramite una ricerca nella cache. Un sink della cache deve anche essere il primo sink scritto.

Scrivere nell'output dell'attività
Il sink cache può facoltativamente scrivere i dati nell'output dell'attività Flusso di dati che può quindi essere usata come input per un'altra attività nella pipeline. In questo modo è possibile passare rapidamente e facilmente i dati dall'attività del flusso di dati senza dover rendere persistenti i dati in un archivio dati.

Si noti che l'output di Flusso di dati inserito direttamente nella pipeline è limitato a 2 MB. Pertanto, Flusso di dati tenterà di aggiungere all'output il maggior numero di righe che può rimanere entro il limite di 2 MB, pertanto a volte potrebbe non essere possibile visualizzare tutte le righe nell'output dell'attività. L'impostazione di "Solo prima riga" a livello di attività Flusso di dati consente anche di limitare l'output dei dati da Flusso di dati, se necessario.

Metodo di aggiornamento

Per i tipi di sink di database, la scheda Impostazioni includerà una proprietà "Metodo di aggiornamento". Il valore predefinito è insert, ma include anche le opzioni della casella di controllo per l'aggiornamento, l'upsert e l'eliminazione. Per utilizzare queste opzioni aggiuntive, è necessario aggiungere una trasformazione Alter Row prima del sink. L'istruzione Alter Row consente di definire le condizioni per ognuna delle azioni del database. Se l'origine è un'origine di abilitazione cdc nativa, è possibile impostare i metodi di aggiornamento senza alter row perché ADF è già a conoscenza dei marcatori di riga per inserimento, aggiornamento, upsert ed eliminazione.

Mapping dei campi

Analogamente a una trasformazione di selezione, nella scheda Mapping del sink è possibile decidere quali colonne in ingresso verranno scritte. Per impostazione predefinita, viene eseguito il mapping di tutte le colonne di input, incluse le colonne deviate. Questo comportamento è noto come mapping automatico.

Quando si disattiva il mapping automatico, è possibile aggiungere mapping fissi basati su colonne o mapping basati su regole. Con i mapping basati su regole, è possibile scrivere espressioni con criteri di ricerca. Il mapping corretto esegue il mapping dei nomi di colonne logiche e fisiche. Per altre informazioni sul mapping basato su regole, vedere Modelli di colonna nel flusso di dati di mapping.

Ordinamento personalizzato dei sink

Per impostazione predefinita, i dati vengono scritti in più sink in un ordine non deterministico. Il motore di esecuzione scrive i dati in parallelo quando viene completata la logica di trasformazione e l'ordinamento del sink può variare a ogni esecuzione. Per specificare un ordine di sink esatto, abilitare Ordinamento sink personalizzato nella scheda Generale del flusso di dati. Se abilitata, i sink vengono scritti in sequenza in ordine crescente.

Screenshot che mostra l'ordinamento del sink personalizzato.

Nota

Quando si usano ricerche memorizzate nella cache, assicurarsi che l'ordinamento del sink abbia i sink memorizzati nella cache impostati su 1, il valore più basso (o primo) nell'ordinamento.

Ordinamento personalizzato dei sink

Gruppi di sink

È possibile raggruppare i sink applicando lo stesso numero di ordine per una serie di sink. Il servizio considererà tali sink come gruppi che possono essere eseguiti in parallelo. Le opzioni per l'esecuzione parallela verranno visualizzate nell'attività del flusso di dati della pipeline.

Errori

Nella scheda Errori sink è possibile configurare la gestione delle righe degli errori per acquisire e reindirizzare l'output per gli errori del driver di database e le asserzioni non riuscite.

Durante la scrittura nei database, alcune righe di dati potrebbero non riuscire a causa di vincoli impostati dalla destinazione. Per impostazione predefinita, un'esecuzione del flusso di dati avrà esito negativo al primo errore che riceve. In alcuni connettori è possibile scegliere Continua in caso di errore che consente il completamento del flusso di dati anche se sono presenti errori nelle singole righe. Attualmente, questa funzionalità è disponibile solo in database SQL di Azure e Azure Synapse. Per altre informazioni, vedere Gestione delle righe degli errori nel database SQL di Azure.

Di seguito è riportata un'esercitazione video su come usare la gestione automatica delle righe degli errori del database nella trasformazione sink.

Per le righe di errore di asserzione, è possibile usare la trasformazione Assert upstream nel flusso di dati e quindi reindirizzare le asserzioni non riuscite a un file di output nella scheda Errori sink. È anche possibile ignorare le righe con errori di asserzione e non restituire tali righe nell'archivio dati di destinazione sink.

Righe di errore di asserzione

Anteprima dei dati nel sink

Quando si recupera un'anteprima dei dati in modalità di debug, nel sink non verranno scritti dati. Verrà restituito uno snapshot dell'aspetto dei dati, ma non verrà scritto nulla nella destinazione. Per testare la scrittura di dati nel sink, eseguire un debug della pipeline dall'area di disegno della pipeline.

Script del flusso di dati

Esempio

Di seguito è riportato un esempio di trasformazione sink e del relativo script del flusso di dati:

sink(input(
		movie as integer,
		title as string,
		genres as string,
		year as integer,
		Rating as integer
	),
	allowSchemaDrift: true,
	validateSchema: false,
	deletable:false,
	insertable:false,
	updateable:true,
	upsertable:false,
	keys:['movie'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	saveOrder: 1,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

Dopo aver creato il flusso di dati, aggiungere un'attività del flusso di dati alla pipeline.