Copiare e trasformare i dati in Snowflake usando Azure Data Factory o Azure Synapse Analytics
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
Questo articolo illustra come usare l'attività di copia nelle pipeline di Azure Data Factory e Azure Synapse per copiare dati da e in Snowflake e usare Flusso di dati per trasformare i dati in Snowflake. Per altre informazioni, vedere l'articolo introduttivo per Data Factory o Azure Synapse Analytics.
Importante
Il nuovo connettore Snowflake offre un supporto nativo migliorato per Snowflake. Se si usa il connettore Snowflake legacy nella soluzione, è consigliabile aggiornare il connettore Snowflake alla prima praticità. Per informazioni dettagliate sulla differenza tra la versione legacy e quella più recente, vedere questa sezione.
Funzionalità supportate
Questo connettore Snowflake è supportato per le funzionalità seguenti:
Funzionalità supportate | IR |
---|---|
Attività di copia (origine/sink) | (1) (2) |
Flusso di dati per mapping (origine/sink) | (1) |
Attività Lookup | (1) (2) |
Attività script | (1) (2) |
① Azure Integration Runtime ② Runtime di integrazione self-hosted
Per l'attività di copia, questo connettore Snowflake supporta le funzioni seguenti:
- Copiare dati da Snowflake che usa il comando COPIA di Snowflake in [posizione] per ottenere prestazioni ottimali.
- Copiare i dati in Snowflake che sfrutta il comando COPIA di Snowflake in [tabella] per ottenere prestazioni ottimali. Supporta Snowflake in Azure.
- Se è necessario un proxy per connettersi a Snowflake da un runtime di integrazione self-hosted, è necessario configurare le variabili di ambiente per HTTP_PROXY e HTTPS_PROXY nell'host del runtime di integrazione.
Prerequisiti
Se l'archivio dati si trova all'interno di una rete locale, una rete virtuale di Azure o un cloud privato virtuale di Amazon, è necessario configurare un runtime di integrazione self-hosted per connettersi. Assicurarsi di aggiungere gli indirizzi IP usati dal runtime di integrazione self-hosted all'elenco consentito.
Se l'archivio dati è un servizio dati del cloud gestito, è possibile usare Azure Integration Runtime. Se l'accesso è limitato solo agli indirizzi IP approvati nelle regole del firewall, è possibile aggiungere gli IP di Azure Integration Runtime nell'elenco di quelli consentiti.
L'account Snowflake usato per Source o Sink deve avere l'accesso necessario USAGE
per il database e l'accesso in lettura/scrittura allo schema e alle tabelle/viste al suo interno. Inoltre, deve avere anche CREATE STAGE
sullo schema per poter creare la fase esterna con l'URI di firma di accesso condiviso.
I valori delle proprietà account seguenti devono essere impostati
Proprietà | Descrizione | Richiesto | Valore predefinito |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Specifica se richiedere un oggetto di integrazione dell'archiviazione come credenziali cloud durante la creazione di una fase esterna denominata (usando CREA FASE) per accedere a un percorso di archiviazione cloud privato. | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Specifica se richiedere l'uso di una fase esterna denominata che fa riferimento a un oggetto di integrazione dell'archiviazione come credenziali cloud durante il caricamento o lo scaricamento di dati in un percorso di archiviazione cloud privato. | FALSE | FALSE |
Per altre informazioni sui meccanismi di sicurezza di rete e sulle opzioni supportate da Data Factory, vedere strategie di accesso ai dati.
Operazioni preliminari
Per eseguire l'attività di copia con una pipeline, è possibile usare uno degli strumenti o SDK seguenti:
- Strumento Copia dati
- Il portale di Azure
- .NET SDK
- SDK di Python
- Azure PowerShell
- API REST
- Modello di Azure Resource Manager
Creare un servizio collegato a Snowflake usando l'interfaccia utente
Usare la procedura seguente per creare un servizio collegato a Snowflake nell'interfaccia utente del portale di Azure.
Passare alla scheda Gestisci nell'area di lavoro di Azure Data Factory o Synapse e selezionare Servizi collegati, quindi fare clic su Nuovo:
Cercare Snowflake e selezionare il connettore Snowflake.
Configurare i dettagli del servizio, testare la connessione e creare il nuovo servizio collegato.
Dettagli di configurazione del connettore
Le sezioni seguenti forniscono informazioni dettagliate sulle proprietà che definiscono entità specifiche di un connettore Snowflake.
Proprietà del servizio collegato
Queste proprietà generiche sono supportate per il servizio collegato Snowflake:
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà type deve essere impostata su SnowflakeV2. | Sì |
accountIdentifier | Nome dell'account insieme alla relativa organizzazione. Ad esempio, myorg-account123. | Sì |
database | Database predefinito utilizzato per la sessione dopo la connessione. | Sì |
warehouse | Il warehouse virtuale predefinito usato per la sessione dopo la connessione. | Sì |
authenticationType | Tipo di autenticazione usato per connettersi al servizio Snowflake. I valori consentiti sono: Basic (Default) e KeyPair. Per altre proprietà ed esempi su ogni valore, vedere le sezioni corrispondenti di seguito. | No |
ruolo | Ruolo di sicurezza predefinito usato per la sessione dopo la connessione. | No |
host | Nome host dell'account Snowflake. Ad esempio: contoso.snowflakecomputing.com . .cn è supportato anche. |
No |
connectVia | Il runtime di integrazione usato per connettersi all'archivio dati. È possibile usare Azure Integration Runtime o un runtime di integrazione self-hosted (se l'archivio dati si trova in una rete privata). Se non specificato, viene usato il runtime di integrazione di Azure predefinito. | No |
Il connettore Snowflake supporta i tipi di autenticazione seguenti. Per informazioni dettagliate, vedere le sezioni corrispondenti.
Autenticazione di base
Per usare l’autenticazione Basic, oltre alle proprietà generiche descritte nella sezione precedente, specificare le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
utente | Nome di accesso per l'utente Snowflake. | Sì |
password | Password per l'utente Snowflake. Contrassegnare questo campo come tipo SecureString per archiviarlo in modo sicuro. È anche possibile fare riferimento a un segreto archiviato in Azure Key Vault. | Sì |
Esempio:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Password in Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Nota
I flussi di dati di mapping supportano solo l'autenticazione di base.
Autenticazione della coppia di chiavi SSH
Per usare l'autenticazione della coppia di chiavi, è necessario configurare e creare un utente di autenticazione della coppia di chiavi in Snowflake facendo riferimento all'autenticazione della coppia di chiavi e alla rotazione della coppia di chiavi. Successivamente, prendere nota della chiave privata e della passphrase (facoltativa), che si usa per definire il servizio collegato.
Oltre alle proprietà generiche descritte nella sezione precedente, specificare le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
utente | Nome di accesso per l'utente Snowflake. | Sì |
privateKey | Chiave privata usata per l'autenticazione della coppia di chiavi. Per assicurarsi che la chiave privata sia valida quando viene inviata ad Azure Data Factory e considerando che il file privateKey include caratteri di nuova riga (\n), è essenziale formattare correttamente il contenuto privateKey nel formato letterale stringa. Questo processo comporta l'aggiunta esplicita di \n a ogni nuova riga. |
Sì |
privateKeyPassphrase | Passphrase usata per decrittografare la chiave privata, se crittografata. | No |
Esempio:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Proprietà del set di dati
Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione dei set di dati, vedere l'articolo Set di dati.
Per il set di dati Snowflake sono supportate le proprietà seguenti.
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà type del set di dati deve essere impostata su SnowflakeV2Table. | Sì |
schema | Nome dello schema. Si noti che il nome dello schema fa distinzione tra maiuscole e minuscole. | No per l'origine, Sì per il sink |
table | Nome della tabella/vista. Si noti che il nome della tabella fa distinzione tra maiuscole e minuscole. | No per l'origine, Sì per il sink |
Esempio:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Proprietà dell'attività di copia
Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione delle attività, vedere l'articolo sulle pipeline. In questa sezione viene fornito un elenco delle proprietà supportate dall'origine e dal sink Snowflake.
Snowflake come fonte
Il connettore Snowflake usa il comando COPIA di Snowflake in [posizione] per ottenere prestazioni ottimali.
Se l'archivio dati sink e il formato sono supportati in modo nativo dal comando Snowflake COPY, è possibile usare l'attività Copy per copiare direttamente da Snowflake al sink. Per informazioni dettagliate, vedere Copia diretta da Snowflake. In caso contrario, usare la copia a fasi predefinita da Snowflake.
Per copiare dati da Snowflake, nella sezione Origine attività di copia sono supportate le proprietà seguenti.
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà Tipo dell'origine dell'attività di copia deve essere impostata su SnowflakeV2Source. | Sì |
query | Specifica la query SQL per leggere i dati da Snowflake. Se i nomi dello schema, della tabella e delle colonne contengono lettere minuscole, virgolette l'identificatore dell'oggetto nella query, ad esempio select * from "schema"."myTable" .L'esecuzione di stored procedure non è supportata. |
No |
exportSettings | Impostazioni avanzate usate per recuperare i dati da Snowflake. È possibile configurare quelli supportati dal comando COPY nel quale il servizio passerà quando si richiama l'istruzione. | Sì |
In exportSettings : |
||
type | Tipo di comando di esportazione, impostato su SnowflakeExportCopyCommand. | Sì |
storageIntegration | Specificare il nome dell'integrazione di archiviazione creata in Snowflake. Per i passaggi preliminari dell'uso dell'integrazione dell'archiviazione, vedere Configurazione di un'integrazione dell'archiviazione Snowflake. | No |
additionalCopyOptions | Opzioni di copia aggiuntive, fornite come dizionario di coppie chiave-valore. Esempi: MAX_FILE_SIZE, OVERWRITE. Per altre informazioni, vedere Opzioni di copia Snowflake. | No |
additionalFormatOptions | Opzioni di formato di file aggiuntive fornite al comando COPY come dizionario di coppie chiave-valore. Esempi: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Per altre informazioni, vedere Opzioni del tipo di formato Snowflake. | No |
Nota
Assicurarsi di disporre dell'autorizzazione per eseguire il comando seguente e accedere allo schema INFORMATION_SCHEMA e alla tabella COLUMNS.
COPY INTO <location>
Copia diretta da Snowflake
Se l'archivio dati sink e il formato soddisfano i criteri descritti in questa sezione, è possibile usare l'attività Copia per copiare direttamente da Snowflake al sink. Il servizio controlla le impostazioni e, se i criteri seguenti non vengono soddisfatti, l'esecuzione dell'attività di copia non riesce:
Quando si specifica
storageIntegration
nell'origine:L'archivio dati sink è l'archivio BLOB di Azure a cui si fa riferimento nella fase esterna in Snowflake. Prima di copiare i dati, è necessario completare i passaggi seguenti:
Creare un servizio collegato archiviazione BLOB di Azure per l'archiviazione BLOB di Azure sink con qualsiasi tipo di autenticazione supportato.
Concedere almeno il ruolo collaboratore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'archiviazione BLOB di Azure sink controllo di accesso (IAM).
Quando non si specifica
storageIntegration
nell'origine:Il servizio collegato sink è Archiviazione BLOB di Azure con autenticazione della firma di accesso condiviso. Se si vuole copiare direttamente i dati in Azure Data Lake Storage Gen2 nel formato supportato seguente, è possibile creare un servizio collegato di Archiviazione BLOB di Azure con l'autenticazione SAS per l'account Azure Data Lake Storage Gen2, per evitare di usare la copia di staging da Snowflake.
Il formato di dati sink è Parquet, testo delimitatoo JSON con le configurazioni seguenti:
- Per il formato Parquet , il codec di compressione è Nessuno, Snappy o Lzo.
- Per il formato testo delimitato:
rowDelimiter
è \r\n, o qualsiasi singolo carattere.compression
può essere nessuna compressione, gzip, bzip2 o deflate.encodingName
è impostato sul valore predefinito o su utf-8.quoteChar
è virgolette doppie, virgolette singole o stringa vuota (senza virgolette).
- Per il formato JSON, la copia diretta supporta solo il caso in cui il risultato della tabella Snowflake o della query di origine abbia solo una singola colonna e il tipo di dati di questa colonna sia VARIANT, OBJECT o ARRAY.
compression
può essere nessuna compressione, gzip, bzip2 o deflate.encodingName
è impostato sul valore predefinito o su utf-8.filePattern
nel sink dell'attività di copia viene lasciato come predefinito o impostato su setOfObjects.
Nell'origine dell'attività di copia
additionalColumns
non è specificato.Il mapping delle colonne non è specificato.
Esempio:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Copia di staging da Snowflake
Quando l'archivio dati sink o il formato non è compatibile in modo nativo con il comando Snowflake COPIA, come indicato nell'ultima sezione, abilitare la copia a fasi predefinita usando un'istanza di archiviazione BLOB di Azure temporanea. La funzionalità copia di staging assicura inoltre una migliore velocità effettiva. Il servizio esporta i dati da Snowflake nell'archiviazione di staging, quindi copia i dati nel sink e infine pulisce i dati temporanei dall'archiviazione di staging. Per informazioni dettagliate sulla copia dei dati tramite una gestione temporanea, vedere Copia di staging.
Per usare questa funzionalità, creare un servizio collegato di archiviazione BLOB di Azure che fa riferimento all'account di archiviazione di Azure come staging provvisorio. Specificare quindi le proprietà enableStaging
e stagingSettings
nell'attività Copia.
Quando si specifica
storageIntegration
nell'origine, l'archiviazione BLOB di Azure temporanea di staging deve essere quella a cui si fa riferimento nella fase esterna in Snowflake. Assicurarsi di creare un servizio collegato archiviazione BLOB di Azure con qualsiasi autenticazione supportata e concedere almeno un ruolo collaboratore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'entità servizio di gestione temporanea di Archiviazione BLOB di Azure controllo di accesso (IAM).Quando non si specifica
storageIntegration
nell'origine, il servizio collegato Archiviazione BLOB di Azure di staging deve usare l'autenticazione della firma di accesso condiviso, come richiesto dal comando Snowflake COPY. Assicurarsi di concedere l'autorizzazione di accesso appropriata a Snowflake nell'archiviazione BLOB di Azure di staging. Per altre informazioni, vedere questo articolo.
Esempio:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Quando si esegue una copia di staging da Snowflake, è fondamentale impostare il comportamento di copia sink su Unisci file. Questa impostazione garantisce che tutti i file partizionati vengano gestiti e uniti correttamente, impedendo che si verifichi un problema dovuto alla sola copia dell'ultimo file partizionato.
Configurazione di esempio
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Nota
Se non si imposta il comportamento di copia sink su Unisci file, è possibile che venga copiato solo l'ultimo file partizionato.
Snowflake come sink
Il connettore Snowflake usa il comando COPIA in [tabella] di Snowflake per ottenere prestazioni ottimali. Supporta la scrittura di dati in Snowflake in Azure.
Se l'archivio dati e il formato di origine sono supportati in modo nativo dal comando COPIA di Snowflake, è possibile usare l'attività Copia per copiare direttamente dall'origine a Snowflake. Per informazioni dettagliate, vedere Copia diretta in Snowflake. In caso contrario, usare la copia a fasi predefinita in Snowflake.
Per copiare i dati in Snowflake, nella sezione sink dell’attività di copia sono supportate le proprietà seguenti.
Proprietà | Descrizione | Richiesto |
---|---|---|
type | Proprietà type del sink dell'attività di copia, impostata su SnowflakeV2Sink. | Sì |
preCopyScript | Specificare una query SQL per l'attività di copia da eseguire prima di scrivere i dati in Snowflake in ogni esecuzione. Usare questa proprietà per pulire i dati precaricati. | No |
importSettings | Impostazioni avanzate usate per scrivere dati in Snowflake. È possibile configurare quelli supportati dal comando COPY nel quale il servizio passerà quando si richiama l'istruzione. | Sì |
In importSettings : |
||
type | Tipo di comando di importazione, impostato su SnowflakeImportCopyCommand. | Sì |
storageIntegration | Specificare il nome dell'integrazione di archiviazione creata in Snowflake. Per i passaggi preliminari dell'uso dell'integrazione dell'archiviazione, vedere Configurazione di un'integrazione dell'archiviazione Snowflake. | No |
additionalCopyOptions | Opzioni di copia aggiuntive, fornite come dizionario di coppie chiave-valore. Esempi: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Per altre informazioni, vedere Opzioni di copia Snowflake. | No |
additionalFormatOptions | Opzioni di formato di file aggiuntive fornite al comando COPIA, fornite come dizionario di coppie chiave-valore. Esempi: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Per altre informazioni, vedere Opzioni del tipo di formato Snowflake. | No |
Nota
Assicurarsi di disporre dell'autorizzazione per eseguire il comando seguente e accedere allo schema INFORMATION_SCHEMA e alla tabella COLUMNS.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Copia diretta in Snowflake
Se l'archivio dati di origine e il formato soddisfano i criteri descritti in questa sezione, è possibile usare l'attività Copia per copiare direttamente dall'origine a Snowflake. Il servizio controlla le impostazioni e, se i criteri seguenti non vengono soddisfatti, l'esecuzione dell'attività di copia non riesce:
Quando si specifica
storageIntegration
nel sink:L'archivio dati di origine è l'archivio BLOB di Azure a cui si fa riferimento nella fase esterna in Snowflake. Prima di copiare i dati, è necessario completare i passaggi seguenti:
Creare un servizio collegato archiviazione BLOB di Azure per l'archiviazione BLOB di Azure di origine con qualsiasi tipo di autenticazione supportato.
Concedere almeno il ruolo lettore di dati BLOB di archiviazione all'entità servizio Snowflake nell'archivio BLOB di Azure di origine controllo di accesso (IAM).
Quando non si specifica
storageIntegration
nel sink:Il servizio collegato di origine è Archiviazione BLOB di Azure con autenticazione della firma di accesso condiviso. Se si vuole copiare direttamente i dati da Azure Data Lake Storage Gen2 nel formato supportato seguente, è possibile creare un servizio collegato di Archiviazione BLOB di Azure con l'autenticazione di firma di accesso condiviso per l'account Azure Data Lake Storage Gen2, per evitare di usare la copia di staging in Snowflake.
Il formato dei dati di origine è Parquet, ORC o Testo delimitato, con le configurazioni seguenti:
Per il formato Parquet, il codec di compressione è Nessuno o Snappy.
Per il formato testo delimitato:
rowDelimiter
è \r\n, o qualsiasi singolo carattere. Se il delimitatore di riga non è "\r\n",firstRowAsHeader
deve essere falseeskipLineCount
non è specificato.compression
può essere nessuna compressione, gzip, bzip2 o deflate.encodingName
viene lasciato come predefinito o impostato su "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2 " ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8855", "ISO-885 9-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".quoteChar
è virgolette doppie, virgolette singole o stringa vuota (senza virgolette).
Per il formato JSON, la copia diretta supporta solo il caso in cui la tabella Snowflake sink ha solo una colonna e il tipo di dati di questa colonna è VARIANT, OBJECT o ARRAY.
compression
può essere nessuna compressione, gzip, bzip2 o deflate.encodingName
è impostato sul valore predefinito o su utf-8.- Il mapping delle colonne non è specificato.
Nell'origine dell'attività Copia:
additionalColumns
non è specificato.- Se l'origine è una cartella,
recursive
è impostata su true. prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
eenablePartitionDiscovery
non sono specificati.
Esempio:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Copia di staging in Snowflake
Quando l'archivio dati o il formato di origine non sono compatibili in modo nativo con il comando Snowflake COPY, come indicato nell'ultima sezione, abilitare la copia a fasi predefinita usando un'istanza di archiviazione Blob di Azure temporanea. La funzionalità copia di staging assicura inoltre una migliore velocità effettiva. Il servizio converte automaticamente i dati in modo da soddisfare i requisiti di formato dei dati di Snowflake. Richiama quindi il comando COPIA per caricare i dati in Snowflake. Infine, pulisce i dati temporanei dall'archiviazione BLOB. Per informazioni dettagliate sulla copia dei dati tramite una gestione temporanea, vedere Copia di staging.
Per usare questa funzionalità, creare un servizio collegato di archiviazione BLOB di Azure che fa riferimento all'account di archiviazione di Azure come staging provvisorio. Specificare quindi le proprietà enableStaging
e stagingSettings
nell'attività Copia.
Quando si specifica
storageIntegration
nel sink, l'archiviazione BLOB di Azure temporanea di staging deve essere quella a cui si fa riferimento nella fase esterna in Snowflake. Assicurarsi di creare un servizio collegato archiviazione BLOB di Azure con qualsiasi autenticazione supportata e concedere almeno un ruolo lettore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'entità servizio di gestione temporanea di Archiviazione BLOB di Azure controllo di accesso (IAM).Quando non si specifica
storageIntegration
nel sink, il servizio collegato Archiviazione BLOB di Azure di staging deve usare l'autenticazione della firma di accesso condiviso come richiesto dal comando Snowflake COPIA.
Esempio:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Proprietà del flusso di dati per mapping
Quando si trasformano i dati nel flusso di dati di mapping, è possibile leggere e scrivere nelle tabelle in Snowflake. Per altre informazioni, vedere la trasformazione origine e la trasformazione sink nei flussi di dati per mapping. È possibile scegliere di usare un set di dati Snowflake o un set di dati inline come tipo di origine e sink.
Trasformazione origine
Nella tabella seguente sono elencate le proprietà supportate dall'origine di Snowflake. È possibile modificare queste proprietà nella scheda Opzioni origine. Il connettore usa trasferimento dati interno di Snowflake.
Nome | Descrizione | Richiesto | Valori consentiti | Proprietà script del flusso di dati |
---|---|---|---|---|
Tabella | Se si seleziona Tabella come input, il flusso di dati recupera tutti i dati dalla tabella specificata nel set di dati Snowflake o nelle opzioni di origine quando si usa il set di dati inline. | No | String | (solo per set di dati inline) tableName schemaName |
Query | Se si seleziona Query come input, immettere una query per recuperare i dati da Snowflake. Questa impostazione esegue l'override di qualsiasi tabella scelta nel set di dati. Se i nomi dello schema, della tabella e delle colonne contengono lettere minuscole, virgolette l'identificatore dell'oggetto nella query, ad esempio select * from "schema"."myTable" . |
No | String | query |
Abilitare l'estrazione incrementale (anteprima) | Usare questa opzione per indicare ad ADF di elaborare solo le righe modificate dall'ultima esecuzione della pipeline. | No | Booleano | enableCdc |
Colonna incrementale | Quando si usa la funzionalità di estrazione incrementale, è necessario scegliere la colonna data/ora/numerica da usare come limite nella tabella di origine. | No | String | waterMarkColumn |
Abilitare Rilevamento modifiche di Snowflake (anteprima) | Questa opzione consente a Azure Data Factory di sfruttare la tecnologia Snowflake Change data capture per elaborare solo i dati differenziali dall'esecuzione precedente della pipeline. Questa opzione carica automaticamente i dati differenziali con operazioni di inserimento, aggiornamento ed eliminazione di righe senza richiedere alcuna colonna incrementale. | No | Booleano | enableNativeCdc |
Modifiche nette | Quando si usa il rilevamento delle modifiche Snowflake, è possibile usare questa opzione per ottenere le righe modificate deduplicate o le modifiche complete. Le righe modificate deduplicate mostreranno solo le versioni più recenti delle righe modificate da un determinato punto nel tempo, mentre le modifiche complete mostreranno tutte le versioni di ogni riga modificata, incluse quelle eliminate o aggiornate. Ad esempio, se si aggiorna una riga, verrà visualizzata una versione di eliminazione e una versione di inserimento in modifiche complete, ma solo la versione di inserimento nelle righe modificate deduplicate. A seconda del caso d'uso, è possibile scegliere l'opzione più adatta alle proprie esigenze. L'opzione predefinita è false, il che significa modifiche complete. | No | Booleano | netChanges |
Includere colonne di sistema | Quando si usa il rilevamento delle modifiche Snowflake, è possibile usare l'opzione systemColumns per controllare se le colonne del flusso di metadati fornite da Snowflake sono incluse o escluse nell'output del rilevamento delle modifiche. Per impostazione predefinita, systemColumns è impostato su true, ovvero le colonne del flusso di metadati sono incluse. È possibile impostare systemColumns su false se si desidera escluderli. | No | Booleano | systemColumns |
Iniziare a leggere dall'inizio | L'impostazione di questa opzione con l'estrazione incrementale e il rilevamento delle modifiche indicherà ad Azure Data Factory di leggere tutte le righe alla prima esecuzione di una pipeline con estrazione incrementale attivata. | No | Booleano | skipInitialLoad |
Esempi di script di origine Snowflake
Quando si usa il set di dati Snowflake come tipo di origine, lo script del flusso di dati associato è:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Se si usa un set di dati inline, lo script del flusso di dati associato è:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Rilevamento modifiche native
Azure Data Factory supporta ora una funzionalità nativa in Snowflake nota come rilevamento delle modifiche, che comporta il rilevamento delle modifiche sotto forma di log. Questa funzionalità di Snowflake consente di tenere traccia delle modifiche apportate ai dati nel tempo, rendendola utile per il caricamento e il controllo incrementali dei dati. Per utilizzare questa funzionalità, quando si abilita Change Data Capture e si seleziona Rilevamento modifiche di Snowflake, viene creato un oggetto Stream per la tabella di origine che abilita il rilevamento delle modifiche nella tabella snowflake di origine. Successivamente, viene usata la clausola MODIFICHE nella query per recuperare solo i dati nuovi o aggiornati dalla tabella di origine. È anche consigliabile pianificare la pipeline in modo che le modifiche vengano utilizzate entro l'intervallo di tempo di conservazione dei dati impostato per la tabella di origine snowflake. In caso contrario, l'utente potrebbe vedere un comportamento incoerente nelle modifiche acquisite.
Trasformazione sink
Nella tabella seguente sono elencate le proprietà supportate dal sink di Snowflake. È possibile modificare queste proprietà nella scheda Impostazioni. Quando si usa il set di dati inline, verranno visualizzate impostazioni aggiuntive, che corrispondono alle proprietà descritte nella sezione proprietà del set di dati. Il connettore usa il trasferimento interno dei dati di Snowflake.
Nome | Descrizione | Richiesto | Valori consentiti | Proprietà script del flusso di dati |
---|---|---|---|---|
Metodo di aggiornamento | Specificare le operazioni consentite nella destinazione di Snowflake. Per operazioni di aggiornamento, upsert o eliminazione di righe, è necessaria una trasformazione Altera riga perché i tag siano applicati alle righe per queste azioni. |
Sì | true oppure false |
deletable insertable updateable upsertable |
Colonne chiave | Per le operazioni di aggiornamento, upsert ed eliminazione è necessario impostare una o più colonne chiave per determinare quale riga modificare. | No | Matrice | keys |
azione Tabella | determina se ricreare o rimuovere tutte le righe dalla tabella di destinazione prima della scrittura. - Nessuno: non verrà eseguita alcuna azione sulla tabella. - Ricrea: la tabella verrà eliminata e ricreata. Questa opzione è obbligatoria se si crea una nuova tabella in modo dinamico. - Tronca: verranno rimosse tutte le righe della tabella di destinazione. |
No | true oppure false |
recreate truncate |
Esempi di script sink Snowflake
Quando si usa il set di dati Snowflake come tipo di sink, lo script del flusso di dati associato è:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Se si usa un set di dati inline, lo script del flusso di dati associato è:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Ottimizzazione pushdown query
Impostando il livello di registrazione della pipeline su Nessuno, si esclude la trasmissione delle metriche di trasformazione intermedia, impedendo potenziali ostacoli alle ottimizzazioni spark e abilitando l'ottimizzazione pushdown delle query fornita da Snowflake. Questa ottimizzazione pushdown consente miglioramenti sostanziali delle prestazioni per tabelle Snowflake di grandi dimensioni con set di dati estesi.
Nota
Non sono supportate tabelle temporanee in Snowflake, perché sono locali per la sessione o per l'utente che li crea, rendendole inaccessibili ad altre sessioni e soggette a sovrascrivere come tabelle regolari da Snowflake. Anche se Snowflake offre tabelle temporanee come alternativa, accessibili a livello globale, richiedono l'eliminazione manuale, contraddicendo l'obiettivo principale dell'uso delle tabelle temp, evitando qualsiasi operazione di eliminazione nello schema di origine.
Proprietà dell'attività Lookup
Per altre informazioni sulle proprietà, vedere Attività di Ricerca.
Aggiornare il connettore Snowflake
Per aggiornare il connettore Snowflake, è possibile eseguire un aggiornamento side-by-side o un aggiornamento sul posto.
Aggiornamento affiancato
Per eseguire un aggiornamento side-by-side, completare la procedura seguente:
- Creare un nuovo servizio collegato Snowflake e configurarlo facendo riferimento alle proprietà del servizio collegato.
- Creare un set di dati basato sul servizio collegato Snowflake appena creato.
- Sostituire il nuovo servizio collegato e il set di dati con quelli esistenti nelle pipeline che hanno come destinazione gli oggetti legacy.
Aggiornamento sul posto
Per eseguire un aggiornamento sul posto, è necessario modificare il payload del servizio collegato esistente e aggiornare il set di dati per usare il nuovo servizio collegato.
Aggiornare il tipo da Snowflake a SnowflakeV2.
Modificare il payload del servizio collegato dal formato legacy al nuovo modello. È possibile compilare ogni campo dall'interfaccia utente dopo aver modificato il tipo indicato in precedenza oppure aggiornare il payload direttamente tramite l'editor JSON. Per le proprietà di connessione supportate, vedere la sezione Proprietà del servizio collegato in questo articolo. Gli esempi seguenti illustrano le differenze nel payload per i nuovi servizi collegati Legacy e Snowflake:
Payload JSON del servizio collegato Snowflake legacy:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Nuovo payload JSON del servizio collegato Snowflake:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Aggiornare il set di dati per usare il nuovo servizio collegato. È possibile creare un nuovo set di dati in base al servizio collegato appena creato oppure aggiornare la proprietà type di un set di dati esistente da SnowflakeTable a SnowflakeV2Table.
Differenze tra Snowflake e Snowflake (legacy)
Il connettore Snowflake offre nuove funzionalità ed è compatibile con la maggior parte delle funzionalità del connettore Snowflake (legacy). La tabella seguente illustra le differenze di funzionalità tra Snowflake e Snowflake (legacy).
Snowflake | Snowflake (legacy) |
---|---|
Supportare l'autenticazione della coppia di chiavi e di base. | Supportare l'autenticazione di base. |
I parametri script non sono attualmente supportati nell'attività Script. In alternativa, usare espressioni dinamiche per i parametri di script. Per altre informazioni, vedere Espressioni e funzioni in Azure Data Factory e Azure Synapse Analytics. | Supportare i parametri di script nell'attività Script. |
Supportare BigDecimal nell'attività Lookup. Il tipo NUMERO, come definito in Snowflake, verrà visualizzato come stringa nell'attività Lookup. Se si vuole coprirlo al tipo numerico, è possibile usare il parametro della pipeline con la funzione int o la funzione float. Ad esempio int(activity('lookup').output.firstRow.VALUE) , float(activity('lookup').output.firstRow.VALUE) |
BigDecimal non è supportato nell'attività Lookup. |
Le accountIdentifier proprietà , database warehouse , schema e role vengono usate per stabilire una connessione. |
La connectionstring proprietà viene utilizzata per stabilire una connessione. |
il tipo di dati timestamp in Snowflake viene letto come tipo di dati DateTimeOffset nell'attività Lookup e Script. | il tipo di dati timestamp in Snowflake viene letto come tipo di dati DateTime nell'attività Lookup e Script. Se è ancora necessario usare il valore Datetime come parametro nella pipeline dopo l'aggiornamento del connettore, è possibile convertire il tipo DateTimeOffset in tipo DateTime usando la funzione formatDateTime (scelta consigliata) o la funzione concat. Ad esempio: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE) , concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') |
Contenuto correlato
Per un elenco degli archivi dati supportati come origini e sink dall'attività Copy, vedere Archivi dati e formati supportati.