使用 Azure Data Factory 或 Azure Synapse Analytics 在 Snowflake 中複製和轉換資料
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費啟動新的試用版!
本文概述如何使用 Azure Data Factory 和 Azure Synapse Analytics 管線中的複製活動,在 Snowflake 複製/貼上資料,並使用資料流程來轉換 Snowflake 中的資料。 若要深入了解,請閱讀 Data Factory 或 Azure Synapse Analytics 簡介文章。
重要
新的 Snowflake 連接器提供改善的原生 Snowflake 支援。 如果您在解決方案中使用舊版 Snowflake 連接器,建議您 儘早升級 Snowflake 連接器 。 如需舊版與最新版本之間差異的詳細資料,請參閱本節。
支援的功能
此 Snowflake 連接器支援下列功能:
支援的功能 | IR |
---|---|
複製活動 (來源/接收) | (1) (2) |
對應資料流 (來源/接收) | (1) |
查閱活動 | (1) (2) |
指令碼活動 | (1) (2) |
① 整合執行階段 ② 自我裝載整合執行階段
針對複製活動,此 Snowflake 連接器支援下列功能:
- 用 Snowflake 的 COPY into [location] (英文) 命令從 Snowflake 複製資料,藉此達到最佳效能。
- 用 Snowflake 的 COPY into [table] (英文) 命令將資料複製到 Snowflake,藉此達到最佳效能。 這項命令支援 Azure 上的 Snowflake。
- 如果需要 Proxy 從自我裝載整合執行階段連線到 Snowflake,您必須在整合執行階段主機上設定 HTTP_PROXY 和 HTTPS_PROXY 環境變數。
必要條件
如果您的資料存放區位於內部部署網路、Azure 虛擬網路或 Amazon 虛擬私人雲端中,則必須設定自我裝載整合執行階段以與其連線。 請務必將自我裝載整合執行階段使用的 IP 位址新增至允許清單。
如果您的資料存放區是受控雲端資料服務,則可使用 Azure Integration Runtime。 如果只能存取防火牆規則中核准的 IP,您可以將 Azure Integration Runtime IP 新增至允許清單。
用於來源或接收的 Snowflake 帳戶應該具有資料庫的必要 USAGE
存取權,以及結構描述及其下的資料表/檢視表的讀取/寫入存取權。 此外,其也應該在結構描述上具有 CREATE STAGE
,才能使用 SAS URI 建立外部階段。
必須設定下列帳戶屬性值
屬性 | 描述 | 是必要欄位 | 預設 |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | 指定在建立具名外部階段 (使用 CREATE STAGE) 存取私人雲端儲存位置時,是否需要儲存體整合物件做為雲端認證。 | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | 指定從私人雲端儲存位置載入資料或卸載資料時,是否要使用參考儲存體整合物件做為雲端認證的具名外部階段。 | FALSE | FALSE |
如需 Data Factory 支援的網路安全性機制和選項的詳細資訊,請參閱資料存取策略。
開始使用
若要透過管線執行複製活動,您可以使用下列其中一個工具或 SDK:
使用 UI 建立 Snowflake 的連結服務
使用下列步驟,在 Azure 入口網站 UI 中建立 Snowflake 的連結服務。
前往 Azure Data Factory 或 Synapse 工作區的 [管理] 索引標籤,選取 [連結服務],然後按一下 [新增]:
搜尋 Snowflake 並選取 Snowflake 連接器。
設定服務詳細資料,測試連線,然後建立新的連結服務。
連接器設定詳細資料
下列各節提供屬性的相關詳細資料,這些屬性會定義 Snowflake 連接器的特定實體。
連結服務屬性
Snowflake 連結服務支援下列泛型屬性:
屬性 | 描述 | 必要 |
---|---|---|
type | type 屬性必須設定為 SnowflakeV2。 | Yes |
accountIdentifier | 帳戶名稱以及其組織。 例如,myorg-account123。 | Yes |
database | 連線之後用於工作階段的預設資料庫。 | Yes |
倉儲 | 連線之後用於工作階段的預設虛擬倉儲。 | Yes |
authenticationType | 用來連線到 Snowflake 服務的驗證類型。 允許的值為:Basic (預設值) 和 KeyPair。 請分別參閱下列有關更多屬性和範例的對應區段。 | No |
角色 (role) | 連線之後用於工作階段的預設安全性角色。 | No |
host | Snowflake 帳戶的主機名。 例如: contoso.snowflakecomputing.com 。 .cn 也支援 。 |
No |
connectVia | 用來連線到資料存放區的整合執行階段。 如果您的資料存放區位於私人網路,則可使用 Azure 整合執行階段或自我裝載整合執行階段。 如果未指定,則會使用預設的 Azure 整合執行階段。 | No |
Snowflake 連接器支援下列驗證類型。 如需詳細資料,請參閱對應章節。
基本驗證
若要使用「基本」驗證,除了上一節所述的泛型屬性外,請指定下列屬性:
屬性 | 描述 | 必要 |
---|---|---|
使用者 | Snowflake 使用者的登入名稱。 | Yes |
password | Snowflake 使用者的密碼。 將此欄位標記為 SecureString 類型以將其安全地儲存。 您也可以參考 Azure Key Vault 中儲存的認證。 | Yes |
範例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Azure Key Vault 中的密碼:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
注意
對應資料流僅支援基本驗證。
金鑰組驗證
若要使用「金鑰組」驗證,您必須參閱金鑰組驗證和金鑰組輪替,在 Snowflake 中設定及建立金鑰組驗證使用者。 之後,請記下您用來定義連結服務的私密金鑰和複雜密碼 (選用)。
除了上一節所述的一般屬性以外,請指定下列屬性:
屬性 | 描述 | 必要 |
---|---|---|
使用者 | Snowflake 使用者的登入名稱。 | Yes |
privateKey | 用於金鑰組驗證的私密金鑰。 為了確保私密金鑰在傳送至 Azure Data Factory 時有效,並考慮到 privateKey 檔案包含新行字元 (\n),請務必以字串常值形式正確地格式化 privateKey 內容。 此流程涉及明確地將 \n 新增至每個新行符號。 |
Yes |
privateKeyPassphrase | 用於解密私密金鑰 (如果已加密) 的複雜密碼。 | No |
範例:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
資料集屬性
如需可用來定義資料集的區段和屬性完整清單,請參閱資料集一文。
以下是針對 Snowflake 資料集支援的屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 資料集的 type 屬性必須設定為 SnowflakeV2Table。 | Yes |
schema | 結構描述的名稱。 請注意,結構描述名稱會區分大小寫。 | 否 (來源);是 (接收器) |
table | 資料表/檢視的名稱。 請注意,表格名稱會區分大小寫。 | 否 (來源);是 (接收器) |
範例:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
複製活動屬性
如需可用來定義活動的區段和屬性完整清單,請參閱管線一文。 本節提供 Snowflake 來源和接收器支援的屬性清單。
以 Snowflake 做為來源
Snowflake 連接器會使用 Snowflake 的 COPY into [location] 命令實現最佳效能。
如果 Snowflake COPY 命令原生支援接收器資料存放區和格式,則可使用複製活動,直接從 Snowflake 複製到接收器。 如需詳細資訊,請參閱從 Snowflake 直接複製。 若否,請使用內建的從 Snowflake 暫存複本。
若要從 Snowflake 複製資料,複製活動的 [source] 區段支援下列屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 複製活動來源的 type 屬性必須設定為 SnowflakeV2Source。 | Yes |
query | 指定 SQL 查詢從 Snowflake 讀取資料。 如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable" 。不支援執行預存程序。 |
No |
exportSettings | 用來從 Snowflake 擷取資料的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 | Yes |
在 exportSettings 之下: |
||
type | 匯出命令的型別,設定為 SnowflakeExportCopyCommand。 | Yes |
storageIntegration | 指定您在 Snowflake 中建立的儲存體整合名稱。 如需使用儲存體整合的必要步驟,請參閱設定 Snowflake 儲存體整合。 | No |
additionalCopyOptions | 其他複製選項,以機碼值組字典的形式提供。 範例:MAX_FILE_SIZE、OVERWRITE。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文)。 | No |
additionalFormatOptions | 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文)。 | No |
注意
請確定您有權執行下列命令,並存取架構 INFORMATION_SCHEMA 和資料表 COLUMNS。
COPY INTO <location>
從 Snowflake 直接複製
如果您的接收器資料存放區和格式符合本節所述準則,則可使用複製活動,直接從 Snowflake 複製到接收器。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:
當您在來源中指定
storageIntegration
時:接收器資料存放區是您在 Snowflake 外部階段中參考的 Azure Blob 儲存體。 您必須完成下列步驟,然後才能複製資料:
使用任何支援的驗證類型,為接收器 Azure Blob 儲存體建立 Azure Blob 儲存體連結服務。
將至少儲存體 Blob 資料參與者角色授與接收器 Azure Blob 儲存體存取控制 (IAM) 中的 Snowflake 服務主體。
當您未在來源中指定
storageIntegration
時:接收器連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接將資料複製到 Azure Data Lake Storage Gen2,請針對 Azure Data Lake Storage Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 儲存體連結服務,以避免使用從 Snowflake 分段複製。
接收器資料格式包括 Parquet、分隔符號文字 或 JSON,並具有下列設定:
- 針對 Parquet 格式,壓縮轉碼器為 None、Snappy 或 Lzo。
- 針對分隔符號文字格式:
rowDelimiter
為 \r\n 或任何單一字元。compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。quoteChar
為雙引號、單引號或空字串 (沒有引號字元)。
- 針對 JSON 格式,直接複製僅支援來源 Snowflake 資料表或查詢結果只有單一資料行的案例,且此資料行的資料類型為 VARIANT、OBJECT 或 ARRAY。
compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。filePattern
在複製活動接收器中保留為預設值,或設定為 setOfObjects。
在複製活動來源中,不會指定
additionalColumns
。不會指定資料行對應。
範例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
從 Snowflake 暫存複本
當您的接收器資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡 Azure Blob 儲存體執行個體啟用內建分段複製。 分段複製功能也能提供更好的輸送量。 服務會將資料從 Snowflake 匯出至暫存儲存體,然後將資料複製到接收器,最後從暫存儲存體中清除暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製。
如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStaging
和 stagingSettings
屬性。
當您在來源中指定
storageIntegration
時,過渡暫存 Azure Blob 儲存體應該是您在 Snowflake 外部階段中參考的儲存體。 請確定您使用任何支援的驗證為其建立 Azure Blob 儲存體連結服務,並將至少儲存體 Blob 資料參與者角色授與暫存 Azure Blob 儲存體存取控制 (IAM) 中的 Snowflake 服務主體。當您未在來源中指定
storageIntegration
時,根據 Snowflake COPY 命令所要求,暫存 Azure Blob 儲存體連結服務必須使用共用存取簽章驗證。 請確定您將適當的存取權限授與暫存 Azure Blob 儲存體中的 Snowflake。 若要深入瞭解,請參閱這篇文章。
範例:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
從 Snowflake 執行分段複製時,請務必將 [接收複製行為] 設定為 [合併檔案]。 此設定可確保已正確處理及合併所有分割的檔案,以避免只複製最後一個資料分割檔案的問題。
範例設定
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
注意
無法將 [接收複製行為] 設定為 [合併檔案],可能會導致僅複製最後一個分割的檔案。
以 Snowflake 做為接收器
Snowflake 連接器會用 Snowflake 的 COPY into [table] (英文) 命令達到最佳效能。 此功能支援將資料寫入 Azure 上的 Snowflake。
如果 Snowflake COPY 命令原生支援來源資料存放區和格式,則可使用複製活動,直接從來源複製到 Snowflake。 如需詳細資訊,請參閱直接複製到 Snowflake。 若否,請使用內建的暫存複本到 Snowflake。
若要將資料複製到 Snowflake,複製活動的 [sink] 區段支援下列屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 複製活動接收器的 type 屬性必須設定為 SnowflakeV2Sink。 | Yes |
preCopyScript | 指定一個供複製活動在每次執行時將資料寫入到 Snowflake 前執行的 SQL 查詢。 使用此屬性來清除預先載入的資料。 | No |
importSettings | 用來將資料寫入 Snowflake 的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 | Yes |
在 importSettings 之下: |
||
type | 匯入命令的型別,設定為 SnowflakeImportCopyCommand。 | Yes |
storageIntegration | 指定您在 Snowflake 中建立的儲存體整合名稱。 如需使用儲存體整合的必要步驟,請參閱設定 Snowflake 儲存體整合。 | No |
additionalCopyOptions | 其他複製選項,以機碼值組字典的形式提供。 範例:ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文)。 | No |
additionalFormatOptions | 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文)。 | No |
注意
請確定您有權執行下列命令,並存取架構 INFORMATION_SCHEMA 和資料表 COLUMNS。
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
直接複製到 Snowflake
如果您的來源資料存放區和格式符合本節所述的準則,則可使用複製活動,直接從來源複製到 Snowflake。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:
當您在接收器中指定
storageIntegration
時:來源資料存放區是您在 Snowflake 外部階段中參考的 Azure Blob 儲存體。 您必須完成下列步驟,然後才能複製資料:
使用任何支援的驗證類型,為來源 Azure Blob 儲存體建立 Azure Blob 儲存體連結服務。
將至少儲存體 Blob 資料讀者角色授與來源 Azure Blob 儲存體存取控制 (IAM) 中的 Snowflake 服務主體。
當您未在接收器中指定
storageIntegration
時:來源連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接從 Azure Data Lake Storage Gen2 複製資料,請針對 Azure Data Lake Storage Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 儲存體連結服務,以避免使用分段複製到 Snowflake。
來源資料格式包括 Parquet、分隔符號文字 或 JSON,並具有下列設定:
針對 Parquet 格式,壓縮轉碼器為 None 或 Snappy。
針對分隔符號文字格式:
rowDelimiter
為 \r\n 或任何單一字元。 如果資料列分隔符號不是 “\r\n”,則firstRowAsHeader
必須為 false,且不會指定skipLineCount
。compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
保留為預設值,或設定為「UTF-8」、「UTF-16」、「UTF-16BE」、「UTF-32」、「UTF-32BE」、「BIG5」、「EUC-JP」、「EUC-KR」、「GB18030」、「ISO-2022-JP」、「ISO-2022-KR」、「ISO-8859-1」、「ISO-8859-2」、「ISO-8859-2」、「ISO-8859-5」、「ISO-8859-6」、「ISO-8859-7」、「ISO-8859-8」、「ISO-8859-9」、「WINDOWS-1250」、「WINDOWS-1251」、「WINDOWS-1252」、「WINDOWS-1253」、「WINDOWS-1254」、「WINDOWS-1255」。quoteChar
為雙引號、單引號或空字串 (沒有引號字元)。
針對 JSON 格式,直接複製僅支援接收器 Snowflake 資料表只有單一資料行的案例,且此資料行的資料類型為 VARIANT、OBJECT 或 ARRAY。
compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。- 不會指定資料行對應。
在複製活動來源中:
- 不會指定
additionalColumns
。 - 如果來源是資料夾,
recursive
會設定為 true。 - 不會指定
prefix
、modifiedDateTimeStart
、modifiedDateTimeEnd
和enablePartitionDiscovery
。
- 不會指定
範例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
暫存複本到 Snowflake
您的來源資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡 Azure Blob 儲存體執行個體啟用內建分段複製。 分段複製功能也能提供更好的輸送量。 該服務會自動轉換資料,以符合 Snowflake 的資料格式需求。 之後系統會叫用 COPY 命令,將資料載入 Snowflake。 最後,它會清除 Blob 儲存體中的暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製。
如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStaging
和 stagingSettings
屬性。
當您在接收器中指定
storageIntegration
時,過渡暫存 Azure Blob 儲存體應該是您在 Snowflake 外部階段中參考的儲存體。 請確定您使用任何支援的驗證為其建立 Azure Blob 儲存體連結服務,並將至少儲存體 Blob 資料讀者角色授與暫存 Azure Blob 儲存體存取控制 (IAM) 中的 Snowflake 服務主體。當您未在接收器中指定
storageIntegration
時,根據 Snowflake COPY 命令所要求,暫存 Azure Blob 儲存體連結服務必須使用共用存取簽章驗證。
範例:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
對應資料流程屬性
在對應資料流中轉換資料時,您可以從 Snowflake 資料表中讀取並寫入資料。 如需詳細資訊,請參閱對應資料流程中的來源轉換和接收轉換。 您可以選擇使用 Snowflake 資料集或內嵌資料集做為來源和接收器型別。
來源轉換
下表列出 Snowflake 來源支援的屬性。 您可以在 [來源選項] 索引標籤中編輯這些屬性。連接器會使用 Snowflake 的內部資料傳輸 (英文)。
名稱 | 描述 | 必要 | 允許的值 | 資料流程指令碼屬性 |
---|---|---|---|---|
Table | 如果您選取 [資料表] 做為輸入,資料流程會在使用內嵌資料集時,從 Snowflake 資料集或來源選項中指定的資料表擷取全部資料。 | No | String | (僅限內嵌資料集) tableName schemaName |
查詢 | 如果您選取 [查詢] 做為輸入,請輸入查詢以從 Snowflake 擷取資料。 此設定會覆寫您在資料集中選擇的任何資料表。 如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable" 。 |
No | String | query |
啟用累加式擷取 (預覽) | 使用此選項可告訴 ADF 只處理自上次執行管線後已變更的資料列。 | No | 布林值 | enableCdc |
累加資料行 | 使用累加式擷取功能時,您必須選擇想要在來源資料表中用做浮水印的日期/時間/數值資料行。 | No | String | waterMarkColumn |
啟用 Snowflake 變更追蹤 (預覽) | 此選項可讓 ADF 利用 Snowflake 異動資料擷取技術,只處理先前管線執行後的差異資料。 這個選項會自動使用資料列插入、更新和刪除作業載入差異資料,而不需要任何累加資料行。 | No | 布林值 | enableNativeCdc |
網路變更 | 使用 Snowflake 變更追蹤時,您可以使用此選項來取得重複資料刪除變更的資料列或完整變更。 重複資料刪除變更的資料列只會顯示從指定時間點以來已變更的最新版本資料列,而完整變更會顯示每個已變更資料列的所有版本,包括已刪除或更新的資料列。 例如,如果您更新資料列,您將會在完整變更中看到刪除版本和插入版本,但是在重複資料刪除變更資料列中只會看到插入版本。 依據您的使用案例,您可以選擇適合您需求的選項。 預設選項為 False,這表示完整變更。 | No | 布林值 | netChanges |
包含系統資料行 | 使用 Snowflake 變更追蹤時,您可以使用 systemColumns 選項來控制在變更追蹤輸出中包含或排除 Snowflake 提供的中繼資料串流資料行。 根據預設,systemColumns 會設定為 true,這表示包含中繼資料串流資料行。 如果您想要排除資料行,您可以將 systemColumns 設定為 False。 | No | 布林值 | systemColumns |
從頭開始讀取 | 使用累加擷取和變更追蹤設定此選項時,會指示 ADF 在第一次執行管線時讀取所有資料列,並開啟累加擷取。 | No | 布林值 | skipInitialLoad |
Snowflake 來源指令碼範例
使用 Snowflake 資料集做為來源型別時,相關聯的資料流程指令碼為:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
如果您使用內嵌資料集,相關聯的資料流程指令碼為:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
原生變更追蹤
Azure Data Factory 現在於 Snowflake 中支援稱為變更追蹤的原生功能,其中包含以記錄形式追蹤變更。 Snowflake 的這項功能可讓我們追蹤一段時間的資料變更,使其適用於累加資料載入和稽核用途。 若要利用這項功能,當您啟用異動資料擷取並選取 Snowflake 變更追蹤時,我們會為來源資料表建立 Stream 物件,以啟用來源 Snowflake 資料表上的變更追蹤。 接著,我們會在查詢中使用 CHANGES 子句,只從來源資料表擷取新的或更新的資料。 此外,建議排程管線,讓變更在為 snowflake 來源資料表設定的資料保留時間間隔內耗用,否則使用者可能會在擷取的變更中看到不一致的行為。
接收轉換
下表列出 Snowflake 接收器支援的屬性。 您可以在 [設定] 索引標籤中編輯這些屬性。使用內嵌資料集時,您將會看到其他設定,這些設定與資料集屬性一節所述的屬性相同。 連接器會使用 Snowflake 的內部資料傳輸 (英文)。
名稱 | 描述 | 必要 | 允許的值 | 資料流程指令碼屬性 |
---|---|---|---|---|
更新方法 | 指定您的 Snowflake 目的地允許的作業。 若要更新、upsert 或刪除資料列,必須使用 [變更資料列轉換] 來標記這些動作的資料列。 |
Yes | true 或 false |
deletable insertable updateable upsertable |
索引鍵資料行 | 對於更新、更新插入和刪除,必須設定索引鍵資料行,以決定要改變哪一個資料列。 | No | 陣列 | 金鑰 |
資料表動作 | 決定在寫入之前,是否要重新建立或移除目的地資料表中的所有資料列。 - 無:不會對資料表執行任何動作。 - 重新建立:將會捨棄並重新建立資料表。 如果要動態建立新的資料表,則為必要。 - 截斷:將會移除目標資料表中的所有資料列。 |
No | true 或 false |
recreate truncate |
Snowflake 接收器指令碼範例
使用 Snowflake 資料集做為接收器型別時,相關聯的資料流程指令碼為:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
如果您使用內嵌資料集,相關聯的資料流程指令碼為:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
查詢下推最佳化
藉由將管線記錄層級設定為「無」,我們會排除中繼轉換計量的傳輸、防止 Spark 最佳化的潛在障礙,以及啟用 Snowflake 所提供的查詢下推最佳化。 此下推最佳化可讓具有廣泛資料集的大型 Snowflake 資料表大幅提升效能。
注意
我們不支援 Snowflake 中的暫存資料表,因為這對於工作階段或建立這些暫存資料表的使用者而言是本機,因此無法存取其他工作階段,而且容易被 Snowflake 覆寫為一般資料表。 雖然 Snowflake 提供暫時性資料表作為可全域存取的替代方案,但需要手動刪除,這與使用暫存資料表的主要目標相矛盾,也就是避免來源結構描述中的任何刪除作業。
查閱活動屬性
如需屬性的詳細資訊,請參閱查閱活動。
升級 Snowflake 連接器
若要升級 Snowflake 連接器,您可以執行並存升級或就地升級。
並存升級
若要執行並存升級,請完成下列步驟:
- 建立新的 Snowflake 連結服務,並藉由參考連結的服務屬性加以設定。
- 根據新建立的 Snowflake 連結服務建立數據集。
- 將新的連結服務和數據集取代為管線中以舊版對象為目標的現有連結服務和數據集。
就地升級
若要執行就地升級,您必須編輯現有的連結服務承載,並更新數據集以使用新的連結服務。
將類型從 Snowflake 更新為 SnowflakeV2。
將連結的服務承載從舊版格式修改為新的模式。 您可以在變更上述類型之後,填入使用者介面中的每個字段,或直接透過 JSON 編輯器更新承載。 如需支持的連線屬性,請參閱本文中的連結服務屬性一節。 下列範例顯示舊版和新 Snowflake 連結服務的承載差異:
舊版 Snowflake 鏈接服務 JSON 承載:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
新的 Snowflake 連結服務 JSON 承載:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
更新數據集以使用新的連結服務。 您可以根據新建立的連結服務建立新的數據集,或將現有數據集的類型屬性從 SnowflakeTable 更新為 SnowflakeV2Table。
Snowflake 與 Snowflake (舊版) 之間的差異
Snowflake 連接器提供新功能,並與 Snowflake (舊版) 連接器的大部分功能相容。 下表顯示 Snowflake 與 Snowflake (舊版) 之間的功能差異。
Snowflake | Snowflake (舊版) |
---|---|
支援基本和金鑰組驗證。 | 支援基本驗證。 |
指令碼活動中目前不支援指令碼參數。 或者,針對指令碼參數使用動態運算式。 如需詳細資訊,請參閱 Azure Data Factory 和 Azure Synapse Analytics 中的運算式和函式。 | 支援指令碼活動的指令碼參數。 |
支援查閱活動中的 BigDecimal。 NUMBER 類型 (如 Snowflake 中所定義) 將會在查閱活動中顯示為字串。 如果您想要將它隱含至數值類型,您可以使用管線參數搭配 int 函式或 float 函式。 例如,int(activity('lookup').output.firstRow.VALUE) 、float(activity('lookup').output.firstRow.VALUE) |
查閱活動中不支援 BigDecimal。 |
accountIdentifier 、warehouse 、 database schema 和 role 屬性可用來建立連接。 |
屬性 connectionstring 是用來建立連接。 |
Snowflake 中的 timestamp 數據類型會在 Lookup 和 Script 活動中讀取為 DateTimeOffset 數據類型。 | Snowflake 中的 timestamp 數據類型會在查閱和腳本活動中讀取為 DateTime 數據類型。 如果您在升級連接器之後仍需要使用 Datetime 值作為管線中的參數,您可以使用 formatDateTime 函式或 concat 函式,將 DateTimeOffset 類型轉換為 DateTime 類型。 例如: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE) 、concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') |
相關內容
如需複製活動支援作為來源和接收器的資料存放區清單,請參閱支援的資料存放區和格式。