Azure Data Factory または Azure Synapse Analytics を使用して、Snowflake のデータをコピーして変換します。
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
この記事では、Azure Data Factory および Azure Synapse パイプラインのコピー アクティビティを使用して、Snowflake との間でデータをコピーし合い、Data Flow を使用して Snowflake のデータを変換する方法について説明します。 詳細については、Data Factory または Azure Synapse Analytics の概要記事を参照してください。
重要
新しい Snowflake コネクタでは、ネイティブ Snowflake のサポートが強化されています。 ソリューション内でレガシ Snowflake コネクタを使用している場合は、できるだけ早期に Snowflake コネクタをアップグレードすることが推奨されます。 レガシ バージョンと最新バージョンの違いの詳細については、このセクションを参照してください。
サポートされる機能
この Snowflake コネクタは、次の機能でサポートされます。
サポートされる機能 | IR |
---|---|
Copy アクティビティ (ソース/シンク) | ① ② |
マッピング データ フロー (ソース/シンク) | ① |
Lookup アクティビティ | ① ② |
スクリプト活動 | ① ② |
① Azure 統合ランタイム ② セルフホステッド統合ランタイム
コピー アクティビティの場合、この Snowflake コネクタは次の機能をサポートします。
- Snowflake からのデータのコピー。Snowflake の COPY into [location] コマンドを利用して、最適なパフォーマンスを実現します。
- Snowflake へのデータのコピー。Snowflake の COPY into [table] コマンドを利用して、最適なパフォーマンスを実現します。 Azure 上の Snowflake がサポートされています。
- セルフホステッド Integration Runtime から Snowflake に接続するためにプロキシが必要な場合は、Integration Runtime ホストで HTTP_PROXY と HTTPS_PROXY の環境変数を設定する必要があります。
前提条件
データ ストアがオンプレ ミスネットワーク、Azure 仮想ネットワーク、または Amazon Virtual Private Cloud 内にある場合は、それに接続するようセルフホステッド統合ランタイムを構成する必要があります。 セルフホステッド統合ランタイムが使用する IP アドレスを許可リストに追加する必要があります。
データ ストアがマネージド クラウド データ サービスである場合は、Azure Integration Runtime を使用できます。 ファイアウォール規則で承認されている IP にアクセスが制限されている場合は、Azure Integration Runtime の IP を許可リストに追加できます。
ソースまたはシンクに使用される Snowflake アカウントには、データベースに対する必要な USAGE
アクセス権と、スキーマおよびその下のテーブル/ビューに対する読み取り/書き込みアクセス権が必要です。 さらに、SAS URI を使用して外部ステージを作成できるように、スキーマ上に CREATE STAGE
も存在する必要があります。
次のアカウント プロパティの値を設定する必要があります
プロパティ | 内容 | 必要 | Default |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | プライベート クラウド ストレージの場所にアクセスするために (CREATE STAGE を使用して) 名前付き外部ステージを作成するときに、ストレージ統合オブジェクトをクラウド資格情報として要求するかどうかを指定します。 | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | プライベート クラウド ストレージの場所へのデータの読み込みまたはアンロードを行うときに、ストレージ統合オブジェクトをクラウド資格情報として参照する名前付き外部ステージの使用を要求するかどうかを指定します。 | FALSE | FALSE |
Data Factory によってサポートされるネットワーク セキュリティ メカニズムやオプションの詳細については、「データ アクセス戦略」を参照してください。
はじめに
パイプラインでコピー アクティビティを実行するには、次のいずれかのツールまたは SDK を使用します。
UI を使用して Snowflake のリンク サービスを作成する
次の手順を使用して、Azure portal UI で Snowflake のリンク サービスを作成します。
Azure Data Factory または Synapse ワークスペースの [管理] タブに移動し、[リンク サービス] を選択して、[新規] をクリックします。
Snowflake を検索し、Snowflake コネクタを選択します。
サービスの詳細を構成し、接続をテストして、新しいリンク サービスを作成します。
コネクタの構成の詳細
次のセクションでは、Snowflake コネクタに固有のエンティティを定義するプロパティについて詳しく説明します。
リンクされたサービス プロパティ
Snowflake のリンク サービスでは、次の汎用プロパティがサポートされます。
プロパティ | 内容 | 必須 |
---|---|---|
type | type プロパティは SnowflakeV2 に設定する必要があります。 | はい |
accountIdentifier | その組織とアカウントの名前。 たとえば、myorg-account123。 | はい |
database | 接続後にセッションで使用される既定のデータベース。 | はい |
倉庫 | 接続後にセッションで使用される既定の仮想ウェアハウス。 | はい |
authenticationType | Snowflake サービスへの接続に使用される認証の種類。 使用することができる値は、Basic (既定値) と KeyPair です。 それぞれのプロパティとサンプルについては、以下の対応するセクションを参照してください。 | いいえ |
role | 接続後にセッションで使用される既定のセキュリティ ロール。 | いいえ |
connectVia | データ ストアに接続するために使用される統合ランタイム。 Azure 統合ランタイムまたはセルフホステッド統合ランタイムを使用できます (データ ストアがプライベート ネットワークにある場合)。 指定されていない場合は、既定の Azure 統合ランタイムが使用されます。 | いいえ |
この Snowflake コネクタでは、次の認証の種類がサポートされています。 詳細については、対応するセクションをご覧ください。
基本認証
前のセクションで説明した汎用的なプロパティに加えて、基本認証を使用するには、次のプロパティを指定します。
プロパティ | 内容 | 必須 |
---|---|---|
ユーザー | Snowflake ユーザーのログイン名。 | はい |
password | Snowflake ユーザーのパスワード。 安全に保存するには、このフィールドを SecureString 型としてマークします。 Azure Key Vault に格納されているシークレットを参照することもできます。 | はい |
例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Azure Key Vault 内のパスワード:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Note
マッピング データ フローでは、基本認証のみがサポートされます。
キー ペア認証
キー ペア認証を使用するには、「キー ペア認証とキー ペア ローテーション」を参照して、Snowflake 内にキー ペア認証ユーザーを構成および作成する必要があります。 その後、秘密キーとパスフレーズ (省略可能) を書き留めます (リンク サービスを定義するために使用します)。
前のセクションで説明した汎用的なプロパティに加えて、次のプロパティを指定します。
プロパティ | 内容 | 必須 |
---|---|---|
ユーザー | Snowflake ユーザーのログイン名。 | はい |
privateKey | キー ペア認証に使用される秘密キー。 Azure Data Factory に送信される際に秘密キーが有効であることを確認し、privateKey ファイルに改行文字 (\n) が含まれることを考慮するには、privateKey の内容を文字列リテラル形式で正しく書式設定することが不可欠です。 このプロセスでは、各改行に「\n」を明示的に追加します。 |
はい |
privateKeyPassphrase | 秘密キーの暗号化解除に使用されるパスフレーズ (暗号化されている場合)。 | いいえ |
例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
データセットのプロパティ
データセットを定義するために使用できるセクションとプロパティの完全な一覧については、データセットに関する記事をご覧ください。
Snowflake データセットでは、次のプロパティがサポートされます。
プロパティ | 内容 | 必須 |
---|---|---|
type | データセットの type プロパティは SnowflakeV2Table に設定する必要があります。 | はい |
schema | スキーマの名前。 スキーマ名は、大文字と小文字が区別されることに注意してください。 | ソースの場合はいいえ、シンクの場合ははい |
table | テーブル/ビューの名前。 テーブル名は、大文字と小文字が区別されることに注意してください。 | ソースの場合はいいえ、シンクの場合ははい |
例:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
コピー アクティビティのプロパティ
アクティビティの定義に利用できるセクションとプロパティの完全な一覧については、パイプラインに関する記事を参照してください。 このセクションでは、Snowflake のソースとシンクでサポートされるプロパティの一覧を示します。
ソースとしての Snowflake
Snowflake コネクタは、Snowflake の COPY into [location] コマンドを利用して、最適なパフォーマンスを実現します。
シンクのデータ ストアと形式が Snowflake COPY コマンドでネイティブにサポートされている場合は、コピー アクティビティを使用して Snowflake からシンクに直接コピーできます。 詳しくは、「Snowflake から直接コピーする」をご覧ください。 それ以外の場合は、組み込みの Snowflake からのステージング コピーを使用します。
Snowflake からデータをコピーするために、コピー アクティビティの source セクションでは次のプロパティがサポートされています。
プロパティ | 内容 | 必須 |
---|---|---|
type | Copy アクティビティのソースの type プロパティは SnowflakeV2Source に設定する必要があります。 | はい |
query | Snowflake からデータを読み取る SQL クエリを指定します。 スキーマ、テーブル、および列の名前に小文字が含まれている場合は、クエリでオブジェクト識別子を引用符で囲みます (例: select * from "schema"."myTable" )。ストアド プロシージャの実行はサポートされていません。 |
いいえ |
exportSettings | Snowflake からデータを取得するために使用される詳細設定。 COPY into コマンドでサポートされるものを構成できます。これは、ステートメントを呼び出すときにこのサービスによって渡されます。 | はい |
exportSettings の下: |
||
type | エクスポート コマンドの type を SnowflakeExportCopyCommand に設定します。 | はい |
storageIntegration | Snowflake で作成したストレージ統合の名前を指定します。 ストレージ統合を使用するための前提となる手順については、「Snowflake ストレージ統合の構成」を参照してください。 | いいえ |
additionalCopyOptions | 追加のコピー オプション。キーと値のペアのディクショナリとして指定されます。 例 :MAX_FILE_SIZE、OVERWRITE。 詳細については、「Snowflake コピー オプション」を参照してください。 | いいえ |
additionalFormatOptions | キーと値のペアのディクショナリとして COPY コマンドに指定される、追加のファイル形式オプション。 例 :DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 詳細については、「Snowflake 形式の種類のオプション」を参照してください。 | いいえ |
注意
次のコマンドを実行し、INFORMATION_SCHEMA スキーマと COLUMNS テーブルにアクセスするアクセス許可があることを確認します。
COPY INTO <location>
Snowflake から直接コピーする
シンクのデータ ストアと形式がこのセクションで説明する基準を満たす場合は、コピー アクティビティを使用して、Snowflake からシンクに直接コピーできます。 次の条件が満たされていない場合は、サービスが設定をチェックして、Copy アクティビティの実行は失敗します。
ソースに
storageIntegration
を指定する場合:シンク データ ストアは、Snowflake の外部ステージで参照した Azure Blob Storage です。 データをコピーする前に、次の手順を完了する必要があります。
サポートされている認証の種類を使用して、シンクの Azure Blob Storage 用の Azure Blob Storage リンク サービスを作成します。
シンクの Azure Blob Storage のアクセス制御 (IAM) で Snowflake サービス プリンシパルに対し、ストレージ BLOB データ共同作成者以上のロールを付与します。
ソースに
storageIntegration
を指定しない場合:シンクのリンクされたサービスが、Shared Access Signature 認証を使用する Azure Blob Storage です。 サポートされている次の形式で Azure Data Lake Storage Gen2 にデータを直接コピーする場合、Azure Data Lake Storage Gen2 アカウントに対する SAS 認証を使用して Azure Blob Storage リンク サービスを作成すると、Snowflake からのステージング コピーを使用しないようにすることができます。
シンク データ形式が、次のように構成された Parquet、区切りテキスト、または JSON です。
- Parquet 形式の場合は、圧縮コーデックが None、Snappy、または Lzo です。
- 区切りテキスト形式の場合:
rowDelimiter
が \r\n または任意の 1 文字です。compression
が、no compression、gzip、bzip2、または deflate です。encodingName
が既定値のままか、utf-8 に設定されている。quoteChar
が、double quote、single quote、または empty string (引用符なし) です。
- JSON 形式の場合、直接コピーでは、ソースの Snowflake テーブルまたはクエリ結果に 1 つの列しかなく、この列のデータ型が VARIANT、OBJECT、または ARRAY であるケースのみがサポートされます。
compression
が、no compression、gzip、bzip2、または deflate です。encodingName
が既定値のままか、utf-8 に設定されている。- コピー アクティビティのシンクでは、
filePattern
は既定値のままにするか、setOfObjects に設定します。
Copy アクティビティのソース内で、
additionalColumns
が指定されていません。列マッピングが指定されていません。
例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Snowflake からのステージング コピー
前のセクションで説明したように、シンクのデータ ストアまたは形式が Snowflake COPY コマンドとネイティブに互換性がない場合は、中間の Azure Blob Storage インスタンスを使用して組み込みのステージング コピーを有効にします。 ステージング コピー機能はスループットも優れています。 Snowflake のデータをステージング ストレージにエクスポートしてから、データをシンクにコピーし、最後にステージング ストレージの一時データをクリーンアップします。 ステージングを使用したデータのコピーの詳細は、「ステージング コピー」を参照してください。
この機能を使うには、中間ステージとして、Azure ストレージ アカウントを参照する Azure Blob Storage のリンクされたサービスを作成します。 次に、コピー アクティビティに enableStaging
プロパティと stagingSettings
プロパティを指定します。
ソースに
storageIntegration
を指定する場合、中間ステージングの Azure Blob Storage は、Snowflake の外部ステージで参照したものであることが必要です。 それに対し、サポートされている認証を使用して Azure Blob Storage リンク サービスを作成し、ステージングの Azure Blob Storage のアクセス制御 (IAM) で Snowflake サービス プリンシパルに対し、ストレージ BLOB データ共同作成者以上のロールを付与する必要があります。ソースに
storageIntegration
を指定しない場合、ステージングの Azure Blob Storage リンク サービスでは、Snowflake COPY コマンドで必要な、Shared Access Signature 認証を使用する必要があります。 ステージングの Azure Blob Storage で Snowflake に適切なアクセス許可を付与していることを確認します。 この詳細については、こちらの記事を参照してください。
例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Snowflake からステージング コピーを実行するときは、シンク コピーの動作をファイルのマージに設定することが重要です。 この設定により、パーティション分割されたすべてのファイルが正しく処理およびマージされるため、最後にパーティション分割されたファイルのみがコピーされる問題を回避できます。
構成の例
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Note
シンク コピーの動作をファイルのマージに設定しないと、最後にパーティション分割されたファイルのみがコピーされる場合があります。
シンクとしての Snowflake
Snowflake コネクタは、Snowflake の COPY into [table] コマンドを利用して、最適なパフォーマンスを実現します。 Azure での Snowflake へのデータの書き込みがサポートされています。
ソースのデータ ストアと形式が Snowflake COPY コマンドでネイティブにサポートされている場合は、コピー アクティビティを使用してソースから Snowflake に直接コピーできます。 詳しくは、「Snowflake に直接コピーする」をご覧ください。 それ以外の場合は、組み込みの Snowflake へのステージング コピーを使用します。
Snowflake にデータをコピーするために、コピー アクティビティの sink セクションでは次のプロパティがサポートされています。
プロパティ | 内容 | 必須 |
---|---|---|
type | Copy アクティビティの sink の type プロパティは、SnowflakeV2Sink に設定します。 | はい |
preCopyScript | コピー アクティビティの毎回の実行で、データを Snowflake に書き込む前に実行する SQL クエリを指定します。 前に読み込まれたデータをクリーンアップするには、このプロパティを使います。 | いいえ |
importSettings | Snowflake にデータを書き込むために使用される詳細設定。 COPY into コマンドでサポートされるものを構成できます。これは、ステートメントを呼び出すときにこのサービスによって渡されます。 | はい |
importSettings の下: |
||
type | インポート コマンドの type を SnowflakeImportCopyCommand に設定します。 | はい |
storageIntegration | Snowflake で作成したストレージ統合の名前を指定します。 ストレージ統合を使用するための前提となる手順については、「Snowflake ストレージ統合の構成」を参照してください。 | いいえ |
additionalCopyOptions | 追加のコピー オプション。キーと値のペアのディクショナリとして指定されます。 例 :ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 詳細については、「Snowflake コピー オプション」を参照してください。 | いいえ |
additionalFormatOptions | キーと値のペアのディクショナリとして COPY コマンドに指定される、追加のファイル形式オプション。 例 :DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 詳細については、「Snowflake 形式の種類のオプション」を参照してください。 | いいえ |
注意
次のコマンドを実行し、INFORMATION_SCHEMA スキーマと COLUMNS テーブルにアクセスするアクセス許可があることを確認します。
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Snowflake に直接コピーする
ソースのデータ ストアと形式がこのセクションで説明する基準を満たす場合は、コピー アクティビティを使用して、ソースから Snowflake に直接コピーできます。 次の条件が満たされていない場合は、サービスが設定をチェックして、Copy アクティビティの実行は失敗します。
シンクに
storageIntegration
を指定する場合:ソース データ ストアは、Snowflake の外部ステージで参照した Azure Blob Storage です。 データをコピーする前に、次の手順を完了する必要があります。
サポートされている認証の種類を使用して、ソースの Azure Blob Storage 用の Azure Blob Storage リンク サービスを作成します。
ソースの Azure Blob Storage のアクセス制御 (IAM) で Snowflake サービス プリンシパルに対し、ストレージ BLOB データ閲覧者以上のロールを付与します。
シンクに
storageIntegration
を指定しない場合:ソースのリンクされたサービスが、Shared Access Signature 認証を使用する Azure Blob Storage です。 サポートされている次の形式で Azure Data Lake Storage Gen2 からデータを直接コピーする場合、Azure Data Lake Storage Gen2 アカウントに対する SAS 認証を使用して Azure Blob Storage リンク サービスを作成すると、Snowflake へのステージング コピーを使用しないようにすることができます。
ソース データ形式が、次のように構成された Parquet、区切りテキスト、または JSON です。
Parquet 形式の場合は、圧縮コーデックが None またはSnappyです。
区切りテキスト形式の場合:
rowDelimiter
が \r\n または任意の 1 文字です。 行区切り記号が “\r\n” ではない場合は、firstRowAsHeader
を false にする必要があり、skipLineCount
が指定されていません。compression
が、no compression、gzip、bzip2、または deflate です。encodingName
が既定値のままになっているか、"UTF-8"、"UTF-16"、"UTF-16BE"、"UTF-32"、"UTF-32BE"、"BIG5"、"EUC-JP"、"EUC-KR"、"GB18030"、"ISO-2022-JP"、"ISO-2022-KR"、"ISO-8859-1"、"ISO-8859-2"、"ISO-8859-5"、"ISO-8859-6"、"ISO-8859-7"、"ISO-8859-8"、"ISO-8859-9"、"WINDOWS-1250"、"WINDOWS-1251"、"WINDOWS-1252"、"WINDOWS-1253"、"WINDOWS-1254"、"WINDOWS-1255" に設定されています。quoteChar
が、double quote、single quote、または empty string (引用符なし) です。
JSON 形式の場合、直接コピーでは、シンクの Snowflake テーブルに 1 つの列しかなく、この列のデータ型が VARIANT、OBJECT、または ARRAY であるケースのみがサポートされます。
compression
が、no compression、gzip、bzip2、または deflate です。encodingName
が既定値のままか、utf-8 に設定されている。- 列マッピングが指定されていません。
コピー アクティビティ ソース内:
additionalColumns
が指定されていません。- ソースがフォルダーの場合、
recursive
が true に設定されています。 prefix
、modifiedDateTimeStart
、modifiedDateTimeEnd
、およびenablePartitionDiscovery
が指定されていない。
例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Snowflake へのステージング コピー
前のセクションで説明したように、ソースのデータ ストアまたは形式が Snowflake COPY コマンドとネイティブに互換性がない場合は、中間の Azure Blob Storage インスタンスを使用して組み込みのステージング コピーを有効にします。 ステージング コピー機能はスループットも優れています。 Snowflake のデータ形式要件を満たすようにデータを自動的に変換します。 次に、COPY コマンドを呼び出して、Snowflake にデータを読み込みます。 最後に、BLOB ストレージから一時データをクリーンアップします。 ステージングを使用したデータのコピーの詳細は、「ステージング コピー」を参照してください。
この機能を使うには、中間ステージとして、Azure ストレージ アカウントを参照する Azure Blob Storage のリンクされたサービスを作成します。 次に、コピー アクティビティに enableStaging
プロパティと stagingSettings
プロパティを指定します。
シンクに
storageIntegration
を指定する場合、中間ステージングの Azure Blob Storage は、Snowflake の外部ステージで参照したものであることが必要です。 それに対し、サポートされている認証を使用して Azure Blob Storage リンク サービスを作成し、ステージングの Azure Blob Storage のアクセス制御 (IAM) で Snowflake サービス プリンシパルに対し、ストレージ BLOB データ閲覧者以上のロールを付与する必要があります。シンクに
storageIntegration
を指定しない場合、ステージングの Azure Blob Storage リンク サービスでは、Snowflake COPY コマンドで必要な、Shared Access Signature 認証を使用する必要があります。
例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Mapping Data Flow のプロパティ
マッピング データ フローでデータを変換する場合、Snowflake のテーブルから読み書きすることができます。 詳細については、マッピング データ フローのソース変換とシンク変換に関する記事をご覧ください。 ソースとシンクの種類として、Snowflake データセットまたはインライン データセットを使用することができます。
ソース変換
次の表に、Snowflake ソースでサポートされるプロパティの一覧を示します。 これらのプロパティは、 [ソース オプション] タブで編集できます。コネクタは、Snowflake 内部データ転送を利用します。
名前 | 説明 | 必須 | 使用できる値 | データ フロー スクリプトのプロパティ |
---|---|---|---|---|
テーブル | 入力として [テーブル] を選択した場合、データ フローは、Snowflake データセットで指定された、またはインライン データセットを使用するときにソース オプションで指定されたテーブルからすべてのデータをフェッチします。 | いいえ | String | (インライン データセットのみ) tableName schemaName |
クエリ | 入力として [クエリ] を選択した場合は、Snowflake からデータをフェッチするクエリを入力します。 この設定により、データセットで選択したすべてのテーブルがオーバーライドされます。 スキーマ、テーブル、および列の名前に小文字が含まれている場合は、クエリでオブジェクト識別子を引用符で囲みます (例: select * from "schema"."myTable" )。 |
いいえ | String | query |
増分抽出を有効にする (プレビュー) | このオプションを使用して、パイプラインが前回実行されてから変更された行のみを処理するように ADF に指示します。 | いいえ | ブール型 | enableCdc |
増分列 | 増分抽出機能を使用する場合は、ソース テーブルのウォーターマークとして使用する日時または数値列を選択する必要があります。 | いいえ | String | waterMarkColumn |
Snowflake の変更追跡を有効にする (プレビュー) | このオプションを使用すると、ADF が Snowflake 変更データ キャプチャ テクノロジを利用して、前回のパイプライン実行以降の差分データのみを処理できるようになります。 このオプションは、増分列を必要とせずに、行の挿入、更新、削除の操作による差分データを自動的に読み込みます。 | いいえ | ブール型 | enableNativeCdc |
正味変更 | Snowflake の変更追跡を使用するときに、このオプションを使用して、重複除去された変更行または完全な変更を取得できます。 重複除去された変更行には、特定の時点以降に変更された行の最新バージョンのみが表示されますが、完全な変更では、削除または更新された行を含め、変更された各行のすべてのバージョンが表示されます。 たとえば、行を更新すると、完全な変更では削除バージョンと挿入バージョンが表示されますが、重複除去された変更行では挿入バージョンのみが表示されます。 ユース ケースに応じて、自分のニーズに合ったオプションを選択できます。 既定のオプションは false です。これは、完全な変更を意味します。 | いいえ | ブール型 | netChanges |
システム列を含める | Snowflake の変更追跡を使用するときに、systemColumns オプションを使用して、Snowflake によって提供されるメタデータ ストリーム列を変更追跡の出力に含めるか除外するかを制御できます。 既定では、systemColumns は true に設定されています。つまり、メタデータ ストリーム列が含まれます。 除外する場合は、systemColumns を false に設定できます。 | いいえ | ブール型 | systemColumns |
最初から読み取りを開始する | 増分抽出と変更追跡でこのオプションを設定すると、増分抽出が有効になっているパイプラインの最初の実行時にすべての行を読み取るよう ADF に指示します。 | いいえ | ブール型 | skipInitialLoad |
Snowflake のソース スクリプトの例
ソースの種類として Snowflake データセットを使用すると、関連付けられているデータ フロー スクリプトは次のようになります。
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
インライン データセットを使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
ネイティブ変更追跡
Azure Data Factory で、変更追跡と呼ばれる Snowflake のネイティブ機能がサポートされるようになりました。これには、ログの形式での変更追跡が含まれます。 Snowflake のこの機能を使用すると、時間の経過に伴うデータの変更を追跡できるため、増分データの読み込みと監査に役立ちます。 この機能を利用するために、変更データ キャプチャを有効にして Snowflake の変更追跡を選択すると、ソース テーブルの Stream オブジェクトが作成され、ソースの Snowflake テーブルでの変更追跡が可能になります。 その後、クエリで CHANGES 句を使用して、ソース テーブルから新規の、または更新されたデータのみをフェッチします。 また、Snowflake のソース テーブルに設定されたデータ保持時間の間隔内に変更が消費されるようにパイプラインをスケジュールすることもお勧めします。そうしなければ、キャプチャされた変更の動作に一貫性がなくなる可能性があります。
シンク変換
次の表に、Snowflake シンクでサポートされるプロパティの一覧を示します。 これらのプロパティは、[設定] タブで編集できます。インライン データセットを使用する場合、「データセットのプロパティ」セクションで説明されているプロパティと同じ追加の設定が表示されます。 コネクタは、Snowflake 内部データ転送を利用します。
名前 | 説明 | 必須 | 使用できる値 | データ フロー スクリプトのプロパティ |
---|---|---|---|---|
更新方法 | 対象となる Snowflake に対して許可される操作を指定します。 行を更新、アップサート、または削除するには、それらのアクションに対して行をタグ付けするために行の変更変換が必要になります。 |
はい | true または false |
deletable insertable updateable upsertable |
[キー列] | 更新、アップサート、削除の場合、1 つまたは複数のキー列を設定して、変更する行を決定する必要があります。 | いいえ | Array | キー |
テーブル アクション | 書き込み前に変換先テーブルのすべての行を再作成するか削除するかを指定します。 - なし: テーブルに対してアクションは実行されません。 - Recreate:テーブルが削除され、再作成されます。 新しいテーブルを動的に作成する場合に必要です。 - Truncate:ターゲット テーブルのすべての行が削除されます。 |
いいえ | true または false |
recreate truncate |
Snowflake シンク スクリプトの例
シンクの種類として Snowflake データセットを使用する際、関連付けられているデータ フロー スクリプトは次のようになります。
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
インライン データセットを使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
クエリ プッシュダウンの最適化
パイプライン ログ レベルを None に設定することで、中間変換メトリックの送信を除外して Spark 最適化に対する潜在的な障害を防ぎ、Snowflake によって提供されるクエリ プッシュダウンの最適化を有効にします。 このプッシュダウンの最適化により、大量のデータセットを含む大規模な Snowflake テーブルのパフォーマンスが大幅に向上します。
Note
一時テーブルはセッションに対してローカルであり、作成したユーザーが他のセッションにアクセスすることができなくなり、Snowflake によって通常のテーブルとして上書きされる傾向があるため、Snowflake ではサポートされていません。 Snowflake はグローバルにアクセス可能な、一時的なテーブルを代替として提供しますが、これには手動削除が必要になるため、ソース スキーマでの削除操作を回避するという一時テーブルの使用の主目的とは矛盾しています。
Lookup アクティビティのプロパティ
プロパティの詳細については、ルックアップ アクティビティに関する記事を参照してください。
Snowflake コネクタをアップグレードする
Snowflake コネクタをアップグレードするには、サイド バイ サイド アップグレードまたはインプレース アップグレードを実行できます。
サイド バイ サイド アップグレード
サイド バイ サイド アップグレードを実行するには、以下の手順を実行します。
- 新しい Snowflake リンク サービスを作成し、リンク サービスのプロパティを参照して構成します。
- 新しく作成した Snowflake のリンク サービスに基づいてデータセットを作成します。
- レガシ オブジェクトをターゲットとするパイプライン内の既存のリンク サービスとデータセットを、新しいものに置き換えます。
インプレース アップグレード
インプレース アップグレードを実行するには、既存のリンク サービス ペイロードを編集し、新しいリンク サービスを使用するようにデータ セットを更新する必要があります。
type を Snowflake から SnowflakeV2 に更新します。
リンク サービスのペイロードをレガシ形式から新しいパターンに変更します。 上記の type を変更した後、ユーザー インターフェイスから各フィールドに入力するか、JSON エディターを通じてペイロードを直接更新することができます。 サポートされている接続プロパティについては、この記事の「リンク サービスのプロパティ」セクションを参照してください。 次の例は、レガシと新規のリンク サービスにおけるペイロードの違いを示しています。
レガシ Snowflake のリンク サービス JSON ペイロード:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
新しい Snowflake のリンク サービス JSON ペイロード:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
新しいリンク サービスを使用するようにデータセットを更新します。 新しく作成されたリンク サービスに基づいて新しいデータセットを作成するか、既存のデータセットの type プロパティを SnowflakeTable から SnowflakeV2Table に更新することができます。
Snowflake と Snowflake の違い (レガシ)
Snowflake コネクタでは新しい機能を提供し、Snowflake (レガシ) コネクタのほとんどの機能と互換性があります。 次の表では、Snowflake と Snowflake (レガシ) の機能の違いを示しています。
Snowflake | Snowflake (レガシ) |
---|---|
基本とキー ペア認証をサポートします。 | 基本認証をサポートします。 |
現在、スクリプト アクティビティではスクリプト パラメーターはサポートされていません。 代わりに、動的な式をスクリプト パラメーターに使用します。 詳しくは、Azure Data Factory および Azure Synapse Analytics の式と関数を参照してください。 | スクリプト アクティビティでスクリプト パラメーターをサポートします。 |
検索アクティビティで BigDecimal をサポートします。 Snowflake で定義されている NUMBER 型は、検索アクティビティで文字列として表示されます。 | 検索アクティビティでは BigDecimal はサポートされていません。 |
従来の connectionstring プロパティは、必須パラメーター Account、Warehouse、Database、Schema、および Role を優先して非推奨になりました |
従来の Snowflake コネクタでは、connectionstring プロパティを使用して接続が確立されました。 |
関連するコンテンツ
コピー アクティビティによってソース、シンクとしてサポートされるデータ ストアの一覧については、サポートされるデータ ストアと形式の表を参照してください。