Azure Data Factory または Azure Synapse Analytics のパイプラインでカスタム アクティビティを使用する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
Azure Data Factory または Synapse のパイプラインでは、2 種類のアクティビティを使用できます。
- サポートされるソース データ ストアとシンク データ ストアの間でデータを移動するデータ移動アクティビティ。
- Azure HDInsightやAzure Batch などのコンピューティング サービスを使用してデータを変換するデータ変換アクティビティ。
サービスでサポートされていないデータ ストアとの間でデータを移動する場合や、サービスでサポートされていない方法でデータを変換または処理する場合は、独自のデータ移動ロジックまたはデータ変換ロジックでカスタム アクティビティを作成して、パイプラインでそのアクティビティを使用できます。 カスタム アクティビティでは、仮想マシンの Azure Batch プールでカスタマイズされたコード ロジックを実行します。
注意
Azure を操作するには、Azure Az PowerShell モジュールを使用することをお勧めします。 作業を始めるには、「Azure PowerShell をインストールする」を参照してください。 Az PowerShell モジュールに移行する方法については、「AzureRM から Az への Azure PowerShell の移行」を参照してください。
Azure Batch サービスを初めて利用する場合は、次の記事をご覧ください。
- Azure Batch の基本 」をご覧ください。
- Azure Batch アカウントの作成方法については、New-AzBatchAccount コマンドレットをご覧ください。Azure portal を使用した Azure Batch アカウントの作成方法については、Azure portal をご覧ください。 このコマンドレットの使用方法の詳細については、PowerShell を使用した Azure Batch アカウントの管理に関する記事をご覧ください。
- Azure Batch プールの作成方法については、New-AzBatchPool コマンドレットをご覧ください。
重要
新しい Azure Batch プールを作成するときは、'CloudServiceConfiguration' ではなく 'VirtualMachineConfiguration' を使用する必要があります。 詳細については、Azure Batch プールの移行に関するガイダンスを参照してください。
UI を使用してパイプラインにカスタム アクティビティを追加する
パイプラインでカスタム アクティビティを使用するには、次の手順を実行します:
パイプラインの [アクティビティ] ペイン内で Custom を検索し、カスタム アクティビティをパイプライン キャンバスにドラッグします。
まだ選択されていない場合は、キャンバスで新しいカスタム アクティビティを選択します。
[Azure Batch] タブを選択し、カスタム アクティビティを実行する新しい Azure Batch にリンクされたサービスを選択するか、新規作成します。
[設定] タブを選択し、Azure Batch で実行するコマンドと、オプションの詳細を指定します。
Azure Batch のリンクされたサービス
次の JSON では、サンプルの Azure Batch のリンクされたサービスを定義しています。 詳細については、サポートされるコンピューティング環境に関する記事を参照してください。
{
"name": "AzureBatchLinkedService",
"properties": {
"type": "AzureBatch",
"typeProperties": {
"accountName": "batchaccount",
"accessKey": {
"type": "SecureString",
"value": "access key"
},
"batchUri": "https://batchaccount.region.batch.azure.com",
"poolName": "poolname",
"linkedServiceName": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Azure Batch のリンクされたサービスの詳細については、計算のリンクされたサービスに関する記事をご覧ください。
カスタム アクティビティ
次の JSON スニペットでは、単純なカスタム アクティビティを使用するパイプラインを定義しています。 このアクティビティ定義には、Azure Batch のリンクされたサービスへの参照が含まれています。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "helloworld.exe",
"folderPath": "customactv2/helloworld",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
}
}
}]
}
}
このサンプルでは、helloworld.exe は、resourceLinkedService で使用される Azure ストレージ アカウントの customactv2/helloworld フォルダーに格納されているカスタム アプリケーションです。 カスタム アクティビティでは、Azure Batch で実行されるこのカスタム アプリケーションを送信します。 コマンドは、Azure Batch プール ノードのターゲット オペレーティング システムで実行できる任意の優先アプリケーションに置き換えることができます。
次の表は、このアクティビティに固有のプロパティの名前と説明です。
プロパティ | 内容 | 必須 |
---|---|---|
name | パイプラインのアクティビティの名前。 | はい |
description | アクティビティの動作を説明するテキスト。 | いいえ |
type | カスタム アクティビティの場合、アクティビティの種類は Custom です。 | はい |
linkedServiceName | Azure Batch にリンクされたサービス。 このリンクされたサービスの詳細については、計算のリンクされたサービスに関する記事をご覧ください。 | はい |
command | 実行されるカスタム アプリケーションのコマンド。 アプリケーションが Azure Batch プール ノードで既に使用可能な場合は、resourceLinkedService と folderPath を省略できます。 たとえば、Windows バッチ プール ノードでネイティブでサポートされている cmd /c dir をコマンドとして指定できます。 |
はい |
resourceLinkedService | カスタム アプリケーションが格納されているストレージ アカウントへの Azure Storage のリンクされたサービス。 | なし * |
folderPath | カスタム アプリケーションとそのすべての依存関係のフォルダーのパス。 依存関係ファイルをサブフォルダーに置いている場合 (つまり、folderPath の下のフォルダー階層構造内に置いている場合)、現在の動作では、ファイルが Azure Batch にコピーされるときに、フォルダー構造がフラット化されます。 つまり、すべてのファイルは、サブフォルダーを使用せず、1 つのフォルダーにコピーされます。 この問題を回避するには、ファイルを圧縮し、圧縮されたファイルをコピーした後、目的の場所でカスタム コードと共に解凍することを検討してください。 |
なし * |
referenceObjects | 既存のリンクされたサービスとデータセットの配列。 カスタム コードがサービスのリソースを参照できるように、参照されているリンク サービスとデータセットが JSON 形式でカスタム アプリケーションに渡されます。 | いいえ |
extendedProperties | カスタム コードが追加のプロパティを参照できるように、JSON 形式でカスタム アプリケーションに渡すことができるユーザー定義プロパティ。 | いいえ |
retentionTimeInDays | カスタム アクティビティ用に送信するファイルのリテンション期間。 既定値は 30 日です。 | いいえ |
* プロパティ resourceLinkedService
と folderPath
は、両方とも指定するか、両方とも省略する必要があります。
Note
リンクされたサービスをカスタム アクティビティで ReferenceObjects として渡す場合は、セキュリティ上の観点から、Azure Key Vault 対応のリンクされたサービスを渡して (セキュリティで保護された文字列が含まれていないため)、シークレット名を使用して Key Vault のコードから直接資格情報を取り込むことをお勧めします。 Azure Key Vault 対応のリンクされたサービスを参照し、Key Vault から資格情報を取得して、コード内のストレージにアクセスする例がここに記載されています。
Note
現在、カスタム アクティビティの resourceLinkedService では Azure Blob Storage のみがサポートされており、既定で作成される唯一のリンクされたサービスであり、ADLS Gen2 などの他のコネクタを選択するオプションはありません。
カスタム アクティビティのアクセス許可
カスタム アクティビティでは、Azure Batch の自動ユーザー アカウントが "タスク スコープのある管理者以外のアクセス" (既定の自動ユーザーの仕様) に設定されます。 自動ユーザー アカウントのアクセス許可レベルを変更することはできません。 詳細については、「Batch のユーザー アカウントでタスクを実行する」の「自動ユーザー アカウント」を参照してください。
コマンドの実行
カスタム アクティビティを使用してコマンドを直接実行できます。 次の例は、Azure Batch プールのターゲット ノードで "echo hello world" コマンドを実行し、出力を stdout に出力します。
{
"name": "MyCustomActivity",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "cmd /c echo hello world"
}
}]
}
}
オブジェクトとプロパティを渡す
次のサンプルは、referenceObjects と extendedProperties を使用して、オブジェクトとユーザー定義プロパティをサービスからカスタム アプリケーションに渡す方法を示しています。
{
"name": "MyCustomActivityPipeline",
"properties": {
"description": "Custom activity sample",
"activities": [{
"type": "Custom",
"name": "MyCustomActivity",
"linkedServiceName": {
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"command": "SampleApp.exe",
"folderPath": "customactv2/SampleApp",
"resourceLinkedService": {
"referenceName": "StorageLinkedService",
"type": "LinkedServiceReference"
},
"referenceObjects": {
"linkedServices": [{
"referenceName": "AzureBatchLinkedService",
"type": "LinkedServiceReference"
}]
},
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
},
"PropertyBagPropertyName1": "PropertyBagValue1",
"propertyBagPropertyName2": "PropertyBagValue2",
"dateTime1": "2015-04-12T12:13:14Z"
}
}
}]
}
}
アクティビティを実行すると、referenceObjects と extendedProperties が、SampleApp.exe の同じ実行フォルダーに配置されている次のファイルに格納されます。
activity.json
extendedProperties とカスタム アクティビティのプロパティが格納されます。
linkedServices.json
referenceObjects プロパティで定義されているリンクされたサービスの配列が格納されます。
datasets.json
referenceObjects プロパティで定義されているデータセットの配列が格納されます。
次のサンプル コードは、SampleApp.exe が JSON ファイルの必要な情報にアクセスする方法を示しています。
using Newtonsoft.Json;
using System;
using System.IO;
namespace SampleApp
{
class Program
{
static void Main(string[] args)
{
//From Extend Properties
dynamic activity = JsonConvert.DeserializeObject(File.ReadAllText("activity.json"));
Console.WriteLine(activity.typeProperties.extendedProperties.connectionString.value);
// From LinkedServices
dynamic linkedServices = JsonConvert.DeserializeObject(File.ReadAllText("linkedServices.json"));
Console.WriteLine(linkedServices[0].properties.typeProperties.accountName);
}
}
}
実行の出力を取得する
次の PowerShell コマンドを使用して、パイプラインの実行を開始できます。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
パイプラインが実行されているときは、次のコマンドを使用して実行出力を確認できます。
while ($True) {
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if(!$result) {
Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
}
elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
}
else {
Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
($result | Format-List | Out-String)
Start-Sleep -Seconds 15
}
Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
$result.Error -join "`r`n"
カスタム アプリケーションの stdout と stderr は、タスクの GUID を使用して Azure Batch のリンクされたサービスを作成したときに定義した、Azure Storage のリンクされたサービスの adfjobs コンテナーに保存されます。 次のスニペットに示すように、アクティビティの実行の出力から詳細なパスを取得できます。
Pipeline ' MyCustomActivity' run finished. Result:
ResourceGroupName : resourcegroupname
DataFactoryName : datafactoryname
ActivityName : MyCustomActivity
PipelineRunId : xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
PipelineName : MyCustomActivity
Input : {command}
Output : {exitcode, outputs, effectiveIntegrationRuntime}
LinkedServiceName :
ActivityRunStart : 10/5/2017 3:33:06 PM
ActivityRunEnd : 10/5/2017 3:33:28 PM
DurationInMs : 21203
Status : Succeeded
Error : {errorCode, message, failureType, target}
Activity Output section:
"exitcode": 0
"outputs": [
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stdout.txt",
"https://<container>.blob.core.windows.net/adfjobs/<GUID>/output/stderr.txt"
]
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
Activity Error section:
"errorCode": ""
"message": ""
"failureType": ""
"target": "MyCustomActivity"
ダウンストリームのアクティビティで stdout.txt の内容を使用する場合は、式 "@activity('MyCustomActivity').output.outputs[0]" で stdout.txt ファイルへのパスを取得できます。
重要
- activity.json、linkedServices.json、datasets.json は、Batch タスクのランタイム フォルダーに格納されます。 この例では、activity.json、linkedServices.json、datasets.json は、
https://adfv2storage.blob.core.windows.net/adfjobs/<GUID>/runtime/
パスに格納されています。 必要に応じて、パスを個別にクリーンアップする必要があります。 - セルフホステッド統合ランタイムを使用しているリンクされたサービスでは、顧客が定義したプライベート ネットワーク環境内に資格情報を保持できるように、セルフホステッド統合ランタイムによってキーやパスワードなどの機密情報が暗号化されます。 この場合、カスタム アプリケーション コードから一部の機密フィールドを参照したときにフィールドが見つからない可能性があります。 必要に応じて、リンクされたサービスの参照を使用するのではなく、extendedProperties で SecureString を使用してください。
別のアクティビティに出力を渡す
カスタム アクティビティ内のコードからサービスにカスタム値を送信して戻すことができます。 そのためには、アプリケーションからの outputs.json
にそれを書き込みます。 サービスでは、outputs.json
の内容がコピーされて、customOutput
プロパティの値としてアクティビティの出力に追加されます。 (サイズの制限は 2 MBです。)ダウンストリーム アクティビティで outputs.json
の内容を使用する場合は、式 @activity('<MyCustomActivity>').output.customOutput
を使用して値を取得できます。
SecureString 出力の取得
SecureString 型として指定された機密プロパティ値 (この記事の一部のサンプルに含まれています) は、ユーザー インターフェイスの [監視] タブでマスクされます。 ただし、実際のパイプライン実行では、SecureString プロパティは activity.json
ファイル内でプレーン テキストの JSON としてシリアル化されます。 次に例を示します。
"extendedProperties": {
"connectionString": {
"type": "SecureString",
"value": "aSampleSecureString"
}
}
このシリアル化は、真にセキュアなものではなく、セキュリティの確保を意図したものではありません。 [監視] タブで値をマスクすることを、サービスに示すことを意図しています。
カスタム アクティビティから SecureString 型のプロパティにアクセスするには、.EXE と同じフォルダーに配置されている activity.json
ファイルを読み取り、JSON を逆シリアル化した後、JSON プロパティにアクセスします (extendedProperties => [propertyName] => value)。
Azure Batch の自動スケール
自動スケール 機能で、Azure Batch プールを作成することもできます。 たとえば、専用 VM 数が 0 の Azure Batch プールと、保留中のタスクの数に基づく自動スケールの数式を作成できます。
このサンプル式は、次の動作を実現します。プールが最初に作成された場合、1 つの VM から開始されます。 $PendingTasks メトリックにより、実行中 + アクティブ (キューに登録済み) 状態のタスク数が定義されます。 この数式により、最後の 180 秒間の保留タスク平均数が判明し、TargetDedicated が適宜設定されます。 それにより、TargetDedicated が 25 台の仮想マシンを超えることはありません。 そのため、新しいタスクが送信されると、プールが自動的に増加します。タスクが完了すると、VM は 1 台ずつ解放され、自動スケールにより VM が減らされます。 startingNumberOfVMs と maxNumberofVMs はニーズに合わせて調整できます。
自動スケールの数式:
startingNumberOfVMs = 1;
maxNumberofVMs = 25;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicated=min(maxNumberofVMs,pendingTaskSamples);
詳細については、「 Azure Batch プール内のコンピューティング ノードの自動スケール 」をご覧ください。
プールで既定の autoScaleEvaluationIntervalを使用する場合、Batch サービスがカスタム アクティビティを実行する前に VM を準備するのに 15 ~ 30 分かかることがあります。 プールが異なる autoScaleEvaluationInterval を使用する場合、Batch サービスは autoScaleEvaluationInterval + 10 分を要することがあります。
関連するコンテンツ
別の手段でデータを変換する方法を説明している次の記事を参照してください。