Kopiowanie i przekształcanie danych w usłudze Snowflake przy użyciu usługi Azure Data Factory lub Azure Synapse Analytics
DOTYCZY: Azure Data Factory Azure Synapse Analytics
Napiwek
Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !
W tym artykule opisano sposób używania działanie Kopiuj w potokach usługi Azure Data Factory i Azure Synapse do kopiowania danych z i do usługi Snowflake oraz używania Przepływ danych do przekształcania danych w usłudze Snowflake. Aby uzyskać więcej informacji, zobacz artykuł wprowadzający dotyczący usługi Data Factory lub Azure Synapse Analytics.
Ważne
Nowy łącznik Snowflake zapewnia ulepszoną natywną obsługę rozwiązania Snowflake. Jeśli używasz starszego łącznika Snowflake w swoim rozwiązaniu, zalecamy uaktualnienie łącznika Snowflake najwcześniejszą wygodą. Zapoznaj się z tą sekcją , aby uzyskać szczegółowe informacje na temat różnicy między starszą i najnowszą wersją.
Obsługiwane możliwości
Ten łącznik snowflake jest obsługiwany w następujących możliwościach:
Obsługiwane możliwości | IR |
---|---|
działanie Kopiuj (źródło/ujście) | (1) (2) |
Przepływ danych mapowania (źródło/ujście) | (1) |
Działanie Lookup | (1) (2) |
Działanie skryptu | (1) (2) |
(1) Środowisko Azure Integration Runtime (2) Self-hosted Integration Runtime
W przypadku działanie Kopiuj ten łącznik Snowflake obsługuje następujące funkcje:
- Skopiuj dane z usługi Snowflake, która korzysta z polecenia COPY rozwiązania Snowflake do polecenia [location], aby uzyskać najlepszą wydajność.
- Skopiuj dane do usługi Snowflake, która korzysta z polecenia COPY firmy Snowflake do polecenia [table], aby uzyskać najlepszą wydajność. Obsługuje ona rozwiązanie Snowflake na platformie Azure.
- Jeśli serwer proxy jest wymagany do nawiązania połączenia z usługą Snowflake z własnego środowiska Integration Runtime, należy skonfigurować zmienne środowiskowe dla HTTP_PROXY i HTTPS_PROXY na hoście środowiska Integration Runtime.
Wymagania wstępne
Jeśli magazyn danych znajduje się wewnątrz sieci lokalnej, sieci wirtualnej platformy Azure lub chmury prywatnej Amazon Virtual, musisz skonfigurować własne środowisko Integration Runtime , aby się z nim połączyć. Pamiętaj, aby dodać adresy IP używane przez własne środowisko Integration Runtime do listy dozwolonych.
Jeśli magazyn danych jest zarządzaną usługą danych w chmurze, możesz użyć środowiska Azure Integration Runtime. Jeśli dostęp jest ograniczony do adresów IP zatwierdzonych w regułach zapory, możesz dodać adresy IP środowiska Azure Integration Runtime do listy dozwolonych.
Konto snowflake używane do źródła lub ujścia powinno mieć niezbędny USAGE
dostęp do bazy danych i dostępu do odczytu/zapisu w schemacie oraz tabel/widoków w nim. Ponadto powinien on również znajdować CREATE STAGE
się w schemacie, aby móc utworzyć etap zewnętrzny przy użyciu identyfikatora URI sygnatury dostępu współdzielonego.
Należy ustawić następujące wartości właściwości konta
Właściwości | Opis | Wymagani | Wartość domyślna |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Określa, czy należy wymagać obiektu integracji magazynu jako poświadczeń w chmurze podczas tworzenia nazwanego etapu zewnętrznego (przy użyciu funkcji CREATE STAGE) w celu uzyskania dostępu do lokalizacji magazynu w chmurze prywatnej. | FAŁSZ | FAŁSZ |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Określa, czy należy użyć nazwanego etapu zewnętrznego, który odwołuje się do obiektu integracji magazynu jako poświadczeń chmury podczas ładowania danych z lub zwalniania danych do lokalizacji magazynu w chmurze prywatnej. | FAŁSZ | FAŁSZ |
Aby uzyskać więcej informacji na temat mechanizmów zabezpieczeń sieci i opcji obsługiwanych przez usługę Data Factory, zobacz Strategie dostępu do danych.
Rozpocznij
Aby wykonać działanie Kopiuj za pomocą potoku, możesz użyć jednego z następujących narzędzi lub zestawów SDK:
- Narzędzie do kopiowania danych
- Witryna Azure Portal
- Zestaw SDK platformy .NET
- Zestaw SDK języka Python
- Azure PowerShell
- Interfejs API REST
- Szablon usługi Azure Resource Manager
Tworzenie połączonej usługi z usługą Snowflake przy użyciu interfejsu użytkownika
Wykonaj poniższe kroki, aby utworzyć połączoną usługę snowflake w interfejsie użytkownika witryny Azure Portal.
Przejdź do karty Zarządzanie w obszarze roboczym usługi Azure Data Factory lub Synapse i wybierz pozycję Połączone usługi, a następnie kliknij pozycję Nowy:
Wyszukaj pozycję Snowflake i wybierz łącznik Snowflake.
Skonfiguruj szczegóły usługi, przetestuj połączenie i utwórz nową połączoną usługę.
Szczegóły konfiguracji łącznika
Poniższe sekcje zawierają szczegółowe informacje o właściwościach definiujących jednostki specyficzne dla łącznika usługi Snowflake.
Właściwości połączonej usługi
Te właściwości ogólne są obsługiwane w przypadku połączonej usługi Snowflake:
Właściwości | Opis | Wymagania |
---|---|---|
type | Właściwość type musi być ustawiona na SnowflakeV2. | Tak |
accountIdentifier | Nazwa konta wraz z jego organizacją. Na przykład myorg-account123. | Tak |
database | Domyślna baza danych używana na potrzeby sesji po nawiązaniu połączenia. | Tak |
magazyn | Domyślny magazyn wirtualny używany na potrzeby sesji po nawiązaniu połączenia. | Tak |
authenticationType | Typ uwierzytelniania używanego do nawiązywania połączenia z usługą Snowflake. Dozwolone wartości to: Basic (Default) i KeyPair. Zapoznaj się z odpowiednimi sekcjami poniżej, aby uzyskać więcej właściwości i przykładów. | Nie. |
role | Domyślna rola zabezpieczeń używana dla sesji po nawiązaniu połączenia. | Nie. |
host | Nazwa hosta konta snowflake. Na przykład: contoso.snowflakecomputing.com . .cn jest również obsługiwany. |
Nie. |
connectVia | Środowisko Integration Runtime używane do nawiązywania połączenia z magazynem danych. Możesz użyć środowiska Azure Integration Runtime lub własnego środowiska Integration Runtime (jeśli magazyn danych znajduje się w sieci prywatnej). Jeśli nie zostanie określony, używa domyślnego środowiska Azure Integration Runtime. | Nie. |
Ten łącznik snowflake obsługuje następujące typy uwierzytelniania. Aby uzyskać szczegółowe informacje, zobacz odpowiednie sekcje.
Uwierzytelnianie podstawowe
Aby użyć uwierzytelniania podstawowego , oprócz właściwości ogólnych opisanych w poprzedniej sekcji, określ następujące właściwości:
Właściwości | Opis | Wymagania |
---|---|---|
Użytkownik | Nazwa logowania użytkownika snowflake. | Tak |
hasło | Hasło użytkownika snowflake. Oznacz to pole jako typ SecureString, aby przechowywać je bezpiecznie. Możesz również odwołać się do wpisu tajnego przechowywanego w usłudze Azure Key Vault. | Tak |
Przykład:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Hasło w usłudze Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Uwaga
Mapowanie Przepływ danych obsługuje tylko uwierzytelnianie podstawowe.
Uwierzytelnianie pary kluczy
Aby użyć uwierzytelniania par kluczy, należy skonfigurować i utworzyć użytkownika uwierzytelniania pary kluczy w usłudze Snowflake, odwołując się do uwierzytelniania par kluczy i rotacji par kluczy. Następnie zanotuj klucz prywatny i hasło (opcjonalnie), które służy do definiowania połączonej usługi.
Oprócz właściwości ogólnych opisanych w poprzedniej sekcji określ następujące właściwości:
Właściwości | Opis | Wymagania |
---|---|---|
Użytkownik | Nazwa logowania użytkownika snowflake. | Tak |
privateKey | Klucz prywatny używany do uwierzytelniania pary kluczy. Aby upewnić się, że klucz prywatny jest prawidłowy podczas wysyłania do usługi Azure Data Factory i biorąc pod uwagę, że plik privateKey zawiera znaki nowego wiersza (\n), ważne jest, aby poprawnie sformatować zawartość privateKey w postaci literału ciągu. Ten proces obejmuje jawne dodanie \n do każdego nowego wiersza. |
Tak |
privateKeyPassphrase | Hasło używane do odszyfrowywania klucza prywatnego, jeśli jest zaszyfrowane. | Nie. |
Przykład:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Właściwości zestawu danych
Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania zestawów danych, zobacz artykuł Zestawy danych.
Następujące właściwości są obsługiwane dla zestawu danych Snowflake.
Właściwości | Opis | Wymagania |
---|---|---|
type | Właściwość type zestawu danych musi być ustawiona na SnowflakeV2Table. | Tak |
schema | Nazwa schematu. Zwróć uwagę, że w nazwie schematu jest uwzględniana wielkość liter. | Nie dla źródła, tak dla ujścia |
table | Nazwa tabeli/widoku. Zwróć uwagę, że w nazwie tabeli jest rozróżniana wielkość liter. | Nie dla źródła, tak dla ujścia |
Przykład:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Właściwości działania kopiowania
Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania działań, zobacz artykuł Pipelines (Potoki ). Ta sekcja zawiera listę właściwości obsługiwanych przez źródło i ujście snowflake.
Płatk śniegu jako źródło
Łącznik Snowflake używa polecenia COPY rozwiązania Snowflake do [location] w celu uzyskania najlepszej wydajności.
Jeśli magazyn danych ujścia i format są natywnie obsługiwane przez polecenie Snowflake COPY, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować z snowflake do ujścia. Aby uzyskać szczegółowe informacje, zobacz Bezpośrednia kopia z usługi Snowflake. W przeciwnym razie użyj wbudowanej kopii etapowej z usługi Snowflake.
Aby skopiować dane z usługi Snowflake, w sekcji źródła działanie Kopiuj są obsługiwane następujące właściwości.
Właściwości | Opis | Wymagania |
---|---|---|
type | Właściwość type źródła działanie Kopiuj musi być ustawiona na SnowflakeV2Source. | Tak |
zapytanie | Określa zapytanie SQL do odczytu danych z usługi Snowflake. Jeśli nazwy schematu, tabeli i kolumn zawierają małe litery, podaj identyfikator obiektu w zapytaniu, np. select * from "schema"."myTable" .Wykonywanie procedury składowanej nie jest obsługiwane. |
Nie. |
exportSettings | Ustawienia zaawansowane używane do pobierania danych z usługi Snowflake. Można skonfigurować te obsługiwane przez copy do polecenia, które usługa będzie przechodzić po wywołaniu instrukcji . | Tak |
W obszarze exportSettings : |
||
type | Typ polecenia eksportu, ustawiony na SnowflakeExportCopyCommand. | Tak |
storageIntegration | Określ nazwę integracji magazynu utworzonej w aplikacji Snowflake. Aby zapoznać się z krokami wstępnymi dotyczącymi korzystania z integracji magazynu, zobacz Konfigurowanie integracji magazynu snowflake. | Nie. |
additionalCopyOptions | Dodatkowe opcje kopiowania udostępniane jako słownik par klucz-wartość. Przykłady: MAX_FILE_SIZE, ZASTĄP. Aby uzyskać więcej informacji, zobacz Snowflake Copy Options (Opcje kopiowania snowflake). | Nie. |
additionalFormatOptions | Dodatkowe opcje formatowania pliku udostępniane do polecenia COPY jako słownik par klucz-wartość. Przykłady: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Aby uzyskać więcej informacji, zobacz Snowflake Format Type Options (Opcje typu formatu snowflake). | Nie. |
Uwaga
Upewnij się, że masz uprawnienia do wykonywania następującego polecenia i uzyskiwania dostępu do INFORMATION_SCHEMA schematu i tabeli COLUMNS.
COPY INTO <location>
Bezpośrednia kopia z usługi Snowflake
Jeśli magazyn danych ujścia i format spełniają kryteria opisane w tej sekcji, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować z rozwiązania Snowflake do ujścia. Usługa sprawdza ustawienia i kończy się niepowodzeniem działanie Kopiuj uruchomić, jeśli następujące kryteria nie są spełnione:
Po określeniu
storageIntegration
w źródle:Magazyn danych ujścia to usługa Azure Blob Storage, o której mowa na etapie zewnętrznym w rozwiązaniu Snowflake. Przed skopiowaniem danych należy wykonać następujące czynności:
Utwórz połączoną usługę Azure Blob Storage dla ujścia usługi Azure Blob Storage z dowolnymi obsługiwanymi typami uwierzytelniania.
Przyznaj co najmniej rolę Współautor danych obiektu blob usługi Storage jednostce usługi Snowflake w ujściu kontroli dostępu do usługi Azure Blob Storage (IAM).
Jeśli nie określisz
storageIntegration
w źródle:Połączona usługa ujścia to Azure Blob Storage z uwierzytelnianiem sygnatury dostępu współdzielonego. Jeśli chcesz bezpośrednio skopiować dane do usługi Azure Data Lake Storage Gen2 w następującym obsługiwanym formacie, możesz utworzyć połączoną usługę Azure Blob Storage z uwierzytelnianiem SAS względem konta usługi Azure Data Lake Storage Gen2, aby uniknąć używania kopii etapowej z usługi Snowflake.
Format danych ujścia to Parquet, tekst rozdzielany lub JSON z następującymi konfiguracjami:
- W przypadku formatu Parquet koder kompresji to None, Snappy lub Lzo.
- W przypadku formatu tekstu rozdzielanego:
rowDelimiter
to \r\n lub dowolny pojedynczy znak.compression
nie może być kompresją, gzip, bzip2 lub deflate.encodingName
jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.quoteChar
jest cudzysłowem podwójnym, pojedynczym cudzysłowem lub pustym ciągiem (bez cudzysłowu).
- W przypadku formatu JSON kopiowanie bezpośrednie obsługuje tylko przypadek, w przypadku którego źródłowa tabela Snowflake lub wynik zapytania zawiera tylko jedną kolumnę, a typ danych tej kolumny to VARIANT, OBJECT lub ARRAY.
compression
nie może być kompresją, gzip, bzip2 lub deflate.encodingName
jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.filePattern
w ujściu działania kopiowania jest pozostawiona jako domyślna lub ustawiona na wartość setOfObjects.
W źródle
additionalColumns
działania kopiowania nie jest określony.Nie określono mapowania kolumn.
Przykład:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Kopia etapowa z usługi Snowflake
Jeśli magazyn danych ujścia lub format nie jest natywnie zgodny z poleceniem Snowflake COPY, jak wspomniano w ostatniej sekcji, włącz wbudowaną kopię etapową przy użyciu tymczasowego wystąpienia usługi Azure Blob Storage. Funkcja kopiowania etapowego zapewnia również lepszą przepływność. Usługa eksportuje dane z usługi Snowflake do magazynu przejściowego, a następnie kopiuje dane do ujścia, a na koniec czyści dane tymczasowe z magazynu przejściowego. Zobacz Kopiowanie etapowe, aby uzyskać szczegółowe informacje na temat kopiowania danych przy użyciu przemieszczania.
Aby użyć tej funkcji, utwórz połączoną usługę Azure Blob Storage, która odwołuje się do konta usługi Azure Storage jako tymczasowego przemieszczania. Następnie określ enableStaging
właściwości i stagingSettings
w działanie Kopiuj.
Po określeniu
storageIntegration
w źródle tymczasowe przejściowej usługi Azure Blob Storage powinna być taka, o której mowa w zewnętrznym etapie w usłudze Snowflake. Upewnij się, że utworzysz połączoną usługę Azure Blob Storage z obsługiwanym uwierzytelnianiem i przyznasz co najmniej rolę Współautor danych obiektu blob usługi Storage do jednostki usługi Snowflake w przemieszczaniu kontroli dostępu do usługi Azure Blob Storage (IAM).Jeśli nie określisz
storageIntegration
w źródle, tymczasowa połączona usługa Azure Blob Storage musi używać uwierzytelniania sygnatury dostępu współdzielonego zgodnie z wymaganiami polecenia Snowflake COPY. Upewnij się, że udzielono odpowiedniego uprawnienia dostępu do rozwiązania Snowflake w przejściowym usłudze Azure Blob Storage. Aby dowiedzieć się więcej na ten temat, zobacz ten artykuł.
Przykład:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Podczas wykonywania kopii etapowej z usługi Snowflake kluczowe jest ustawienie zachowania kopiowania ujścia na scalanie plików. To ustawienie gwarantuje, że wszystkie partycjonowane pliki są poprawnie obsługiwane i scalane, uniemożliwiając problem polegający na tym, że skopiowany jest tylko ostatni plik partycjonowany.
Przykładowa konfiguracja
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Uwaga
Nie można ustawić zachowania kopiowania ujścia na scalanie plików może spowodować skopiowanie tylko ostatniego pliku partycjonowanego.
Płatki śniegu jako zlew
Łącznik Snowflake używa polecenia COPY rozwiązania Snowflake do [table] w celu uzyskania najlepszej wydajności. Obsługuje zapisywanie danych w aplikacji Snowflake na platformie Azure.
Jeśli źródłowy magazyn danych i format są natywnie obsługiwane przez polecenie Snowflake COPY, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować ze źródła do usługi Snowflake. Aby uzyskać szczegółowe informacje, zobacz Bezpośrednia kopia do usługi Snowflake. W przeciwnym razie użyj wbudowanej kopii etapowej do usługi Snowflake.
Aby skopiować dane do usługi Snowflake, następujące właściwości są obsługiwane w sekcji ujścia działanie Kopiuj.
Właściwości | Opis | Wymagania |
---|---|---|
type | Właściwość type ujścia działanie Kopiuj ustawiona na SnowflakeV2Sink. | Tak |
preCopyScript | Określ zapytanie SQL dla działanie Kopiuj do uruchomienia przed zapisaniem danych w usłudze Snowflake w każdym uruchomieniu. Użyj tej właściwości, aby wyczyścić wstępnie załadowane dane. | Nie. |
importSettings | Ustawienia zaawansowane używane do zapisywania danych w usłudze Snowflake. Można skonfigurować te obsługiwane przez copy do polecenia, które usługa będzie przechodzić po wywołaniu instrukcji . | Tak |
W obszarze importSettings : |
||
type | Typ polecenia importu, ustawiony na SnowflakeImportCopyCommand. | Tak |
storageIntegration | Określ nazwę integracji magazynu utworzonej w aplikacji Snowflake. Aby zapoznać się z krokami wstępnymi dotyczącymi korzystania z integracji magazynu, zobacz Konfigurowanie integracji magazynu snowflake. | Nie. |
additionalCopyOptions | Dodatkowe opcje kopiowania udostępniane jako słownik par klucz-wartość. Przykłady: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Aby uzyskać więcej informacji, zobacz Snowflake Copy Options (Opcje kopiowania snowflake). | Nie. |
additionalFormatOptions | Dodatkowe opcje formatowania pliku udostępnione do polecenia COPY podane jako słownik par klucz-wartość. Przykłady: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Aby uzyskać więcej informacji, zobacz Snowflake Format Type Options (Opcje typu formatu snowflake). | Nie. |
Uwaga
Upewnij się, że masz uprawnienia do wykonywania następującego polecenia i uzyskiwania dostępu do INFORMATION_SCHEMA schematu i tabeli COLUMNS.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Bezpośrednia kopia do usługi Snowflake
Jeśli źródłowy magazyn danych i format spełniają kryteria opisane w tej sekcji, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować ze źródła do usługi Snowflake. Usługa sprawdza ustawienia i kończy się niepowodzeniem działanie Kopiuj uruchomić, jeśli następujące kryteria nie są spełnione:
Po określeniu
storageIntegration
w ujściu:Źródłowy magazyn danych to usługa Azure Blob Storage, o której mowa na etapie zewnętrznym w usłudze Snowflake. Przed skopiowaniem danych należy wykonać następujące czynności:
Utwórz połączoną usługę Azure Blob Storage dla źródłowej usługi Azure Blob Storage z dowolnymi obsługiwanymi typami uwierzytelniania.
Przyznaj co najmniej rolę Czytelnik danych obiektu blob usługi Storage jednostce usługi Snowflake w źródłowej kontroli dostępu do usługi Azure Blob Storage (IAM).
Jeśli nie określisz
storageIntegration
ujścia:Źródłowa połączona usługa to Azure Blob Storage z uwierzytelnianiem sygnatury dostępu współdzielonego. Jeśli chcesz bezpośrednio skopiować dane z usługi Azure Data Lake Storage Gen2 w następującym obsługiwanym formacie, możesz utworzyć połączoną usługę Azure Blob Storage z uwierzytelnianiem SAS względem konta usługi Azure Data Lake Storage Gen2, aby uniknąć używania kopii etapowej do usługi Snowflake.
Format danych źródłowych to Parquet, Rozdzielany tekst lub JSON z następującymi konfiguracjami:
W przypadku formatu Parquet koder kompresji to None lub Snappy.
W przypadku formatu tekstu rozdzielanego:
rowDelimiter
to \r\n lub dowolny pojedynczy znak. Jeśli ogranicznik wierszy nie jest "\r\n",firstRowAsHeader
musi być fałszywy iskipLineCount
nie jest określony.compression
nie może być kompresją, gzip, bzip2 lub deflate.encodingName
jest pozostawiona jako domyślna lub ustawiona na "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-8"8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".quoteChar
jest cudzysłowem podwójnym, pojedynczym cudzysłowem lub pustym ciągiem (bez cudzysłowu).
W przypadku formatu JSON kopiowanie bezpośrednie obsługuje tylko przypadek, w przypadku którego tabela Snowflake ujścia ma tylko jedną kolumnę, a typ danych tej kolumny to VARIANT, OBJECT lub ARRAY.
compression
nie może być kompresją, gzip, bzip2 lub deflate.encodingName
jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.- Nie określono mapowania kolumn.
W źródle działanie Kopiuj:
additionalColumns
nie jest określony.- Jeśli źródło jest folderem,
recursive
ma wartość true. prefix
, ,modifiedDateTimeStart
modifiedDateTimeEnd
ienablePartitionDiscovery
nie są określone.
Przykład:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Kopia etapowa w usłudze Snowflake
Jeśli źródłowy magazyn danych lub format nie jest natywnie zgodny z poleceniem Snowflake COPY, jak wspomniano w ostatniej sekcji, włącz wbudowaną kopię etapową przy użyciu tymczasowego wystąpienia usługi Azure Blob Storage. Funkcja kopiowania etapowego zapewnia również lepszą przepływność. Usługa automatycznie konwertuje dane w celu spełnienia wymagań dotyczących formatu danych snowflake. Następnie wywołuje polecenie COPY w celu załadowania danych do usługi Snowflake. Na koniec czyści dane tymczasowe z magazynu obiektów blob. Zobacz Kopiowanie etapowe, aby uzyskać szczegółowe informacje na temat kopiowania danych przy użyciu przemieszczania.
Aby użyć tej funkcji, utwórz połączoną usługę Azure Blob Storage, która odwołuje się do konta usługi Azure Storage jako tymczasowego przemieszczania. Następnie określ enableStaging
właściwości i stagingSettings
w działanie Kopiuj.
Po określeniu
storageIntegration
ujścia tymczasowy przejściowy magazyn obiektów blob platformy Azure powinien być tymczasowym etapem zewnętrznym w usłudze Snowflake. Upewnij się, że utworzysz połączoną usługę Azure Blob Storage z dowolnym obsługiwanym uwierzytelnianiem i przyznasz co najmniej rolę Czytelnik danych obiektu blob usługi Storage usługi Snowflake w przejściowej kontroli dostępu do usługi Azure Blob Storage (IAM).Jeśli nie określisz
storageIntegration
ujścia, tymczasowa połączona usługa Azure Blob Storage musi używać uwierzytelniania sygnatury dostępu współdzielonego zgodnie z wymaganiami polecenia Snowflake COPY.
Przykład:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Właściwości przepływu mapowania danych
Podczas przekształcania danych w przepływie danych mapowania można odczytywać dane z tabel i zapisywać je w usłudze Snowflake. Aby uzyskać więcej informacji, zobacz przekształcanie źródła i przekształcanie ujścia w przepływach danych mapowania. Możesz użyć zestawu danych Snowflake lub wbudowanego zestawu danych jako typu źródła i ujścia.
Przekształcanie źródła
W poniższej tabeli wymieniono właściwości obsługiwane przez źródło snowflake. Te właściwości można edytować na karcie Opcje źródła. Łącznik korzysta z wewnętrznego transferu danych snowflake.
Nazwa/nazwisko | opis | Wymagania | Dozwolone wartości | Właściwość skryptu przepływu danych |
---|---|---|---|---|
Table | W przypadku wybrania pozycji Tabela jako danych wejściowych przepływ danych pobierze wszystkie dane z tabeli określonej w zestawie danych Snowflake lub w opcjach źródłowych podczas korzystania z wbudowanego zestawu danych. | Nie. | String | (tylko w przypadku wbudowanego zestawu danych) tableName schemaName |
Query | Jeśli wybierzesz pozycję Zapytanie jako dane wejściowe, wprowadź zapytanie, aby pobrać dane z usługi Snowflake. To ustawienie zastępuje dowolną tabelę wybraną w zestawie danych. Jeśli nazwy schematu, tabeli i kolumn zawierają małe litery, podaj identyfikator obiektu w zapytaniu, np. select * from "schema"."myTable" . |
Nie. | String | zapytanie |
Włączanie wyodrębniania przyrostowego (wersja zapoznawcza) | Użyj tej opcji, aby poinformować usługę ADF o przetwarzaniu tylko wierszy, które uległy zmianie od czasu ostatniego wykonania potoku. | Nie. | Wartość logiczna | enableCdc |
Kolumna przyrostowa | W przypadku korzystania z funkcji wyodrębniania przyrostowego należy wybrać kolumnę daty/godziny/liczbową, której chcesz użyć jako znaku wodnego w tabeli źródłowej. | Nie. | String | waterMarkColumn |
Włączanie rozwiązania Snowflake Change Tracking (wersja zapoznawcza) | Ta opcja umożliwia usłudze ADF wykorzystanie technologii przechwytywania zmian danych usługi Snowflake w celu przetwarzania tylko danych różnicowych od czasu poprzedniego wykonania potoku. Ta opcja automatycznie ładuje dane różnicowe przy użyciu operacji wstawiania, aktualizowania i usuwania wierszy bez konieczności wykonywania żadnych kolumn przyrostowych. | Nie. | Wartość logiczna | enableNativeCdc |
Zmiany netto | W przypadku korzystania ze śledzenia zmian płatka śniegu można użyć tej opcji, aby uzyskać deduplikowane zmienione wiersze lub wyczerpujące zmiany. Zduplikowane zmienione wiersze będą pokazywać tylko najnowsze wersje wierszy, które uległy zmianie od danego punktu w czasie, podczas gdy wyczerpujące zmiany pokażą wszystkie wersje każdego wiersza, które uległy zmianie, w tym te, które zostały usunięte lub zaktualizowane. Jeśli na przykład zaktualizujesz wiersz, zobaczysz wersję usuwania i wersję wstawiania w wyczerpujących zmianach, ale tylko wstawienie wersji w zdeduplikowanych zmienionych wierszach. W zależności od przypadku użycia możesz wybrać opcję, która odpowiada Twoim potrzebom. Domyślna opcja to false, co oznacza wyczerpujące zmiany. | Nie. | Wartość logiczna | netChanges |
Uwzględnij kolumny systemowe | W przypadku korzystania ze śledzenia zmian płatka śniegu można użyć opcji systemColumns, aby kontrolować, czy kolumny strumienia metadanych dostarczone przez usługę Snowflake są uwzględniane lub wykluczone w danych wyjściowych śledzenia zmian. Domyślnie właściwość systemColumns jest ustawiona na wartość true, co oznacza, że kolumny strumienia metadanych są uwzględniane. Możesz ustawić wartość systemColumns na wartość false, jeśli chcesz je wykluczyć. | Nie. | Wartość logiczna | systemColumns |
Rozpocznij czytanie od początku | Ustawienie tej opcji za pomocą wyodrębniania przyrostowego i śledzenia zmian spowoduje, że usługa ADF odczytuje wszystkie wiersze podczas pierwszego wykonywania potoku z włączonym wyodrębnieniem przyrostowym. | Nie. | Wartość logiczna | skipInitialLoad |
Przykłady skryptów źródłowych snowflake
Jeśli używasz zestawu danych Snowflake jako typu źródła, skojarzony skrypt przepływu danych to:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Jeśli używasz wbudowanego zestawu danych, skojarzony skrypt przepływu danych to:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Natywne śledzenie zmian
Usługa Azure Data Factory obsługuje teraz funkcję natywną w rozwiązaniu Snowflake znaną jako śledzenie zmian, która obejmuje śledzenie zmian w postaci dzienników. Ta funkcja płatka śniegu pozwala nam śledzić zmiany w danych w czasie, co ułatwia przyrostowe ładowanie i inspekcję danych. Aby użyć tej funkcji, po włączeniu funkcji przechwytywania zmian i wybraniu śledzenia zmian snowflake tworzymy obiekt strumienia dla tabeli źródłowej, która umożliwia śledzenie zmian w źródłowej tabeli płatków śniegu. Następnie użyjemy klauzuli CHANGES w zapytaniu, aby pobrać tylko nowe lub zaktualizowane dane z tabeli źródłowej. Ponadto zaleca się zaplanowanie potoku tak, aby zmiany zostały zużyte w przedziale czasu przechowywania danych ustawionym dla tabeli źródłowej płatka śniegu, a inny użytkownik może zobaczyć niespójne zachowanie w przechwyconych zmianach.
Przekształcenie ujścia
W poniższej tabeli wymieniono właściwości obsługiwane przez ujście snowflake. Te właściwości można edytować na karcie Ustawienia . W przypadku korzystania z wbudowanego zestawu danych zobaczysz dodatkowe ustawienia, które są takie same jak właściwości opisane w sekcji właściwości zestawu danych. Łącznik korzysta z wewnętrznego transferu danych snowflake.
Nazwa/nazwisko | opis | Wymagania | Dozwolone wartości | Właściwość skryptu przepływu danych |
---|---|---|---|---|
Metoda aktualizacji | Określ, jakie operacje są dozwolone w miejscu docelowym snowflake. Aby zaktualizować, upsert lub usunąć wiersze, do tagowania wierszy dla tych akcji jest wymagane przekształcenie alter wiersza. |
Tak | true lub false |
możliwe do usunięcia możliwość wstawienia możliwe do aktualizacji upsertable |
Kolumny kluczy | W przypadku aktualizacji, operacji upsert i usuwania należy ustawić kolumnę klucza lub kolumny w celu określenia, który wiersz ma zostać zmieniony. | Nie. | Tablica | keys |
Akcja tabeli | Określa, czy należy ponownie utworzyć lub usunąć wszystkie wiersze z tabeli docelowej przed zapisem. - Brak: żadna akcja nie zostanie wykonana w tabeli. - Utwórz ponownie: tabela zostanie porzucona i utworzona ponownie. Wymagane w przypadku dynamicznego tworzenia nowej tabeli. - Obcinanie: wszystkie wiersze z tabeli docelowej zostaną usunięte. |
Nie. | true lub false |
odtworzyć truncate |
Przykłady skryptów ujścia snowflake
W przypadku użycia zestawu danych Snowflake jako typu ujścia skojarzony skrypt przepływu danych to:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Jeśli używasz wbudowanego zestawu danych, skojarzony skrypt przepływu danych to:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Optymalizacja wypychania zapytań
Ustawiając poziom rejestrowania potoku na Brak, wykluczamy przesyłanie metryk transformacji pośredniej, uniemożliwiając potencjalne przeszkody optymalizacjom platformy Spark i włączając optymalizację wypychania zapytań zapewnianą przez usługę Snowflake. Ta optymalizacja wypychania umożliwia znaczne ulepszenia wydajności dla dużych tabel Snowflake z rozbudowanymi zestawami danych.
Uwaga
Nie obsługujemy tabel tymczasowych w usłudze Snowflake, ponieważ są one lokalne dla sesji lub użytkownika, który je tworzy, co czyni je niedostępnymi dla innych sesji i podatnymi na zastępowanie jako zwykłe tabele przez usługę Snowflake. Chociaż snowflake oferuje tabele przejściowe jako alternatywę, które są dostępne globalnie, wymagają ręcznego usuwania, sprzeczne z naszym podstawowym celem korzystania z tabel tymczasowych, które mają na celu uniknięcie wszelkich operacji usuwania w schemacie źródłowym.
Właściwości działania wyszukiwania
Aby uzyskać więcej informacji na temat właściwości, zobacz Działanie wyszukiwania.
Uaktualnianie łącznika snowflake
Aby uaktualnić łącznik Snowflake, możesz przeprowadzić uaktualnienie równoległe lub uaktualnienie w miejscu.
Uaktualnianie równoległe
Aby przeprowadzić uaktualnienie równoległe, wykonaj następujące kroki:
- Utwórz nową połączoną usługę Snowflake i skonfiguruj ją, odwołując się do połączonych właściwości usługi.
- Utwórz zestaw danych na podstawie nowo utworzonej połączonej usługi Snowflake.
- Zastąp nową połączoną usługę i zestaw danych istniejącymi w potokach, które są przeznaczone dla starszych obiektów.
Uaktualnienie w miejscu
Aby przeprowadzić uaktualnienie w miejscu, należy edytować istniejący ładunek połączonej usługi i zaktualizować zestaw danych, aby korzystać z nowej połączonej usługi.
Zaktualizuj typ z Snowflake na SnowflakeV2.
Zmodyfikuj ładunek połączonej usługi ze starszego formatu na nowy wzorzec. Możesz wypełnić każde pole z interfejsu użytkownika po zmianie typu wymienionego powyżej lub zaktualizować ładunek bezpośrednio za pośrednictwem edytora JSON. Zapoznaj się z sekcją Właściwości połączonej usługi w tym artykule, aby zapoznać się z obsługiwanymi właściwościami połączenia. W poniższych przykładach przedstawiono różnice w ładunku dla starszych i nowych połączonych usług Snowflake:
Starsza wersja ładunku JSON połączonej usługi Snowflake:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Nowy ładunek JSON połączonej usługi Snowflake:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Zaktualizuj zestaw danych, aby korzystał z nowej połączonej usługi. Możesz utworzyć nowy zestaw danych na podstawie nowo utworzonej połączonej usługi lub zaktualizować właściwość typu istniejącego zestawu danych z tabeli SnowflakeTable do tabeli SnowflakeV2Table.
Różnice między snowflake i Snowflake (starsza wersja)
Łącznik Snowflake oferuje nowe funkcje i jest zgodny z większością funkcji łącznika Snowflake (starsza wersja). W poniższej tabeli przedstawiono różnice funkcji między usługą Snowflake i Snowflake (starsza wersja).
Snowflake | Snowflake (starsza wersja) |
---|---|
Obsługa uwierzytelniania par podstawowych i kluczowych. | Obsługa uwierzytelniania podstawowego. |
Parametry skryptu nie są obecnie obsługiwane w działaniu skryptu. Alternatywnie użyj wyrażeń dynamicznych dla parametrów skryptu. Aby uzyskać więcej informacji, zobacz Wyrażenia i funkcje w usługach Azure Data Factory i Azure Synapse Analytics. | Obsługa parametrów skryptu w działaniu skryptu. |
Obsługa funkcji BigDecimal w działaniu wyszukiwania. Typ NUMBER, zgodnie z definicją w aplikacji Snowflake, będzie wyświetlany jako ciąg w działaniu Wyszukiwania. Jeśli chcesz ukryć go jako typ liczbowy, możesz użyć parametru potoku z funkcją int lub funkcją zmiennoprzecinkową. Na przykład , int(activity('lookup').output.firstRow.VALUE) float(activity('lookup').output.firstRow.VALUE) |
Funkcja BigDecimal nie jest obsługiwana w działaniu wyszukiwania. |
Właściwości accountIdentifier , warehouse , database schema i role służą do nawiązywania połączenia. |
Właściwość connectionstring służy do nawiązywania połączenia. |
typ danych sygnatury czasowej w elemecie Snowflake jest odczytywany jako typ danych DateTimeOffset w działaniu Lookup i Script. | Typ danych sygnatury czasowej w aplikacji Snowflake jest odczytywany jako typ danych DateTime w działaniu Lookup i Script. Jeśli nadal musisz użyć wartości Datetime jako parametru w potoku po uaktualnieniu łącznika, możesz przekonwertować typ DateTimeOffset na typ DateTime przy użyciu funkcji formatDateTime (zalecane) lub funkcji concat. Na przykład: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE) , concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') |
Powiązana zawartość
Aby uzyskać listę magazynów danych obsługiwanych jako źródła i ujścia według działanie Kopiuj, zobacz obsługiwane magazyny danych i formaty.