在 Azure Data Factory 或 Azure Synapse Analytics 管線中使用自訂活動
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
您在 Azure Data Factory 或 Synapse 管線中可以使用兩種活動。
- 資料移動活動,可在支援的來源與接收資料存放區之間移動資料。
- 資料轉換活動,可使用 Azure HDInsight 和 Azure Batch 等計算服務來轉換資料。
若要將資料移入/移出服務不支援的資料存放區,或以服務不支援的方式轉換/處理資料,您可以利用自己的資料移動或轉換邏輯建立自訂活動,然後在管線中使用活動。 自訂活動會在虛擬機器的 Azure Batch 集區上執行自訂程式碼邏輯。
注意
建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱安裝 Azure PowerShell (部分機器翻譯)。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az。
如果您不熟悉 Azure Batch 服務,請參閱下列文章:
- Azure Batch 基本知識 ,以取得 Azure Batch 服務的概觀。
- New-AzBatchAccount Cmdlet 可建立 Azure Batch 帳戶 (或) Azure 入口網站,以使用 Azure 入口網站建立 Azure Batch 帳戶。 如需使用此 Cmdlet 的詳細指示,請參閱使用 PowerShell 管理 Azure Batch 帳戶一文。
- New-AzBatchPool Cmdlet 可建立 Azure Batch 集區。
重要
建立新的 Azure Batch 集區時,必須使用 'VirtualMachineConfiguration',而「不是」'CloudServiceConfiguration'。 如需詳細資訊,請參閱 Azure Batch 集區移轉指導。
使用 UI 將自訂活動新增至管線
若要在管線中使用自訂活動,請完成下列步驟:
在管線 [活動] 窗格中搜尋「自訂」,然後將自訂活動拖曳至管線畫布。
在畫布上選取新的自訂活動 (如果尚未選取)。
選取 [Azure Batch] 索引標籤,以選取或建立新的 Azure Batch 連結服務,用以執行自訂活動。
選取 [設定] 索引標籤,並指定要在 Azure Batch 上執行的命令,以及選用的進階詳細資料。
Azure Batch 已連結的服務
下列 JSON 會定義範例 Azure Batch 已連結的服務。 如需詳細資訊,請參閱支援的計算環境
{
"name": "AzureBatchLinkedService",
"properties": {
"type": "AzureBatch",
"typeProperties": {
"accountName": "batchaccount",
"accessKey": {
"type": "SecureString",
"value": "access key"
},
"batchUri": "https://batchaccount.region.batch.azure.com",
"poolName": "poolname",
"linkedServiceName": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
若要深入了解 Azure Batch 已連結的服務,請參閱計算已連結的服務一文。
自訂活動
下列 JSON 片段會定義具有範例自訂活動的管線。 活動定義具有對 Azure Batch 已連結的服務之參考。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "helloworld.exe",
"folderPath": "customactv2/helloworld",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}]
}
}
在此範例中,helloworld.exe 是自訂應用程式,儲存在 resourceLinkedService 中使用之 Azure 儲存體帳戶的 customactv2/helloworld 資料夾。 自訂活動會提交此自訂應用程式以在 Azure Batch 上執行。 您可以取代在 Azure Batch 集區節點之目標作業系統上執行之任何偏好應用程式的命令。
下表描述此活動特有的屬性之名稱和描述。
屬性 | 描述 | 必要 |
---|---|---|
NAME | 管線中的活動名稱 | Yes |
description | 說明活動用途的文字。 | No |
type | 針對自訂活動,活動類型是自訂。 | Yes |
linkedServiceName | Azure Batch 的已連結的服務。 若要深入了解此已連結的服務,請參閱計算已連結的服務一文。 | Yes |
命令 | 要執行的自訂應用程式命令。 如果應用程式已經可以在 Azure Batch 集區節點上使用,則可以略過 resourceLinkedService 和 folderPath。 例如,您可以將命令指定為 cmd /c dir ,該命令原生受 Windows Batch 集區節點支援。 |
Yes |
resourceLinkedService | 對儲存體帳戶 (自訂應用程式儲存所在) 的 Azure 儲存體已連結的服務 | 否 * |
folderPath | 自訂應用程式及其所有相依項目的資料夾路徑 如果您將相依性儲存在子資料夾中 (也就是 folderPath 下的階層式資料夾結構),當您將檔案複製到 Azure Batch 時,目前的資料夾結構會遭到壓平合併。 也就是所有檔案會複製到沒有子資料夾的單一資料夾中。 若要解決這個問題行為,請考慮壓縮檔案並複製壓縮的檔案,然後在所需位置中以自訂程式碼來將其解壓縮。 |
否 * |
referenceObjects | 現有已連結的服務和資料集的陣列。 參考的連結服務和資料集以 JSON 格式傳遞至自訂應用程式,讓您的自訂程式碼可以參考服務的資源 | No |
extendedProperties | 使用者定義的屬性,可以傳遞至 JSON 格式的自訂應用程式,讓您的自訂程式碼可以參考其他屬性 | No |
retentionTimeInDays | 檔案 (提交給自訂活動) 的保留時間。 預設值為 30 天。 | No |
* resourceLinkedService
和 folderPath
屬性必須兩者都指定或都省略。
注意
如果您要在自訂活動中將連結服務當作 referenceObjects 傳遞,較安全的作法是傳遞已啟用 Azure Key Vault 的連結服務 (因為不含任何安全字串),並在程式碼中使用祕密名稱直接從Key Vault 擷取認證。 這裡有一個範例參考已啟用 AKV 的連結服務、從 Key Vault 擷取認證,然後在程式碼中存取儲存體。
注意
自訂活動中目前僅支援 resourceLinkedService 的 Azure Blob 儲存體,而且它是預設唯一建立的連結服務,沒有選擇其他連接器的選項,例如 ADLS Gen2。
自訂活動權限
自訂活動會將 Azure Batch 自動使用者帳戶設定為具有工作範圍 (預設的自動使用者規格) 的非系統管理員存取權。 您無法變更自動使用者帳戶的權限等級。 如需詳細資訊,請參閱在 Batch 中的使用者帳戶執行工作 | 自動使用者帳戶。
執行命令
您可以使用自訂活動直接執行命令。 下列範例會在目標 Azure Batch 集區的節點上執行 "echo hello world" 命令,並將輸出列印至 stdout。
{
"name": "MyCustomActivity",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "cmd /c echo hello world"
}
}]
}
}
傳遞物件和屬性
此範例示範如何使用 referenceObjects 和 extendedProperties,將物件和使用者定義的屬性從服務傳遞至自訂應用程式。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "SampleApp.exe",
"folderPath": "customactv2/SampleApp",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
},
"referenceObjects": {
"linkedServices": [{
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
}]
},
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
},
"PropertyBagPropertyName1": "PropertyBagValue1",
"propertyBagPropertyName2": "PropertyBagValue2",
"dateTime1": "2015-04-12T12:13:14Z"
}
}
}]
}
}
當活動執行時,referenceObjects 和 extendedProperties 會儲存在部署至與 SampleApp.exe 相同執行資料夾的下列檔案:
activity.json
儲存 extendedProperties 和自訂活動的屬性。
linkedServices.json
儲存 referenceObjects 屬性中定義之已連結的服務的陣列。
datasets.json
儲存 referenceObjects 屬性中定義之資料集的陣列。
下列範例程式碼示範 SampleApp.exe 如何從 JSON 檔案存取必要的資訊:
using Newtonsoft.Json;
using System;
using System.IO;
namespace SampleApp
{
class Program
{
static void Main(string[] args)
{
//From Extend Properties
dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);
// From LinkedServices
dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
}
}
}
擷取執行輸出
您可以使用下列 PowerShell 命令啟動管線的執行:
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
管線正在執行時,您可以使用下列命令檢查執行的輸出:
while ($True) {
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if(!$result) {
Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
}
elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
}
else {
Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
($result | Format-List | Out-String)
Start-Sleep -Seconds 15
}
Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"
您的自訂應用程式的 stdout 和 stderr 會儲存至您在使用工作的 GUID 建立 Azure Batch 已連結的服務時,定義的 Azure 儲存體已連結的服務中的 adfjobs 容器。 您可以從活動執行輸出取得詳細路徑,如下列程式碼片段所示:
Pipeline ' MyCustomActivity' run finished. Result:
ResourceGroupName : resourcegroupname
DataFactoryName : datafactoryname
ActivityName : MyCustomActivity
PipelineRunId : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName : MyCustomActivity
Input : {command}
Output : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart : 10/5/2017 3:33:06 PM
ActivityRunEnd : 10/5/2017 3:33:28 PM
DurationInMs : 21203
Status : Succeeded
Error : {errorCode, message, failureType, target}
Activity Output section:
"exitcode": 0
"outputs": [
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"
如果要在下游活動中取用 stdout.txt 的內容,您可以在 "@activity('MyCustomActivity').output.outputs[0]" 運算式中取得 stdout.txt 檔案的路徑。
重要
- activity.json、linkedServices.json 和 datasets.json 會儲存在 Batch 工作的執行階段資料夾。 在此範例中,activity.json、linkedServices.json 及 datasets.json 儲存在
https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/
路徑中。 您必須視需要個別加以清除。 - 如果已連結的服務使用自我裝載整合執行階段,機密資訊 (例如金鑰或密碼) 就會由自我裝載整合執行階段進行加密,以確保認證會保留在客戶定義的私人網路環境中。 某些機密欄位由您的自訂應用程式以這樣的方式參考時,可能會遺失。 視需要在 extendedProperties 中使用 SecureString,而不是使用已連結的服務參考。
將輸出傳遞到其他活動
您可以在自訂活動中從程式碼將自訂值傳回給服務。 您可以從應用程式中將它們寫入到 outputs.json
執行此動作。 服務複製 outputs.json
的內容,並當作 customOutput
屬性的值附加至活動輸出。 (大小限制為 2 MB)。如果您想要在下游活動中取用 outputs.json
的內容,您可以使用運算式 @activity('<MyCustomActivity>').output.customOutput
來取得值。
擷取 SecureString 輸出
指定為 SecureString 型別的敏感性屬性值 (如本文中的部分範例所示),在使用者介面上的 [監視] 索引標籤中都加上遮罩。 但是,在實際的管線執行中,SecureString 屬性會在 activity.json
檔案中序列化為純文字形式的 JSON。 例如:
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
}
}
此序列化不是真正的安全,而且也不是以安全為目的。 用意是提示服務將 [監視] 索引標籤中的值加上遮罩。
若要從自訂活動中存取 SecureString 型別的屬性,請讀取 activity.json
檔案 (與您的 .EXE 位在相同資料夾中)、將 JSON 還原序列化,然後存取 JSON 屬性 (extendedProperties => [propertyName] => value)。
Azure Batch 的自動調整
您也可以建立具有 自動調整 功能的 Azure Batch 集區。 例如,您可以用 0 個專用 VM 和基於暫止工作數目的自動調整公式,建立 Azure Batch 集區。
這裡的範例公式會有下列行為:當集區一開始建立時,它會以 1 部 VM 啟動。 $PendingTasks 計量會定義執行中 + 作用中 (已排入佇列) 狀態的工作數目。 公式會尋找過去 180 秒內的平均擱置中工作數目,並據以設定 TargetDedicated。 它會確保 TargetDedicated 一律不會超過 25 部 VM。 因此,當提交新工作時,集區會自動成長而且工作會完成,VM 會依序成為可用,而且自動調整規模功能會壓縮那些 VM。 您可以視需要調整 startingNumberOfVMs 及 maxNumberofVMs。
自動調整公式:
startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
如需詳細資訊,請參閱 自動調整 Azure Batch 集區中的計算節點 。
如果集區使用預設的 autoScaleEvaluationInterval,Batch 服務在執行自訂活動之前,可能需要 15-30 分鐘的時間準備 VM。 如果集區使用不同的 autoScaleEvaluationInterval,Batch 服務可能需要 autoScaleEvaluationInterval + 10 分鐘。
相關內容
請參閱下列文章,其說明如何以其他方式轉換資料: