パイプラインで並列ジョブを使用する
適用対象:Azure CLI ml extension v2 (現行)
Python SDK azure-ai-ml v2 (現行)
この記事では、CLI v2 と Python SDK v2 を使用して Azure Machine Learning パイプラインで並列ジョブを実行する方法について説明します。 並列ジョブを使うことで、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。
機械学習エンジニアは、常にトレーニングまたは推論タスクに関するスケールの要件を抱えています。 たとえば、データ科学者が売上予測モデルをトレーニングするための 1 つのスクリプトを提供したら、機械学習エンジニアはこのトレーニング タスクを個々の各データ ストアに適用する必要があります。 このスケールアウト プロセスの課題には、遅延を引き起こす実行時間の長さや、タスクの実行を維持するために手動による介入を必要とする予期しない問題などがあります。
Azure Machine Learning の並列処理の主要な役割は、1 つのシリアル タスクをミニバッチに分割し、それらのミニバッチを複数のコンピューティングにディスパッチして並列に実行することです。 並列ジョブでは、エンドツーエンドの実行時間が大幅に短縮され、エラーも自動的に処理されます。 Azure Machine Learning の並列ジョブを使用して、パーティション分割されたデータに基づいて多くのモデルをトレーニングしたり、大規模バッチ推論タスクを促進したりすることについて考えるとします。
たとえば、多数の画像に対して物体検出モデルを実行しているシナリオの場合、Azure Machine Learning の並列ジョブを使用すると、特定のコンピューティング クラスター上でカスタム コードを並列で実行するために画像を簡単に分散できます。 並列化により、時間コストを大幅に削減できます。 Azure Machine Learning の並列ジョブでは、プロセスを簡略化して自動化し、ジョブの効率をさらに上げることもできます。
前提条件
- Azure Machine Learning アカウントとワークスペースを持っている。
- Azure Machine Learning パイプラインについて理解します。
- Azure CLI と
ml
拡張機能をインストールします。 詳しくは、CLI (v2) のインストール、設定、使用に関するページをご覧ください。ml
拡張機能は、az ml
コマンドを初めて実行したときに自動的にインストールされます。 - CLI v2 を使用して Azure Machine Learning パイプラインとコンポーネントを作成して実行する方法について理解します。
並列ジョブ ステップを持つパイプラインを作成して実行する
Azure Machine Learning の並列ジョブは、パイプライン ジョブのステップとしてのみ使用できます。
次の例は、Azure Machine Learning の例リポジトリにあるパイプラインで並列ジョブを使用してパイプライン ジョブを実行する方法に由来します。
並列化を準備する
この並列ジョブ ステップには準備が必要です。 定義済みの関数を実装するエントリ スクリプトが必要です。 また、並列ジョブ定義で次の属性を設定する必要があります。
- 入力データを定義してバインドします。
- データの分割方法を設定します。
- コンピューティング リソースを構成します。
- エントリ スクリプトを呼び出します。
次のセクションでは、並列ジョブを準備する方法について説明します。
入力とデータ分割の設定を宣言する
並列ジョブでは、1 つの主要な入力を分割して並列に処理する必要があります。 主要な入力データ形式は、表形式データまたはファイルのリストのいずれかです。
データ形式が異なると、入力の種類、入力モード、データの分割方法が異なります。 次の表では、このオプションについて説明します。
データ形式 | 入力型 | 入力モード | データ分割の方法 |
---|---|---|---|
ファイル一覧 | mltable または uri_folder |
ro_mount または download |
サイズ別 (ファイルの数) またはパーティション別 |
表形式データ | mltable |
direct |
サイズ別 (推定物理サイズ) またはパーティション別 |
Note
表形式の mltable
を主要な入力データとして使用する場合は、次の手順を実行する必要があります。
- この Conda ファイルの 9 行目にあるように、
mltable
ライブラリをご利用の環境内にインストールします。 - 指定したパスの下に、
transformations: - read_delimited:
セクションを入力した MLTable 仕様ファイルを配置します。例については、「データ資産の作成と管理」を参照してください。
並列ジョブ YAML または Python で input_data
属性を使用して主要な入力データを宣言し、${{inputs.<input name>}}
を使用して並列ジョブの定義済みの input
にデータをバインドできます。 次に、データの分割方法に応じて、主要な入力のデータ分割属性を定義します。
データ分割の方法 | 属性名 | 属性の型 | ジョブの例 |
---|---|---|---|
サイズ別 | mini_batch_size |
string | アヤメのバッチ予測 |
パーティション別 | partition_keys |
文字列のリスト | オレンジ ジュースの売上予測 |
並列化用にコンピューティング リソースを構成する
データ分割属性を定義したら、instance_count
属性と max_concurrency_per_instance
属性を設定して、並列化のためのコンピューティング リソースを構成します。
Attribute name | タイプ | 説明 | 既定値 |
---|---|---|---|
instance_count |
整数 (integer) | ジョブに使用するノードの数。 | 1 |
max_concurrency_per_instance |
整数 (integer) | 各ノードのプロセッサの数。 | GPU コンピューティングの場合: 1。 CPU コンピューティングの場合: コアの数。 |
これらの属性は、次の図に示すように、指定したコンピューティング クラスターと連携します。
エントリ スクリプトを呼び出す
エントリ スクリプトは、カスタム コードを使用して次の 3 つの定義済み関数を実装する 1 つの Python ファイルです。
関数名 | 必須 | Description | 入力 | 戻り値 |
---|---|---|---|---|
Init() |
年 | ミニバッチの実行を開始する前の一般的な準備を実施します。 たとえば、この関数を使用して、モデルをグローバル オブジェクトに読み込みます。 | -- | -- |
Run(mini_batch) |
年 | ミニバッチ用のメイン実行ロジックを実装します。 | mini_batch は、入力データが表形式データの場合は Pandas DataFrame であり、入力データがディレクトリの場合はファイル パス リストです。 |
データフレーム、リスト、またはタプル。 |
Shutdown() |
N | コンピューティングをプールに返す前にカスタム クリーンアップを実行する省略可能な関数。 | -- | -- |
重要
Init()
関数または Run(mini_batch)
関数で引数を解析するときに例外を回避するには、parse_args
の代わりに parse_known_args
を使用します。 引数パーサーを含むエントリ スクリプトについては、iris_score の例をご覧ください。
重要
Run(mini_batch)
関数は、データフレーム、リスト、またはタプル項目のいずれかを返す必要があります。 この並列ジョブでは、その戻り値の数を使って、そのミニバッチで成功した項目の数を測定します。 ミニバッチ数は、すべての項目が処理されている場合、戻り値リストの数と等しくなる必要があります。
並列ジョブは、次の図に示すように、各プロセッサで関数を実行します。
次のエントリ スクリプトの例を参照してください。
エントリ スクリプトを呼び出すには、並列ジョブ定義で次の 2 つの属性を設定します。
Attribute name | タイプ | 説明 |
---|---|---|
code |
string | ジョブにアップロードして使用するためのソース コード ディレクトリへのローカル パス。 |
entry_script |
string | 定義済みの並列関数の実装を含む Python ファイル。 |
並列ジョブ ステップの例
次の並列ジョブ ステップでは、入力の種類、モード、データの分割方法を宣言し、入力をバインドし、コンピューティングを構成し、エントリ スクリプトを呼び出します。
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
自動化の設定を検討する
Azure Machine Learning の並列ジョブでは、手動による介入なしでジョブを自動的に制御できるオプションの設定が多数公開されています。 次の表にこれらの設定を示します。
キー | Type | 説明 | 使用できる値 | 規定値 | 属性またはプログラム引数で設定する |
---|---|---|---|---|---|
mini_batch_error_threshold |
integer | この並列ジョブで無視する失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 ミニバッチは、次の場合に失敗としてマークされます。 - run() からの戻り値の数が、ミニバッチの入力数未満である場合。- 例外がカスタム run() コードで捕捉された場合。 |
[-1, int.max] |
-1 は、失敗したすべてのミニバッチを無視することを示します |
属性 mini_batch_error_threshold |
mini_batch_max_retries |
integer | ミニバッチが失敗またはタイムアウトしたときの再試行回数。すべての再試行が失敗した場合、ミニバッチは mini_batch_error_threshold 計算に従って失敗としてマークされます。 |
[0, int.max] |
2 |
属性 retry_settings.max_retries |
mini_batch_timeout |
integer | カスタム run() 関数を実行するためのタイムアウト (秒単位)。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗としてマークされて再試行がトリガーされます。 |
(0, 259200] |
60 |
属性 retry_settings.timeout |
item_error_threshold |
integer | 失敗した項目数のしきい値。 失敗した項目の数は、入力数と各ミニバッチから返された数の間のギャップによってカウントされます。 失敗した項目の合計がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 | [-1, int.max] |
-1 は、並列ジョブ中のすべての失敗を無視することを示します |
プログラム引数--error_threshold |
allowed_failed_percent |
integer | mini_batch_error_threshold に似ていますが、回数でなく、失敗したミニバッチの割合を使います。 |
[0, 100] |
100 |
プログラム引数--allowed_failed_percent |
overhead_timeout |
integer | 各ミニバッチの初期化のタイムアウト (秒)。 たとえば、ミニバッチのデータを読み込んで run() 関数に渡します。 |
(0, 259200] |
600 |
プログラム引数--task_overhead_timeout |
progress_update_timeout |
integer | ミニバッチの実行の進行状況を監視するためのタイムアウト (秒)。 このタイムアウト設定内で進行状況の更新が受け取られない場合、並列ジョブは失敗としてマークされます。 | (0, 259200] |
他の設定によって動的に計算 | プログラム引数--progress_update_timeout |
first_task_creation_timeout |
integer | ジョブの開始から最初のミニバッチの実行までの時間を監視するためのタイムアウト (秒単位)。 | (0, 259200] |
600 |
プログラム引数--first_task_creation_timeout |
logging_level |
string | ユーザー ログ ファイルにダンプするログのレベル。 | INFO 、 WARNING 、または DEBUG |
INFO |
属性 logging_level |
append_row_to |
string | ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} 式を使用して、並列ジョブの出力のいずれかを参照できます |
属性 task.append_row_to |
||
copy_logs_to_parent |
string | ジョブの進行状況、概要、ログを親パイプライン ジョブにコピーするかどうかを示すブール値オプション。 | True または False |
False |
プログラム引数--copy_logs_to_parent |
resource_monitor_interval |
integer | ノード リソースの使用状況 (CPU やメモリなど) を "logs/sys/perf" パスの下のログ フォルダーにダンプする時間間隔 (秒)。 注: ダンプ リソース ログが頻繁に記録されると、実行速度が若干遅くなります。 リソース使用状況のダンプを停止するには、この値を 0 に設定します。 |
[0, int.max] |
600 |
プログラム引数--resource_monitor_interval |
次のサンプル コードは、これらの設定を更新します。
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
program_arguments: >-
--model ${{inputs.score_model}}
--error_threshold 5
--allowed_failed_percent 30
--task_overhead_timeout 1200
--progress_update_timeout 600
--first_task_creation_timeout 600
--copy_logs_to_parent True
--resource_monitor_interva 20
append_row_to: ${{outputs.job_output_file}}
並列ジョブ ステップを持つパイプラインを作成する
次の例は、並列ジョブ ステップをインラインで使用した完全なパイプライン ジョブを示しています。
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
tag: tagvalue
owner: sdkteam
settings:
default_compute: azureml:cpu-cluster
jobs:
batch_prediction:
type: parallel
compute: azureml:cpu-cluster
inputs:
input_data:
type: mltable
path: ./neural-iris-mltable
mode: direct
score_model:
type: uri_folder
path: ./iris-model
mode: download
outputs:
job_output_file:
type: uri_file
mode: rw_mount
input_data: ${{inputs.input_data}}
mini_batch_size: "10kb"
resources:
instance_count: 2
max_concurrency_per_instance: 2
logging_level: "DEBUG"
mini_batch_error_threshold: 5
retry_settings:
max_retries: 2
timeout: 60
task:
type: run_function
code: "./script"
entry_script: iris_prediction.py
environment:
name: "prs-env"
version: 1
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: ./environment/environment_parallel.yml
program_arguments: >-
--model ${{inputs.score_model}}
--error_threshold 5
--allowed_failed_percent 30
--task_overhead_timeout 1200
--progress_update_timeout 600
--first_task_creation_timeout 600
--copy_logs_to_parent True
--resource_monitor_interva 20
append_row_to: ${{outputs.job_output_file}}
パイプライン ジョブを送信する
az ml job create
CLI コマンドを使うことにより、並列ステップを持つパイプライン ジョブを送信します。
az ml job create --file pipeline.yml
スタジオ UI で並列ステップを確認する
パイプライン ジョブを送信すると、SDK または CLI ウィジェットによって、Azure Machine Learning スタジオ UI のパイプライン グラフへの Web URL リンクが表示されます。
並列ジョブの結果を表示するには、パイプライン グラフで並列ステップをダブルクリックし、詳細パネルで [設定] タブを選択し、[実行設定] を展開し、[並列] セクションを展開します。
並列ジョブの失敗をデバッグするには、[出力とログ] タブを選択し、logs フォルダーを展開して、並列ジョブが失敗した理由を job_result.txt から確認します。 並列ジョブのログ記録構造については、同じフォルダー内の readme.txt を参照してください。