Daten in Snowflake mithilfe von Data Factory oder Azure Synapse Analytics kopieren und transformieren
GILT FÜR: Azure Data Factory Azure Synapse Analytics
Tipp
Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!
In diesem Artikel wird beschrieben, wie Sie Daten mithilfe der Copy-Aktivität in Azure Data Factory- und Azure Synapse-Pipelines aus und in Snowflake kopieren sowie Daten mithilfe des Tools „Datenfluss“ in Snowflake transformieren. Weitere Informationen finden Sie im Einführungsartikel zu Data Factory oder Azure Synapse Analytics.
Wichtig
Der neue Snowflake-Connector bietet verbesserte native Snowflake-Unterstützung. Wenn Sie den Legacy-Snowflake-Connector in Ihrer Lösung verwenden, sollten Sie möglichst bald ein Upgrade Ihres Snowflake-Connectors durchführen. Ausführliche Informationen zum Unterschied zwischen der Legacy- und der neuesten Version finden Sie in diesem Abschnitt .
Unterstützte Funktionen
Für den Snowflake-Connector werden die folgenden Funktionen unterstützt:
Unterstützte Funktionen | IR |
---|---|
Kopieraktivität (Quelle/Senke) | ① ② |
Zuordnungsdatenfluss (Quelle/Senke) | ① |
Lookup-Aktivität | ① ② |
Skriptaktivität | ① ② |
① Azure Integration Runtime ② Selbstgehostete Integration Runtime
Für die Kopieraktivität unterstützt dieser Snowflake-Connector die folgenden Funktionen:
- Kopieren von Daten aus Snowflake unter Verwendung des Snowflake-Befehls COPY into [Speicherort], um optimale Leistung zu erzielen.
- Kopieren von Daten in Snowflake unter Verwendung des Snowflake-Befehls COPY into [Tabelle], um optimale Leistung zu erzielen. Snowflake in Azure wird unterstützt.
- Wenn für einen Proxy eine Verbindung mit Snowflake über eine selbstgehostete Integration Runtime erforderlich ist, müssen Sie die Umgebungsvariablen HTTP_PROXY und HTTPS_PROXY auf dem Integration Runtime-Host konfigurieren.
Voraussetzungen
Wenn sich Ihr Datenspeicher in einem lokalen Netzwerk, in einem virtuellen Azure-Netzwerk oder in einer virtuellen privaten Amazon-Cloud befindet, müssen Sie eine selbstgehostete Integration Runtime konfigurieren, um eine Verbindung herzustellen. Die von der selbstgehosteten Integration Runtime verwendeten IP-Adressen müssen der Positivliste hinzugefügt werden.
Handelt es sich bei Ihrem Datenspeicher um einen verwalteten Clouddatendienst, können Sie die Azure Integration Runtime verwenden. Ist der Zugriff auf IP-Adressen beschränkt, die in den Firewallregeln genehmigt sind, können Sie Azure Integration Runtime-IP-Adressen zur Positivliste hinzufügen.
Das für „Quelle“ oder „Senke“ verwendete Snowflake-Konto benötigt USAGE
-Zugriff auf die Datenbank und Lese-/Schreibzugriff auf das Schema und die zugehörigen Tabellen/Sichten. Außerdem muss es die Berechtigung CREATE STAGE
für das Schema haben, um die Phase „Extern“ mit SAS-URI erstellen zu können.
Die folgenden Werte für Kontoeigenschaften müssen festgelegt werden
Eigenschaft | Beschreibung | Erforderlich | Standard |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Gibt an, ob bei der Erstellung einer benannten externen Phase (mit CREATE STAGE) ein Speicherintegrationsobjekt als Anmeldeinformationen für den Zugriff auf einen privaten Cloudspeicherort benötigt wird. | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Gibt an, ob eine benannte externe Phase, die auf ein Speicherintegrationsobjekt verweist, als Anmeldeinformationen für die Cloud verwendet werden soll, wenn Daten aus einem privaten Cloudspeicherort geladen oder in diesen entladen werden. | FALSE | FALSE |
Weitere Informationen zu den von Data Factory unterstützten Netzwerksicherheitsmechanismen und -optionen finden Sie unter Datenzugriffsstrategien.
Erste Schritte
Sie können eines der folgenden Tools oder SDKs verwenden, um die Kopieraktivität mit einer Pipeline zu verwenden:
- Das Tool „Daten kopieren“
- Azure-Portal
- Das .NET SDK
- Das Python SDK
- Azure PowerShell
- Die REST-API
- Die Azure Resource Manager-Vorlage
Erstellen eines verknüpften Diensts mit Snowflake über die Benutzeroberfläche
Verwenden Sie die folgenden Schritte, um einen verknüpften Dienst mit Snowflake auf der Azure-Portal Benutzeroberfläche zu erstellen.
Navigieren Sie in Ihrem Azure Data Factory- oder Synapse-Arbeitsbereich zu der Registerkarte „Verwalten“, wählen Sie „Verknüpfte Dienste“ aus und klicken Sie dann auf „Neu“:
Suchen Sie nach Snowflake, und wählen Sie dann den Snowflake-Connector aus.
Konfigurieren Sie die Dienstdetails, testen Sie die Verbindung, und erstellen Sie den neuen verknüpften Dienst.
Details zur Connector-Konfiguration
Die folgenden Abschnitte enthalten Details zu Eigenschaften, die zum Definieren von Entitäten speziell für einen Snowflake-Connector verwendet werden.
Eigenschaften des verknüpften Diensts
Diese generischen Eigenschaften werden für den mit Snowflake verknüpften Dienst unterstützt:
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
Typ | Die „type“-Eigenschaft muss auf SnowflakeV2 festgelegt sein. | Ja |
accountIdentifier | Der Name des Kontos zusammen mit seiner Organisation. Beispiel: meineOrg-Konto123. | Ja |
database | Die Standarddatenbank, die nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. | Ja |
warehouse | Das standardmäßige virtuelle Warehouse, das nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. | Ja |
authenticationType | Typ der Authentifizierung für die Verbindung mit dem Snowflake-Dienst. Zulässige Werte sind: Basic (Standard) und KeyPair. Weitere Informationen zu anderen Eigenschaften und Beispiele finden Sie weiter unten in den jeweiligen Abschnitten. | No |
role | Die Standardsicherheitsrolle, die nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. | No |
host | Der Hostname des Snowflake-Kontos. Beispiel: contoso.snowflakecomputing.com .cn wird auch unterstützt. |
No |
connectVia | Die Integration Runtime, die zum Herstellen einer Verbindung mit dem Datenspeicher verwendet wird. Sie können die Azure Integration Runtime oder eine selbstgehostete Integration Runtime verwenden (wenn sich der Datenspeicher in einem privaten Netzwerk befindet). Wenn kein Wert angegeben ist, wird die standardmäßige Azure Integration Runtime verwendet. | No |
Dieser Snowflake-Connector unterstützt die folgenden Authentifizierungstypen. Weitere Informationen finden Sie in den entsprechenden Abschnitten.
Standardauthentifizierung
Um die Standardauthentifizierung zu verwenden, geben Sie zusätzlich zu den im vorherigen Abschnitt beschriebenen allgemeinen Eigenschaften die folgenden Eigenschaften an:
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
user | Anmeldename für Snowflake-Benutzer. | Ja |
Kennwort | Das Kennwort für Snowflake-Benutzer. Markieren Sie dieses Feld als Typ SecureString, um es sicher zu speichern. Sie können auch auf ein Geheimnis verweisen, das in Azure Key Vault gespeichert ist. | Ja |
Beispiel:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Kennwort in Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Hinweis
Der Zuordnungsdatenfluss unterstützt nur die Standardauthentifizierung.
Authentifizierung per Schlüsselpaar
Um die Authentifizierung per Schlüsselpaar zu verwenden, müssen Sie einen Schlüsselpaar-Authentifizierungsbenutzer in Snowflake konfigurieren und erstellen. Informationen dazu finden Sie unter Schlüsselpaarauthentifizierung und Schlüsselpaarrotation. Notieren Sie sich anschließend den privaten Schlüssel und die Passphrase (optional), die Sie zum Definieren des verknüpften Diensts verwenden.
Geben Sie zusätzlich zu den im vorherigen Abschnitt beschriebenen generischen Eigenschaften die folgenden Eigenschaften an:
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
user | Anmeldename für Snowflake-Benutzer. | Ja |
privateKey | Der private Schlüssel, der für die Schlüsselpaarauthentifizierung verwendet wird. Um sicherzustellen, dass der private Schlüssel gültig ist, wenn er an Azure Data Factory gesendet wird, und in Anbetracht der Tatsache, dass die privateKey-Datei Neue-Zeile-Zeichen (\n) enthält, ist es wichtig, den privateKey-Inhalt in seiner Zeichenfolgenliteralform korrekt zu formatieren. Dieser Vorgang umfasst das explizite Hinzufügen von \n zu jeder neuen Zeile. |
Ja |
privateKeyPassphrase | Die Passphrase, die zum Entschlüsseln des privaten Schlüssels verwendet wird, wenn er verschlüsselt ist. | No |
Beispiel:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Dataset-Eigenschaften
Eine vollständige Liste mit den Abschnitten und Eigenschaften, die zum Definieren von Datasets zur Verfügung stehen, finden Sie im Artikel zu Datasets.
Folgende Eigenschaften werden für das Snowflake-Dataset unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
Typ | Die „type“-Eigenschaft des Datasets muss auf SnowflakeV2Table festgelegt werden. | Ja |
schema | Name des Schemas. Beachten Sie, das beim Schemanamen die Groß- und Kleinschreibung beachtet wird. | Quelle: Nein, Senke: Ja |
table | Name der Tabelle/Ansicht. Beachten Sie, dann beim Tabellennamen wird die Groß- und Kleinschreibung beachtet wird. | Quelle: Nein, Senke: Ja |
Beispiel:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Eigenschaften der Kopieraktivität
Eine vollständige Liste mit den Abschnitten und Eigenschaften zum Definieren von Aktivitäten finden Sie im Artikel Pipelines. Dieser Abschnitt enthält eine Liste der Eigenschaften, die von der Snowflake-Quelle und -Senke unterstützt werden.
Snowflake als Quelle
Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Speicherort], um optimale Leistung zu erzielen.
Wenn der Senkendatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Details finden Sie unter Direktes Kopieren aus Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren aus Snowflake.
Beim Kopieren von Daten aus Snowflake werden die folgenden Eigenschaften im Abschnitt source der Kopieraktivität unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
Typ | Die „type“-Eigenschaft der Quelle der Copy-Aktivität muss auf SnowflakeV2Source festgelegt werden. | Ja |
Abfrage | Gibt die SQL-Abfrage an, mit der Daten aus Snowflake gelesen werden. Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable" .Die Ausführung der gespeicherten Prozedur wird nicht unterstützt. |
No |
exportSettings | Erweiterte Einstellungen, die zum Abrufen von Daten aus Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. | Ja |
Unter exportSettings : |
||
type | Der Typ des Exportbefehls, festgelegt auf SnowflakeExportCopyCommand. | Ja |
storageIntegration | Gibt den Namen der Speicherintegration an, die Sie in Ihrer Snowflake-Instanz erstellt haben. Die Schritte, die Sie vor der Verwendung der Speicherintegration ausführen müssen, finden Sie unter Konfigurieren einer Snowflake-Speicherintegration. | No |
additionalCopyOptions | Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: MAX_FILE_SIZE, OVERWRITE. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). | Nein |
additionalFormatOptions | Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). | Nein |
Hinweis
Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.
COPY INTO <location>
Direktes Kopieren aus Snowflake
Wenn der Senkendatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:
Wenn Sie
storageIntegration
in der Quelle angeben:Der Senkendatenspeicher ist die Azure Blob Storage-Instanz, auf die Sie in der externen Stage in Snowflake verwiesen haben. Sie müssen die folgenden Schritte ausführen, bevor Sie Daten kopieren:
Erstellen Sie einen mit Azure Blob Storage verknüpften Dienst für die Senkeninstanz von Azure Blob Storage, der über alle unterstützten Authentifizierungstypen verfügt.
Gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Senkeninstanz von Azure Blob Storage mindestens die Rolle Mitwirkender an Storage-Blobdaten.
Wenn Sie
storageIntegration
nicht in der Quelle angeben:Der verknüpfte Senkendienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten im folgenden unterstützten Format direkt zu Azure Data Lake Storage Gen2 kopieren möchten, können Sie eine verknüpfte Azure Blob Storage-Instanz mit SAS-Authentifizierung für Ihr Azure Data Lake Storage Gen2-Konto erstellen. Damit vermeiden Sie gestagte Kopiervorgänge aus Snowflake.
Das Senkendatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:
- Beim Format Parquet wird einer der Komprimierungscodecs None, Snappy oder Lzo verwendet.
- Beim Format Durch Trennzeichen getrennter Text:
rowDelimiter
ist \r\n oder ein beliebiges einzelnes Zeichen.compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt.quoteChar
ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
- Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Quelltabelle oder das Abfrageergebnis nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt.filePattern
in der Senke der Kopieraktivität wird auf dem Standardwert belassen oder auf setOfObjects festgelegt.
In der Quelle der Copy-Aktivität ist
additionalColumns
nicht angegeben.Die Spaltenzuordnung ist nicht angegeben.
Beispiel:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Gestaffeltes Kopieren aus Snowflake
Wenn der Senkendatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst exportiert Daten aus Snowflake in den Stagingspeicher, kopiert dann die Daten in die Senke und bereinigt schließlich die temporären Daten aus dem Stagingspeicher. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.
Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging
und stagingSettings
in der Kopieraktivität an.
Wenn Sie
storageIntegration
in der Quelle angeben, sollte das Zwischenstaginginstanz von Azure Blob Storage die sein, auf die Sie in der externen Stage in Snowflake verwiesen haben. Achten Sie darauf, einen mit Azure Blob Storage verknüpften Dienst dafür zu erstellen, der über alle unterstützten Authentifizierungen verfügt, und gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Azure Blob Storage-Staginginstanz mindestens die Rolle Mitwirkender an Storage-Blobdaten.Wenn Sie
storageIntegration
nicht in der Quelle angeben, muss der mit Azure Blob Storage verknüpfte Stagingdienst die SAS-Authentifizierung verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist. Erteilen Sie Snowflake die erforderlichen Zugriffsberechtigungen in Azure Blob Storage-Staginginstanz. Weitere Informationen hierzu finden Sie in diesem Artikel.
Beispiel:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Beim Ausführen einer mehrstufigen Kopie von Snowflake ist es wichtig, das Verhalten der Senkenkopie auf Dateien zusammenführen festzulegen. Diese Einstellung stellt sicher, dass alle partitionierten Dateien ordnungsgemäß verarbeitet und zusammengeführt werden und verhindern, dass nur die letzte partitionierte Datei kopiert wird.
Beispielkonfiguration
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Hinweis
Wird das Verhalten der Senkenkopie nicht auf Dateien zusammenführen festgelegt, kann dies dazu führen, dass nur die letzte partitionierte Datei kopiert wird.
Snowflake als Senke
Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Tabelle], um optimale Leistung zu erzielen. Er unterstützt das Schreiben von Daten in Snowflake in Azure.
Wenn der Quelldatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Details finden Sie unter Direktes Kopieren in Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren in Snowflake.
Beim Kopieren von Daten in Snowflake werden die folgenden Eigenschaften im Abschnitt sink der Kopieraktivität unterstützt.
Eigenschaft | Beschreibung | Erforderlich |
---|---|---|
Typ | Die „type“-Eigenschaft der Senke der Copy-Aktivität, festgelegt auf SnowflakeV2Sink. | Ja |
preCopyScript | Geben Sie eine SQL-Abfrage an, die bei jeder Ausführung von der Kopieraktivität ausgeführt werden soll, bevor Daten in Snowflake geschrieben werden. Sie können diese Eigenschaft nutzen, um vorab geladene Daten zu bereinigen. | Nein |
importSettings | Erweiterte Einstellungen, die zum Schreiben von Daten in Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. | Ja |
Unter importSettings : |
||
type | Der Typ des Importbefehls, festgelegt auf SnowflakeImportCopyCommand. | Ja |
storageIntegration | Gibt den Namen der Speicherintegration an, die Sie in Ihrer Snowflake-Instanz erstellt haben. Die Schritte, die Sie vor der Verwendung der Speicherintegration ausführen müssen, finden Sie unter Konfigurieren einer Snowflake-Speicherintegration. | No |
additionalCopyOptions | Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). | Nein |
additionalFormatOptions | Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). | Nein |
Hinweis
Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Direktes Kopieren in Snowflake
Wenn der Quelldatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:
Wenn Sie
storageIntegration
in der Senke angeben:Der Quelldatenspeicher ist die Azure Blob Storage-Instanz, auf die Sie in der externen Stage in Snowflake verwiesen haben. Sie müssen die folgenden Schritte ausführen, bevor Sie Daten kopieren:
Erstellen Sie einen mit Azure Blob Storage verknüpften Dienst für die Azure Blob Storage-Quellinstanz, der über alle unterstützten Authentifizierungstypen verfügt.
Gewähren Sie dem Snowflake-Dienstprinzipal mindestens die Rolle Storage-Blobdatenleser in der Zugriffssteuerung (IAM) der Azure Blob Storage-Quellinstanz.
Wenn Sie
storageIntegration
nicht in der Senke angeben:Der verknüpfte Quelldienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten im folgenden unterstützten Format direkt aus Azure Data Lake Storage Gen2 kopieren möchten, können Sie eine verknüpfte Azure Blob Storage-Instanz mit SAS-Authentifizierung für Ihr Azure Data Lake Storage Gen2-Konto erstellen. Damit vermeiden Sie gestagte Kopiervorgänge zu Snowflake.
Das Quelldatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:
Beim Format Parquet wird einer der Komprimierungscodecs None oder Snappy verwendet.
Beim Format Durch Trennzeichen getrennter Text:
rowDelimiter
ist \r\n oder ein beliebiges einzelnes Zeichen. Wenn das Zeilentrennzeichen nicht „\r\n“ ist, mussfirstRowAsHeader
auf false festgelegt sein.skipLineCount
ist nicht angegeben.compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.encodingName
wird als Standardwert übernommen oder ist auf „UTF-8“, „UTF-16“, „UTF-16BE“, „UTF-32“, „UTF-32BE“, „BIG5“, „EUC-JP“, „EUC-KR“, „GB18030“, „ISO-2022-JP“, „ISO-2022-KR“, „ISO-8859-1“, „ISO-8859-2“, „ISO-8859-5“, „ISO-8859-6“, „ISO-8859-7“, „ISO-8859-8“, „ISO-8859-9“, „WINDOWS-1250“, „WINDOWS-1251“, „WINDOWS-1252“, „WINDOWS-1253“, „WINDOWS-1254“ oder „WINDOWS-1255“ festgelegt.quoteChar
ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Senkentabelle nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.
compression
kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.encodingName
wird als Standardwert übernommen oder ist auf utf-8 festgelegt.- Die Spaltenzuordnung ist nicht angegeben.
In der Quelle der Kopieraktivität:
additionalColumns
ist nicht angegeben.- Wenn es sich bei der Quelle um einen Ordner handelt, wird
recursive
auf TRUE festgelegt. prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
undenablePartitionDiscovery
wurden nicht angegeben.
Beispiel:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Gestaffeltes Kopieren in Snowflake
Wenn der Quelldatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst konvertiert die Daten automatisch, damit das Datenformat den Anforderungen von Snowflake entspricht. Dann wird der Befehl „COPY“ aufgerufen, um die Daten in Snowflake zu laden. Abschließend werden Sie die temporären Daten in Blob Storage bereinigt. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.
Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging
und stagingSettings
in der Kopieraktivität an.
Wenn Sie
storageIntegration
in der Senke angeben, sollte das Zwischenstaginginstanz von Azure Blob Storage die sein, auf die Sie in der externen Stage in Snowflake verwiesen haben. Achten Sie darauf, einen mit Azure Blob Storage verknüpften Dienst dafür zu erstellen, der über alle unterstützten Authentifizierungen verfügt, und gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Azure Blob Storage-Staginginstanz mindestens die Rolle Storage-Blobdatenleser.Wenn Sie
storageIntegration
nicht in der Senke angeben, muss der mit Azure Blob Storage verknüpfte Stagingdienst die SAS-Authentifizierung verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist.
Beispiel:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Eigenschaften von Mapping Data Flow
Beim Transformieren von Daten im Zuordnungsdatenfluss können Sie in Snowflake Tabellen lesen und in diese schreiben. Weitere Informationen finden Sie unter Quellentransformation und Senkentransformation in Zuordnungsdatenflüssen. Sie können ein Snowflake-Dataset oder Inlinedataset als Quell- und Senkentyp verwenden.
Quellentransformation
In der folgenden Tabelle sind die von einer Snowflake-Quelle unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Quelloptionen bearbeiten. Der Connector verwendet die interne Datenübertragung von Snowflake.
Name | BESCHREIBUNG | Erforderlich | Zulässige Werte | Datenflussskript-Eigenschaft |
---|---|---|---|---|
Tabelle | Wenn Sie „Tabelle“ als Eingabe auswählen, ruft der Datenfluss alle Daten aus der Tabelle ab, die im Snowflake-Dataset oder bei Verwendung des Inlinedatasets in den Quelloptionen angegeben ist. | Nein | String | (nur für Inlinedataset) tableName schemaName |
Abfrage | Wenn Sie „Abfrage“ als Eingabe auswählen, geben Sie eine Abfrage zum Abrufen von Daten aus Snowflake ein. Diese Einstellung überschreibt jede Tabelle, die Sie im Dataset ausgewählt haben. Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable" . |
Nein | String | Abfrage |
Inkrementelle Extrahierung aktivieren (Vorschau) | Verwenden Sie diese Option, um ADF mitzuteilen, dass nur Zeilen verarbeitet werden sollen, die seit der letzten Ausführung der Pipeline geändert wurden. | Nein | Boolean | enableCdc |
Inkrementelle Spalte | Wenn Sie das Feature für das inkrementelle Extrahieren verwenden, müssen Sie die Datums-/Uhrzeit-/numerische Spalte auswählen, die Sie als Grenzwert in der Quelltabelle verwenden möchten. | Nein | String | waterMarkColumn |
Aktivieren der Snowflake-Änderungsnachverfolgung (Vorschau) | Mit dieser Option kann ADF die Snowflake-Change Data Capture-Technologie nutzen, um nur die Deltadaten seit der vorherigen Pipelineausführung zu verarbeiten. Mit dieser Option werden die Deltadaten automatisch mit Vorgängen für Zeileneinfügung, Zeilenaktualisierung und Zeilenlöschung geladen, ohne dass eine inkrementelle Spalte erforderlich ist. | Nein | Boolean | enableNativeCdc |
Netto-Änderungen | Wenn Sie die Snowflake-Änderungsnachverfolgung verwenden, können Sie diese Option verwenden, um deduplizierte geänderte Zeilen oder umfassende Änderungen abzurufen. Deduplizierte geänderte Zeilen zeigen nur die neuesten Versionen der Zeilen an, die seit einem bestimmten Zeitpunkt geändert wurden, während umfassende Änderungen alle Versionen jeder geänderten Zeile anzeigen, einschließlich der Zeilen, die gelöscht oder aktualisiert wurden. Wenn Sie z. B. eine Zeile aktualisieren, wird eine Löschversion und eine Einfügeversion in den umfassenden Änderungen angezeigt, aber nur die Einfügeversion in deduplizierten geänderten Zeilen. Je nach Anwendungsfall können Sie die Option auswählen, die Ihren Bedürfnissen entspricht. Die Standardoption ist FALSCH, was umfassende Änderungen bedeutet. | Nein | Boolean | netChanges |
Systemspalten einschließen | Bei Verwendung der Snowflake-Änderungsnachverfolgung können Sie mithilfe der Option „systemColumns“ steuern, ob die von Snowflake bereitgestellten Metadatenstromspalten in die Ausgabe der Änderungsnachverfolgung einbezogen oder ausgeschlossen werden. Standardmäßig ist „systemColumns“ auf WAHR festgelegt, was bedeutet, dass die Metadatenstromspalten eingeschlossen sind. Sie können „systemColumns“ auf FALSCH festlegen, wenn Sie diese ausschließen möchten. | Nein | Boolean | systemColumns |
Lesen von Anfang an beginnen | Wenn Sie diese Option mit dem inkrementellen Extrahieren und der Änderungsnachverfolgung festlegen, wird ADF angewiesen, alle Zeilen bei der ersten Ausführung einer Pipeline mit aktiviertem inkrementellem Extrahieren zu lesen. | Nein | Boolean | skipInitialLoad |
Beispiele für Snowflake-Quellskripts
Wenn Sie das Snowflake-Dataset als Quelltyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Native Änderungsnachverfolgung
Azure Data Factory unterstützt jetzt ein natives Feature in Snowflake, das als Änderungsnachverfolgung bekannt ist, was das Nachverfolgen von Änderungen in Form von Protokollen umfasst. Dieses Feature von Snowflake ermöglicht es uns, die Änderungen der Daten im Zeitverlauf zu verfolgen, was für das inkrementelle Laden von Daten und für Überprüfungszwecke nützlich ist. Wenn Sie dieses Feature verwenden möchten, erstellen wir beim Aktivieren von Change Data Capture und Auswählen der Snowflake-Änderungsnachverfolgung ein Stream-Objekt für die Quelltabelle, das die Änderungsnachverfolgung in der Snowflake-Tabelle ermöglicht. Anschließend verwenden wir die CHANGES-Klausel in unserer Abfrage, um nur die neuen oder aktualisierten Daten aus der Quelltabelle abzurufen. Außerdem wird empfohlen, die Pipeline so zu planen, dass Änderungen innerhalb des Intervalls der Datenaufbewahrungszeit für die Snowflake-Quelltabelle verarbeitet werden, sonst können Benutzer inkonsistentes Verhalten bei erfassten Änderungen sehen.
Senkentransformation
In der folgenden Tabelle sind die von einer Snowflake-Senke unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Einstellungen bearbeiten. Bei Verwendung eines Inlinedatasets werden zusätzliche Einstellungen angezeigt. Diese entsprechen den Eigenschaften, die im Abschnitt zu den Dataseteigenschaften beschrieben sind. Der Connector verwendet die interne Datenübertragung von Snowflake.
Name | BESCHREIBUNG | Erforderlich | Zulässige Werte | Datenflussskript-Eigenschaft |
---|---|---|---|---|
Updatemethode | Geben Sie an, welche Vorgänge für das Snowflake-Ziel zulässig sind. Um Aktualisierungs-, Upsert- oder Löschaktionen auf Zeilen anzuwenden, muss eine Zeilenänderungstransformation zum Kennzeichnen von Zeilen für diese Aktionen erfolgen. |
Ja | true oder false |
deletable insertable updateable upsertable |
Schlüsselspalten | Für Update-, Upsert- und Löschvorgänge muss mindestens eine Schlüsselspalte festgelegt werden, um die Zeile zu bestimmen, die geändert werden soll. | Nein | Array | keys |
Aktion table | Bestimmt, ob die Zieltabelle vor dem Schreiben neu erstellt werden soll oder alle Zeilen aus der Zieltabelle entfernt werden sollen. - Keine: Es wird keine Aktion an der Tabelle vorgenommen. - Neu erstellen: Die Tabelle wird gelöscht und neu erstellt. Erforderlich, wenn eine neue Tabelle dynamisch erstellt wird. - Abschneiden: Alle Zeilen werden aus der Zieltabelle entfernt. |
Nein | true oder false |
Neu erstellen truncate |
Beispiele für Snowflake-Senkenskripts
Wenn Sie das Snowflake-Dataset als Senkentyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Abfrage-Pushdown-Optimierung
Durch Festlegen des Protokolliergrads der Pipeline auf „Keine“ schließen wir die Übertragung von Zwischenmetriken für die Transformation aus, wodurch potenzielle Hindernisse für Spark-Optimierungen vermieden und die von Snowflake bereitgestellte Abfrage-Pushdown-Optimierung ermöglicht werden. Diese Pushdown-Optimierung ermöglicht erhebliche Leistungsverbesserungen für große Snowflake-Tabellen mit umfangreichen Datasets.
Hinweis
Temporäre Tabellen in Snowflake werden nicht unterstützt, da sie lokal für die Sitzungen oder Benutzer sind, die sie erstellen, wodurch sie für andere Sitzungen unzugänglich sind und anfällig dafür macht, von Snowflake als reguläre Tabellen überschrieben zu werden. Während Snowflake transiente Tabellen als Alternative bietet, die global zugänglich sind, erfordern sie ein manuelles Löschen, was unserem primären Ziel der Verwendung von temporäre Tabellen widerspricht, das darin besteht, jegliche Löschvorgänge im Quellschema zu vermeiden.
Eigenschaften der Lookup-Aktivität
Weitere Informationen zu den Eigenschaften finden Sie unter Lookup-Aktivität in Azure Data Factory.
Upgraden des Snowflake-Connectors
Um den verknüpften Snowflake-Dienst zu upgraden, können Sie ein paralleles Upgrade oder ein direktes Upgrade durchführen.
Paralleles Upgrade
Führen Sie die folgenden Schritte aus, um ein paralleles Upgrade durchzuführen:
- Erstellen Sie einen neuen verknüpften Snowflake-Dienst, und konfigurieren Sie ihn anhand der Eigenschaften des verknüpften Diensts.
- Erstellen Sie ein Dataset basierend auf dem neu erstellten verknüpften Snowflake-Dienst.
- Ersetzen Sie den neuen verknüpften Dienst und das Dataset durch die vorhandenen in den Pipelines, die auf die Legacyobjekte verweisen.
Direktes Upgrade
Um ein direktes Upgrade durchzuführen, müssen Sie die Nutzdaten des vorhandenen verknüpften Diensts bearbeiten und das Dataset aktualisieren, damit der neue verknüpfte Dienst verwendet wird.
Aktualisieren Sie den Typ von Snowflake in SnowflakeV2.
Ändern Sie die Nutzdaten des verknüpften Diensts aus dem Legacyformat in das neue Muster. Sie können jedes Feld entweder über die Benutzeroberfläche ausfüllen, nachdem Sie den oben genannten Typ geändert haben, oder die Nutzdaten direkt über den JSON-Editor aktualisieren. Im Abschnitt Eigenschaften des verknüpften Diensts in diesem Artikel finden Sie die unterstützten Verbindungseigenschaften. Die folgenden Beispiele zeigen die Unterschiede bei den Nutzdaten für die Legacy- und die neuen verknüpften Snowflake-Dienste:
JSON-Nutzdaten des verknüpften Snowflake-Legacydiensts:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
JSON-Nutzdaten des neuen verknüpften Snowflake-Diensts:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Aktualisieren Sie das Dataset, um den neuen verknüpften Dienst zu verwenden. Sie können entweder ein neues Dataset basierend auf dem neu erstellten verknüpften Dienst erstellen oder die type-Eigenschaft eines vorhandenen Datasets von SnowflakeTable in SnowflakeV2Table ändern.
Unterschiede zwischen Snowflake und Snowflake (Legacy)
Der Snowflake-Connector bietet neue Funktionen und ist mit den meisten Features des Snowflake-Connector (Legacy) kompatibel. Die folgende Tabelle zeigt die Funktionsunterschiede zwischen Snowflake und Snowflake (Legacy).
Snowflake | Snowflake (Legacy) |
---|---|
Unterstützen die Standardauthentifizierung und die Authentifizierung über ein Schlüsselpaar. | Unterstützen die Standardauthentifizierung. |
Skriptparameter werden derzeit in Skripten nicht unterstützt. Verwenden Sie alternativ dynamische Ausdrücke für Skriptparameter. Weitere Informationen finden Sie unter Ausdrücke und Funktionen in Azure Data Factory und Azure Synapse Analytics. | Unterstützen Skriptparameter in Skripten. |
Unterstützen BigDecimal in Lookups. Der in Snowflake definierte Typ NUMBER wird als Zeichenfolge in Lookups angezeigt. | BigDecimal wird in Lookups nicht unterstützt. |
Legacy-Eigenschaft connectionstring ist veraltet und wird durch die erforderlichen Parameter Konto, Lager, Datenbank, Schema und Rolle ersetzt |
Im älteren Snowflake-Verbinder wurde die connectionstring -Eigenschaft verwendet, um eine Verbindung herzustellen. |
Zugehöriger Inhalt
Eine Liste der Datenspeicher, die als Quellen und Senken für die Copy-Aktivität unterstützt werden, finden Sie unter Unterstützte Datenspeicher und Formate.