Udostępnij za pośrednictwem


Kopiowanie i przekształcanie danych w usłudze Snowflake przy użyciu usługi Azure Data Factory lub Azure Synapse Analytics

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym artykule opisano sposób używania działanie Kopiuj w potokach usługi Azure Data Factory i Azure Synapse do kopiowania danych z i do usługi Snowflake oraz używania Przepływ danych do przekształcania danych w usłudze Snowflake. Aby uzyskać więcej informacji, zobacz artykuł wprowadzający dotyczący usługi Data Factory lub Azure Synapse Analytics.

Ważne

Nowy łącznik Snowflake zapewnia ulepszoną natywną obsługę rozwiązania Snowflake. Jeśli używasz starszego łącznika Snowflake w swoim rozwiązaniu, zalecamy uaktualnienie łącznika Snowflake najwcześniejszą wygodą. Zapoznaj się z tą sekcją , aby uzyskać szczegółowe informacje na temat różnicy między starszą i najnowszą wersją.

Obsługiwane możliwości

Ten łącznik snowflake jest obsługiwany w następujących możliwościach:

Obsługiwane możliwości IR
działanie Kopiuj (źródło/ujście) (1) (2)
Przepływ danych mapowania (źródło/ujście) (1)
Działanie Lookup (1) (2)
Działanie skryptu (1) (2)

(1) Środowisko Azure Integration Runtime (2) Self-hosted Integration Runtime

W przypadku działanie Kopiuj ten łącznik Snowflake obsługuje następujące funkcje:

  • Skopiuj dane z usługi Snowflake, która korzysta z polecenia COPY rozwiązania Snowflake do polecenia [location], aby uzyskać najlepszą wydajność.
  • Skopiuj dane do usługi Snowflake, która korzysta z polecenia COPY firmy Snowflake do polecenia [table], aby uzyskać najlepszą wydajność. Obsługuje ona rozwiązanie Snowflake na platformie Azure.
  • Jeśli serwer proxy jest wymagany do nawiązania połączenia z usługą Snowflake z własnego środowiska Integration Runtime, należy skonfigurować zmienne środowiskowe dla HTTP_PROXY i HTTPS_PROXY na hoście środowiska Integration Runtime.

Wymagania wstępne

Jeśli magazyn danych znajduje się wewnątrz sieci lokalnej, sieci wirtualnej platformy Azure lub chmury prywatnej Amazon Virtual, musisz skonfigurować własne środowisko Integration Runtime , aby się z nim połączyć. Pamiętaj, aby dodać adresy IP używane przez własne środowisko Integration Runtime do listy dozwolonych.

Jeśli magazyn danych jest zarządzaną usługą danych w chmurze, możesz użyć środowiska Azure Integration Runtime. Jeśli dostęp jest ograniczony do adresów IP zatwierdzonych w regułach zapory, możesz dodać adresy IP środowiska Azure Integration Runtime do listy dozwolonych.

Konto snowflake używane do źródła lub ujścia powinno mieć niezbędny USAGE dostęp do bazy danych i dostępu do odczytu/zapisu w schemacie oraz tabel/widoków w nim. Ponadto powinien on również znajdować CREATE STAGE się w schemacie, aby móc utworzyć etap zewnętrzny przy użyciu identyfikatora URI sygnatury dostępu współdzielonego.

Należy ustawić następujące wartości właściwości konta

Właściwości Opis Wymagani Wartość domyślna
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION Określa, czy należy wymagać obiektu integracji magazynu jako poświadczeń w chmurze podczas tworzenia nazwanego etapu zewnętrznego (przy użyciu funkcji CREATE STAGE) w celu uzyskania dostępu do lokalizacji magazynu w chmurze prywatnej. FAŁSZ FAŁSZ
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION Określa, czy należy użyć nazwanego etapu zewnętrznego, który odwołuje się do obiektu integracji magazynu jako poświadczeń chmury podczas ładowania danych z lub zwalniania danych do lokalizacji magazynu w chmurze prywatnej. FAŁSZ FAŁSZ

Aby uzyskać więcej informacji na temat mechanizmów zabezpieczeń sieci i opcji obsługiwanych przez usługę Data Factory, zobacz Strategie dostępu do danych.

Rozpocznij

Aby wykonać działanie Kopiuj za pomocą potoku, możesz użyć jednego z następujących narzędzi lub zestawów SDK:

Tworzenie połączonej usługi z usługą Snowflake przy użyciu interfejsu użytkownika

Wykonaj poniższe kroki, aby utworzyć połączoną usługę snowflake w interfejsie użytkownika witryny Azure Portal.

  1. Przejdź do karty Zarządzanie w obszarze roboczym usługi Azure Data Factory lub Synapse i wybierz pozycję Połączone usługi, a następnie kliknij pozycję Nowy:

  2. Wyszukaj pozycję Snowflake i wybierz łącznik Snowflake.

    Zrzut ekranu przedstawiający łącznik Snowflake.

  3. Skonfiguruj szczegóły usługi, przetestuj połączenie i utwórz nową połączoną usługę.

    Zrzut ekranu przedstawiający połączoną konfigurację usługi snowflake.

Szczegóły konfiguracji łącznika

Poniższe sekcje zawierają szczegółowe informacje o właściwościach definiujących jednostki specyficzne dla łącznika usługi Snowflake.

Właściwości połączonej usługi

Te właściwości ogólne są obsługiwane w przypadku połączonej usługi Snowflake:

Właściwości Opis Wymagania
type Właściwość type musi być ustawiona na SnowflakeV2. Tak
accountIdentifier Nazwa konta wraz z jego organizacją. Na przykład myorg-account123. Tak
database Domyślna baza danych używana na potrzeby sesji po nawiązaniu połączenia. Tak
magazyn Domyślny magazyn wirtualny używany na potrzeby sesji po nawiązaniu połączenia. Tak
authenticationType Typ uwierzytelniania używanego do nawiązywania połączenia z usługą Snowflake. Dozwolone wartości to: Basic (Default) i KeyPair. Zapoznaj się z odpowiednimi sekcjami poniżej, aby uzyskać więcej właściwości i przykładów. Nie.
role Domyślna rola zabezpieczeń używana dla sesji po nawiązaniu połączenia. Nie.
host Nazwa hosta konta snowflake. Na przykład: contoso.snowflakecomputing.com. .cn jest również obsługiwany. Nie.
connectVia Środowisko Integration Runtime używane do nawiązywania połączenia z magazynem danych. Możesz użyć środowiska Azure Integration Runtime lub własnego środowiska Integration Runtime (jeśli magazyn danych znajduje się w sieci prywatnej). Jeśli nie zostanie określony, używa domyślnego środowiska Azure Integration Runtime. Nie.

Ten łącznik snowflake obsługuje następujące typy uwierzytelniania. Aby uzyskać szczegółowe informacje, zobacz odpowiednie sekcje.

Uwierzytelnianie podstawowe

Aby użyć uwierzytelniania podstawowego , oprócz właściwości ogólnych opisanych w poprzedniej sekcji, określ następujące właściwości:

Właściwości Opis Wymagania
Użytkownik Nazwa logowania użytkownika snowflake. Tak
hasło Hasło użytkownika snowflake. Oznacz to pole jako typ SecureString, aby przechowywać je bezpiecznie. Możesz również odwołać się do wpisu tajnego przechowywanego w usłudze Azure Key Vault. Tak

Przykład:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Hasło w usłudze Azure Key Vault:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Uwaga

Mapowanie Przepływ danych obsługuje tylko uwierzytelnianie podstawowe.

Uwierzytelnianie pary kluczy

Aby użyć uwierzytelniania par kluczy, należy skonfigurować i utworzyć użytkownika uwierzytelniania pary kluczy w usłudze Snowflake, odwołując się do uwierzytelniania par kluczy i rotacji par kluczy. Następnie zanotuj klucz prywatny i hasło (opcjonalnie), które służy do definiowania połączonej usługi.

Oprócz właściwości ogólnych opisanych w poprzedniej sekcji określ następujące właściwości:

Właściwości Opis Wymagania
Użytkownik Nazwa logowania użytkownika snowflake. Tak
privateKey Klucz prywatny używany do uwierzytelniania pary kluczy.

Aby upewnić się, że klucz prywatny jest prawidłowy podczas wysyłania do usługi Azure Data Factory i biorąc pod uwagę, że plik privateKey zawiera znaki nowego wiersza (\n), ważne jest, aby poprawnie sformatować zawartość privateKey w postaci literału ciągu. Ten proces obejmuje jawne dodanie \n do każdego nowego wiersza.
Tak
privateKeyPassphrase Hasło używane do odszyfrowywania klucza prywatnego, jeśli jest zaszyfrowane. Nie.

Przykład:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Właściwości zestawu danych

Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania zestawów danych, zobacz artykuł Zestawy danych.

Następujące właściwości są obsługiwane dla zestawu danych Snowflake.

Właściwości Opis Wymagania
type Właściwość type zestawu danych musi być ustawiona na SnowflakeV2Table. Tak
schema Nazwa schematu. Zwróć uwagę, że w nazwie schematu jest uwzględniana wielkość liter. Nie dla źródła, tak dla ujścia
table Nazwa tabeli/widoku. Zwróć uwagę, że w nazwie tabeli jest rozróżniana wielkość liter. Nie dla źródła, tak dla ujścia

Przykład:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Właściwości działania kopiowania

Aby uzyskać pełną listę sekcji i właściwości dostępnych do definiowania działań, zobacz artykuł Pipelines (Potoki ). Ta sekcja zawiera listę właściwości obsługiwanych przez źródło i ujście snowflake.

Płatk śniegu jako źródło

Łącznik Snowflake używa polecenia COPY rozwiązania Snowflake do [location] w celu uzyskania najlepszej wydajności.

Jeśli magazyn danych ujścia i format są natywnie obsługiwane przez polecenie Snowflake COPY, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować z snowflake do ujścia. Aby uzyskać szczegółowe informacje, zobacz Bezpośrednia kopia z usługi Snowflake. W przeciwnym razie użyj wbudowanej kopii etapowej z usługi Snowflake.

Aby skopiować dane z usługi Snowflake, w sekcji źródła działanie Kopiuj są obsługiwane następujące właściwości.

Właściwości Opis Wymagania
type Właściwość type źródła działanie Kopiuj musi być ustawiona na SnowflakeV2Source. Tak
zapytanie Określa zapytanie SQL do odczytu danych z usługi Snowflake. Jeśli nazwy schematu, tabeli i kolumn zawierają małe litery, podaj identyfikator obiektu w zapytaniu, np. select * from "schema"."myTable".
Wykonywanie procedury składowanej nie jest obsługiwane.
Nie.
exportSettings Ustawienia zaawansowane używane do pobierania danych z usługi Snowflake. Można skonfigurować te obsługiwane przez copy do polecenia, które usługa będzie przechodzić po wywołaniu instrukcji . Tak
W obszarze exportSettings:
type Typ polecenia eksportu, ustawiony na SnowflakeExportCopyCommand. Tak
storageIntegration Określ nazwę integracji magazynu utworzonej w aplikacji Snowflake. Aby zapoznać się z krokami wstępnymi dotyczącymi korzystania z integracji magazynu, zobacz Konfigurowanie integracji magazynu snowflake. Nie.
additionalCopyOptions Dodatkowe opcje kopiowania udostępniane jako słownik par klucz-wartość. Przykłady: MAX_FILE_SIZE, ZASTĄP. Aby uzyskać więcej informacji, zobacz Snowflake Copy Options (Opcje kopiowania snowflake). Nie.
additionalFormatOptions Dodatkowe opcje formatowania pliku udostępniane do polecenia COPY jako słownik par klucz-wartość. Przykłady: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Aby uzyskać więcej informacji, zobacz Snowflake Format Type Options (Opcje typu formatu snowflake). Nie.

Uwaga

Upewnij się, że masz uprawnienia do wykonywania następującego polecenia i uzyskiwania dostępu do INFORMATION_SCHEMA schematu i tabeli COLUMNS.

  • COPY INTO <location>

Bezpośrednia kopia z usługi Snowflake

Jeśli magazyn danych ujścia i format spełniają kryteria opisane w tej sekcji, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować z rozwiązania Snowflake do ujścia. Usługa sprawdza ustawienia i kończy się niepowodzeniem działanie Kopiuj uruchomić, jeśli następujące kryteria nie są spełnione:

  • Po określeniu storageIntegration w źródle:

    Magazyn danych ujścia to usługa Azure Blob Storage, o której mowa na etapie zewnętrznym w rozwiązaniu Snowflake. Przed skopiowaniem danych należy wykonać następujące czynności:

    1. Utwórz połączoną usługę Azure Blob Storage dla ujścia usługi Azure Blob Storage z dowolnymi obsługiwanymi typami uwierzytelniania.

    2. Przyznaj co najmniej rolę Współautor danych obiektu blob usługi Storage jednostce usługi Snowflake w ujściu kontroli dostępu do usługi Azure Blob Storage (IAM).

  • Jeśli nie określisz storageIntegration w źródle:

    Połączona usługa ujścia to Azure Blob Storage z uwierzytelnianiem sygnatury dostępu współdzielonego. Jeśli chcesz bezpośrednio skopiować dane do usługi Azure Data Lake Storage Gen2 w następującym obsługiwanym formacie, możesz utworzyć połączoną usługę Azure Blob Storage z uwierzytelnianiem SAS względem konta usługi Azure Data Lake Storage Gen2, aby uniknąć używania kopii etapowej z usługi Snowflake.

  • Format danych ujścia to Parquet, tekst rozdzielany lub JSON z następującymi konfiguracjami:

    • W przypadku formatu Parquet koder kompresji to None, Snappy lub Lzo.
    • W przypadku formatu tekstu rozdzielanego:
      • rowDelimiter to \r\n lub dowolny pojedynczy znak.
      • compressionnie może być kompresją, gzip, bzip2 lub deflate.
      • encodingName jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.
      • quoteCharjest cudzysłowem podwójnym, pojedynczym cudzysłowem lub pustym ciągiem (bez cudzysłowu).
    • W przypadku formatu JSON kopiowanie bezpośrednie obsługuje tylko przypadek, w przypadku którego źródłowa tabela Snowflake lub wynik zapytania zawiera tylko jedną kolumnę, a typ danych tej kolumny to VARIANT, OBJECT lub ARRAY.
      • compressionnie może być kompresją, gzip, bzip2 lub deflate.
      • encodingName jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.
      • filePattern w ujściu działania kopiowania jest pozostawiona jako domyślna lub ustawiona na wartość setOfObjects.
  • W źródle additionalColumns działania kopiowania nie jest określony.

  • Nie określono mapowania kolumn.

Przykład:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "query": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Kopia etapowa z usługi Snowflake

Jeśli magazyn danych ujścia lub format nie jest natywnie zgodny z poleceniem Snowflake COPY, jak wspomniano w ostatniej sekcji, włącz wbudowaną kopię etapową przy użyciu tymczasowego wystąpienia usługi Azure Blob Storage. Funkcja kopiowania etapowego zapewnia również lepszą przepływność. Usługa eksportuje dane z usługi Snowflake do magazynu przejściowego, a następnie kopiuje dane do ujścia, a na koniec czyści dane tymczasowe z magazynu przejściowego. Zobacz Kopiowanie etapowe, aby uzyskać szczegółowe informacje na temat kopiowania danych przy użyciu przemieszczania.

Aby użyć tej funkcji, utwórz połączoną usługę Azure Blob Storage, która odwołuje się do konta usługi Azure Storage jako tymczasowego przemieszczania. Następnie określ enableStaging właściwości i stagingSettings w działanie Kopiuj.

  • Po określeniu storageIntegration w źródle tymczasowe przejściowej usługi Azure Blob Storage powinna być taka, o której mowa w zewnętrznym etapie w usłudze Snowflake. Upewnij się, że utworzysz połączoną usługę Azure Blob Storage z obsługiwanym uwierzytelnianiem i przyznasz co najmniej rolę Współautor danych obiektu blob usługi Storage do jednostki usługi Snowflake w przemieszczaniu kontroli dostępu do usługi Azure Blob Storage (IAM).

  • Jeśli nie określisz storageIntegration w źródle, tymczasowa połączona usługa Azure Blob Storage musi używać uwierzytelniania sygnatury dostępu współdzielonego zgodnie z wymaganiami polecenia Snowflake COPY. Upewnij się, że udzielono odpowiedniego uprawnienia dostępu do rozwiązania Snowflake w przejściowym usłudze Azure Blob Storage. Aby dowiedzieć się więcej na ten temat, zobacz ten artykuł.

Przykład:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "query": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"                    
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Podczas wykonywania kopii etapowej z usługi Snowflake kluczowe jest ustawienie zachowania kopiowania ujścia na scalanie plików. To ustawienie gwarantuje, że wszystkie partycjonowane pliki są poprawnie obsługiwane i scalane, uniemożliwiając problem polegający na tym, że skopiowany jest tylko ostatni plik partycjonowany.

Przykładowa konfiguracja

{
    "type": "Copy",
    "source": {
        "type": "SnowflakeSource",
        "query": "SELECT * FROM my_table"
    },
    "sink": {
        "type": "AzureBlobStorage",
        "copyBehavior": "MergeFiles"
    }
}

Uwaga

Nie można ustawić zachowania kopiowania ujścia na scalanie plików może spowodować skopiowanie tylko ostatniego pliku partycjonowanego.

Płatki śniegu jako zlew

Łącznik Snowflake używa polecenia COPY rozwiązania Snowflake do [table] w celu uzyskania najlepszej wydajności. Obsługuje zapisywanie danych w aplikacji Snowflake na platformie Azure.

Jeśli źródłowy magazyn danych i format są natywnie obsługiwane przez polecenie Snowflake COPY, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować ze źródła do usługi Snowflake. Aby uzyskać szczegółowe informacje, zobacz Bezpośrednia kopia do usługi Snowflake. W przeciwnym razie użyj wbudowanej kopii etapowej do usługi Snowflake.

Aby skopiować dane do usługi Snowflake, następujące właściwości są obsługiwane w sekcji ujścia działanie Kopiuj.

Właściwości Opis Wymagania
type Właściwość type ujścia działanie Kopiuj ustawiona na SnowflakeV2Sink. Tak
preCopyScript Określ zapytanie SQL dla działanie Kopiuj do uruchomienia przed zapisaniem danych w usłudze Snowflake w każdym uruchomieniu. Użyj tej właściwości, aby wyczyścić wstępnie załadowane dane. Nie.
importSettings Ustawienia zaawansowane używane do zapisywania danych w usłudze Snowflake. Można skonfigurować te obsługiwane przez copy do polecenia, które usługa będzie przechodzić po wywołaniu instrukcji . Tak
W obszarze importSettings:
type Typ polecenia importu, ustawiony na SnowflakeImportCopyCommand. Tak
storageIntegration Określ nazwę integracji magazynu utworzonej w aplikacji Snowflake. Aby zapoznać się z krokami wstępnymi dotyczącymi korzystania z integracji magazynu, zobacz Konfigurowanie integracji magazynu snowflake. Nie.
additionalCopyOptions Dodatkowe opcje kopiowania udostępniane jako słownik par klucz-wartość. Przykłady: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Aby uzyskać więcej informacji, zobacz Snowflake Copy Options (Opcje kopiowania snowflake). Nie.
additionalFormatOptions Dodatkowe opcje formatowania pliku udostępnione do polecenia COPY podane jako słownik par klucz-wartość. Przykłady: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Aby uzyskać więcej informacji, zobacz Snowflake Format Type Options (Opcje typu formatu snowflake). Nie.

Uwaga

Upewnij się, że masz uprawnienia do wykonywania następującego polecenia i uzyskiwania dostępu do INFORMATION_SCHEMA schematu i tabeli COLUMNS.

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Bezpośrednia kopia do usługi Snowflake

Jeśli źródłowy magazyn danych i format spełniają kryteria opisane w tej sekcji, możesz użyć działanie Kopiuj, aby bezpośrednio skopiować ze źródła do usługi Snowflake. Usługa sprawdza ustawienia i kończy się niepowodzeniem działanie Kopiuj uruchomić, jeśli następujące kryteria nie są spełnione:

  • Po określeniu storageIntegration w ujściu:

    Źródłowy magazyn danych to usługa Azure Blob Storage, o której mowa na etapie zewnętrznym w usłudze Snowflake. Przed skopiowaniem danych należy wykonać następujące czynności:

    1. Utwórz połączoną usługę Azure Blob Storage dla źródłowej usługi Azure Blob Storage z dowolnymi obsługiwanymi typami uwierzytelniania.

    2. Przyznaj co najmniej rolę Czytelnik danych obiektu blob usługi Storage jednostce usługi Snowflake w źródłowej kontroli dostępu do usługi Azure Blob Storage (IAM).

  • Jeśli nie określisz storageIntegration ujścia:

    Źródłowa połączona usługa to Azure Blob Storage z uwierzytelnianiem sygnatury dostępu współdzielonego. Jeśli chcesz bezpośrednio skopiować dane z usługi Azure Data Lake Storage Gen2 w następującym obsługiwanym formacie, możesz utworzyć połączoną usługę Azure Blob Storage z uwierzytelnianiem SAS względem konta usługi Azure Data Lake Storage Gen2, aby uniknąć używania kopii etapowej do usługi Snowflake.

  • Format danych źródłowych to Parquet, Rozdzielany tekst lub JSON z następującymi konfiguracjami:

    • W przypadku formatu Parquet koder kompresji to None lub Snappy.

    • W przypadku formatu tekstu rozdzielanego:

      • rowDelimiter to \r\n lub dowolny pojedynczy znak. Jeśli ogranicznik wierszy nie jest "\r\n", firstRowAsHeader musi być fałszywy i skipLineCount nie jest określony.
      • compressionnie może być kompresją, gzip, bzip2 lub deflate.
      • encodingName jest pozostawiona jako domyślna lub ustawiona na "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-8"8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".
      • quoteCharjest cudzysłowem podwójnym, pojedynczym cudzysłowem lub pustym ciągiem (bez cudzysłowu).
    • W przypadku formatu JSON kopiowanie bezpośrednie obsługuje tylko przypadek, w przypadku którego tabela Snowflake ujścia ma tylko jedną kolumnę, a typ danych tej kolumny to VARIANT, OBJECT lub ARRAY.

      • compressionnie może być kompresją, gzip, bzip2 lub deflate.
      • encodingName jest pozostawiona jako domyślna lub ustawiona na wartość utf-8.
      • Nie określono mapowania kolumn.
  • W źródle działanie Kopiuj:

    • additionalColumns nie jest określony.
    • Jeśli źródło jest folderem, recursive ma wartość true.
    • prefix, , modifiedDateTimeStartmodifiedDateTimeEndi enablePartitionDiscovery nie są określone.

Przykład:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            }
        }
    }
]

Kopia etapowa w usłudze Snowflake

Jeśli źródłowy magazyn danych lub format nie jest natywnie zgodny z poleceniem Snowflake COPY, jak wspomniano w ostatniej sekcji, włącz wbudowaną kopię etapową przy użyciu tymczasowego wystąpienia usługi Azure Blob Storage. Funkcja kopiowania etapowego zapewnia również lepszą przepływność. Usługa automatycznie konwertuje dane w celu spełnienia wymagań dotyczących formatu danych snowflake. Następnie wywołuje polecenie COPY w celu załadowania danych do usługi Snowflake. Na koniec czyści dane tymczasowe z magazynu obiektów blob. Zobacz Kopiowanie etapowe, aby uzyskać szczegółowe informacje na temat kopiowania danych przy użyciu przemieszczania.

Aby użyć tej funkcji, utwórz połączoną usługę Azure Blob Storage, która odwołuje się do konta usługi Azure Storage jako tymczasowego przemieszczania. Następnie określ enableStaging właściwości i stagingSettings w działanie Kopiuj.

  • Po określeniu storageIntegration ujścia tymczasowy przejściowy magazyn obiektów blob platformy Azure powinien być tymczasowym etapem zewnętrznym w usłudze Snowflake. Upewnij się, że utworzysz połączoną usługę Azure Blob Storage z dowolnym obsługiwanym uwierzytelnianiem i przyznasz co najmniej rolę Czytelnik danych obiektu blob usługi Storage usługi Snowflake w przejściowej kontroli dostępu do usługi Azure Blob Storage (IAM).

  • Jeśli nie określisz storageIntegration ujścia, tymczasowa połączona usługa Azure Blob Storage musi używać uwierzytelniania sygnatury dostępu współdzielonego zgodnie z wymaganiami polecenia Snowflake COPY.

Przykład:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Właściwości przepływu mapowania danych

Podczas przekształcania danych w przepływie danych mapowania można odczytywać dane z tabel i zapisywać je w usłudze Snowflake. Aby uzyskać więcej informacji, zobacz przekształcanie źródła i przekształcanie ujścia w przepływach danych mapowania. Możesz użyć zestawu danych Snowflake lub wbudowanego zestawu danych jako typu źródła i ujścia.

Przekształcanie źródła

W poniższej tabeli wymieniono właściwości obsługiwane przez źródło snowflake. Te właściwości można edytować na karcie Opcje źródła. Łącznik korzysta z wewnętrznego transferu danych snowflake.

Nazwa/nazwisko opis Wymagania Dozwolone wartości Właściwość skryptu przepływu danych
Table W przypadku wybrania pozycji Tabela jako danych wejściowych przepływ danych pobierze wszystkie dane z tabeli określonej w zestawie danych Snowflake lub w opcjach źródłowych podczas korzystania z wbudowanego zestawu danych. Nie. String (tylko w przypadku wbudowanego zestawu danych)
tableName
schemaName
Query Jeśli wybierzesz pozycję Zapytanie jako dane wejściowe, wprowadź zapytanie, aby pobrać dane z usługi Snowflake. To ustawienie zastępuje dowolną tabelę wybraną w zestawie danych.
Jeśli nazwy schematu, tabeli i kolumn zawierają małe litery, podaj identyfikator obiektu w zapytaniu, np. select * from "schema"."myTable".
Nie. String zapytanie
Włączanie wyodrębniania przyrostowego (wersja zapoznawcza) Użyj tej opcji, aby poinformować usługę ADF o przetwarzaniu tylko wierszy, które uległy zmianie od czasu ostatniego wykonania potoku. Nie. Wartość logiczna enableCdc
Kolumna przyrostowa W przypadku korzystania z funkcji wyodrębniania przyrostowego należy wybrać kolumnę daty/godziny/liczbową, której chcesz użyć jako znaku wodnego w tabeli źródłowej. Nie. String waterMarkColumn
Włączanie rozwiązania Snowflake Change Tracking (wersja zapoznawcza) Ta opcja umożliwia usłudze ADF wykorzystanie technologii przechwytywania zmian danych usługi Snowflake w celu przetwarzania tylko danych różnicowych od czasu poprzedniego wykonania potoku. Ta opcja automatycznie ładuje dane różnicowe przy użyciu operacji wstawiania, aktualizowania i usuwania wierszy bez konieczności wykonywania żadnych kolumn przyrostowych. Nie. Wartość logiczna enableNativeCdc
Zmiany netto W przypadku korzystania ze śledzenia zmian płatka śniegu można użyć tej opcji, aby uzyskać deduplikowane zmienione wiersze lub wyczerpujące zmiany. Zduplikowane zmienione wiersze będą pokazywać tylko najnowsze wersje wierszy, które uległy zmianie od danego punktu w czasie, podczas gdy wyczerpujące zmiany pokażą wszystkie wersje każdego wiersza, które uległy zmianie, w tym te, które zostały usunięte lub zaktualizowane. Jeśli na przykład zaktualizujesz wiersz, zobaczysz wersję usuwania i wersję wstawiania w wyczerpujących zmianach, ale tylko wstawienie wersji w zdeduplikowanych zmienionych wierszach. W zależności od przypadku użycia możesz wybrać opcję, która odpowiada Twoim potrzebom. Domyślna opcja to false, co oznacza wyczerpujące zmiany. Nie. Wartość logiczna netChanges
Uwzględnij kolumny systemowe W przypadku korzystania ze śledzenia zmian płatka śniegu można użyć opcji systemColumns, aby kontrolować, czy kolumny strumienia metadanych dostarczone przez usługę Snowflake są uwzględniane lub wykluczone w danych wyjściowych śledzenia zmian. Domyślnie właściwość systemColumns jest ustawiona na wartość true, co oznacza, że kolumny strumienia metadanych są uwzględniane. Możesz ustawić wartość systemColumns na wartość false, jeśli chcesz je wykluczyć. Nie. Wartość logiczna systemColumns
Rozpocznij czytanie od początku Ustawienie tej opcji za pomocą wyodrębniania przyrostowego i śledzenia zmian spowoduje, że usługa ADF odczytuje wszystkie wiersze podczas pierwszego wykonywania potoku z włączonym wyodrębnieniem przyrostowym. Nie. Wartość logiczna skipInitialLoad

Przykłady skryptów źródłowych snowflake

Jeśli używasz zestawu danych Snowflake jako typu źródła, skojarzony skrypt przepływu danych to:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

Jeśli używasz wbudowanego zestawu danych, skojarzony skrypt przepływu danych to:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

Natywne śledzenie zmian

Usługa Azure Data Factory obsługuje teraz funkcję natywną w rozwiązaniu Snowflake znaną jako śledzenie zmian, która obejmuje śledzenie zmian w postaci dzienników. Ta funkcja płatka śniegu pozwala nam śledzić zmiany w danych w czasie, co ułatwia przyrostowe ładowanie i inspekcję danych. Aby użyć tej funkcji, po włączeniu funkcji przechwytywania zmian i wybraniu śledzenia zmian snowflake tworzymy obiekt strumienia dla tabeli źródłowej, która umożliwia śledzenie zmian w źródłowej tabeli płatków śniegu. Następnie użyjemy klauzuli CHANGES w zapytaniu, aby pobrać tylko nowe lub zaktualizowane dane z tabeli źródłowej. Ponadto zaleca się zaplanowanie potoku tak, aby zmiany zostały zużyte w przedziale czasu przechowywania danych ustawionym dla tabeli źródłowej płatka śniegu, a inny użytkownik może zobaczyć niespójne zachowanie w przechwyconych zmianach.

Przekształcenie ujścia

W poniższej tabeli wymieniono właściwości obsługiwane przez ujście snowflake. Te właściwości można edytować na karcie Ustawienia . W przypadku korzystania z wbudowanego zestawu danych zobaczysz dodatkowe ustawienia, które są takie same jak właściwości opisane w sekcji właściwości zestawu danych. Łącznik korzysta z wewnętrznego transferu danych snowflake.

Nazwa/nazwisko opis Wymagania Dozwolone wartości Właściwość skryptu przepływu danych
Metoda aktualizacji Określ, jakie operacje są dozwolone w miejscu docelowym snowflake.
Aby zaktualizować, upsert lub usunąć wiersze, do tagowania wierszy dla tych akcji jest wymagane przekształcenie alter wiersza.
Tak true lub false możliwe do usunięcia
możliwość wstawienia
możliwe do aktualizacji
upsertable
Kolumny kluczy W przypadku aktualizacji, operacji upsert i usuwania należy ustawić kolumnę klucza lub kolumny w celu określenia, który wiersz ma zostać zmieniony. Nie. Tablica keys
Akcja tabeli Określa, czy należy ponownie utworzyć lub usunąć wszystkie wiersze z tabeli docelowej przed zapisem.
- Brak: żadna akcja nie zostanie wykonana w tabeli.
- Utwórz ponownie: tabela zostanie porzucona i utworzona ponownie. Wymagane w przypadku dynamicznego tworzenia nowej tabeli.
- Obcinanie: wszystkie wiersze z tabeli docelowej zostaną usunięte.
Nie. true lub false odtworzyć
truncate

Przykłady skryptów ujścia snowflake

W przypadku użycia zestawu danych Snowflake jako typu ujścia skojarzony skrypt przepływu danych to:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Jeśli używasz wbudowanego zestawu danych, skojarzony skrypt przepływu danych to:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Optymalizacja wypychania zapytań

Ustawiając poziom rejestrowania potoku na Brak, wykluczamy przesyłanie metryk transformacji pośredniej, uniemożliwiając potencjalne przeszkody optymalizacjom platformy Spark i włączając optymalizację wypychania zapytań zapewnianą przez usługę Snowflake. Ta optymalizacja wypychania umożliwia znaczne ulepszenia wydajności dla dużych tabel Snowflake z rozbudowanymi zestawami danych.

Uwaga

Nie obsługujemy tabel tymczasowych w usłudze Snowflake, ponieważ są one lokalne dla sesji lub użytkownika, który je tworzy, co czyni je niedostępnymi dla innych sesji i podatnymi na zastępowanie jako zwykłe tabele przez usługę Snowflake. Chociaż snowflake oferuje tabele przejściowe jako alternatywę, które są dostępne globalnie, wymagają ręcznego usuwania, sprzeczne z naszym podstawowym celem korzystania z tabel tymczasowych, które mają na celu uniknięcie wszelkich operacji usuwania w schemacie źródłowym.

Właściwości działania wyszukiwania

Aby uzyskać więcej informacji na temat właściwości, zobacz Działanie wyszukiwania.

Uaktualnianie łącznika snowflake

Aby uaktualnić łącznik Snowflake, możesz przeprowadzić uaktualnienie równoległe lub uaktualnienie w miejscu.

Uaktualnianie równoległe

Aby przeprowadzić uaktualnienie równoległe, wykonaj następujące kroki:

  1. Utwórz nową połączoną usługę Snowflake i skonfiguruj ją, odwołując się do połączonych właściwości usługi.
  2. Utwórz zestaw danych na podstawie nowo utworzonej połączonej usługi Snowflake.
  3. Zastąp nową połączoną usługę i zestaw danych istniejącymi w potokach, które są przeznaczone dla starszych obiektów.

Uaktualnienie w miejscu

Aby przeprowadzić uaktualnienie w miejscu, należy edytować istniejący ładunek połączonej usługi i zaktualizować zestaw danych, aby korzystać z nowej połączonej usługi.

  1. Zaktualizuj typ z Snowflake na SnowflakeV2.

  2. Zmodyfikuj ładunek połączonej usługi ze starszego formatu na nowy wzorzec. Możesz wypełnić każde pole z interfejsu użytkownika po zmianie typu wymienionego powyżej lub zaktualizować ładunek bezpośrednio za pośrednictwem edytora JSON. Zapoznaj się z sekcją Właściwości połączonej usługi w tym artykule, aby zapoznać się z obsługiwanymi właściwościami połączenia. W poniższych przykładach przedstawiono różnice w ładunku dla starszych i nowych połączonych usług Snowflake:

    Starsza wersja ładunku JSON połączonej usługi Snowflake:

      {
         "name": "Snowflake1",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "annotations": [],
             "type": "Snowflake",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC",
                 "encryptedCredential": "<your_encrypted_credential_value>"
             },
             "connectVia": {
                 "referenceName": "AzureIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    

    Nowy ładunek JSON połączonej usługi Snowflake:

     {
         "name": "Snowflake2",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "parameters": {
                 "schema": {
                     "type": "string",
                     "defaultValue": "PUBLIC"
                 }
             },
             "annotations": [],
             "type": "SnowflakeV2",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "accountIdentifier": "<FAKE_Account>",
                 "user": "FAKE_USER",
                 "database": "FAKE_DB",
                 "warehouse": "FAKE_DW",
                 "encryptedCredential": "<placeholder>"
             },
             "connectVia": {
                 "referenceName": "AutoResolveIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    
  3. Zaktualizuj zestaw danych, aby korzystał z nowej połączonej usługi. Możesz utworzyć nowy zestaw danych na podstawie nowo utworzonej połączonej usługi lub zaktualizować właściwość typu istniejącego zestawu danych z tabeli SnowflakeTable do tabeli SnowflakeV2Table.

Różnice między snowflake i Snowflake (starsza wersja)

Łącznik Snowflake oferuje nowe funkcje i jest zgodny z większością funkcji łącznika Snowflake (starsza wersja). W poniższej tabeli przedstawiono różnice funkcji między usługą Snowflake i Snowflake (starsza wersja).

Snowflake Snowflake (starsza wersja)
Obsługa uwierzytelniania par podstawowych i kluczowych. Obsługa uwierzytelniania podstawowego.
Parametry skryptu nie są obecnie obsługiwane w działaniu skryptu. Alternatywnie użyj wyrażeń dynamicznych dla parametrów skryptu. Aby uzyskać więcej informacji, zobacz Wyrażenia i funkcje w usługach Azure Data Factory i Azure Synapse Analytics. Obsługa parametrów skryptu w działaniu skryptu.
Obsługa funkcji BigDecimal w działaniu wyszukiwania. Typ NUMBER, zgodnie z definicją w aplikacji Snowflake, będzie wyświetlany jako ciąg w działaniu Wyszukiwania. Jeśli chcesz ukryć go jako typ liczbowy, możesz użyć parametru potoku z funkcją int lub funkcją zmiennoprzecinkową. Na przykład , int(activity('lookup').output.firstRow.VALUE)float(activity('lookup').output.firstRow.VALUE) Funkcja BigDecimal nie jest obsługiwana w działaniu wyszukiwania.
Właściwości accountIdentifier, warehouse, databaseschema i role służą do nawiązywania połączenia. Właściwość connectionstring służy do nawiązywania połączenia.
typ danych sygnatury czasowej w elemecie Snowflake jest odczytywany jako typ danych DateTimeOffset w działaniu Lookup i Script. Typ danych sygnatury czasowej w aplikacji Snowflake jest odczytywany jako typ danych DateTime w działaniu Lookup i Script.
Jeśli nadal musisz użyć wartości Datetime jako parametru w potoku po uaktualnieniu łącznika, możesz przekonwertować typ DateTimeOffset na typ DateTime przy użyciu funkcji formatDateTime (zalecane) lub funkcji concat. Na przykład: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE), concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z')

Aby uzyskać listę magazynów danych obsługiwanych jako źródła i ujścia według działanie Kopiuj, zobacz obsługiwane magazyny danych i formaty.