次の方法で共有


パイプラインで並列ジョブを使用する

適用対象:Azure CLI ml extension v2 (現行)Python SDK azure-ai-ml v2 (現行)

この記事では、CLI v2 と Python SDK v2 を使用して Azure Machine Learning パイプラインで並列ジョブを実行する方法について説明します。 並列ジョブを使うことで、繰り返されるタスクを強力なマルチノード コンピューティング クラスターに分散させて、ジョブの実行を高速化できます。

機械学習エンジニアは、常にトレーニングまたは推論タスクに関するスケールの要件を抱えています。 たとえば、データ科学者が売上予測モデルをトレーニングするための 1 つのスクリプトを提供したら、機械学習エンジニアはこのトレーニング タスクを個々の各データ ストアに適用する必要があります。 このスケールアウト プロセスの課題には、遅延を引き起こす実行時間の長さや、タスクの実行を維持するために手動による介入を必要とする予期しない問題などがあります。

Azure Machine Learning の並列処理の主要な役割は、1 つのシリアル タスクをミニバッチに分割し、それらのミニバッチを複数のコンピューティングにディスパッチして並列に実行することです。 並列ジョブでは、エンドツーエンドの実行時間が大幅に短縮され、エラーも自動的に処理されます。 Azure Machine Learning の並列ジョブを使用して、パーティション分割されたデータに基づいて多くのモデルをトレーニングしたり、大規模バッチ推論タスクを促進したりすることについて考えるとします。

たとえば、多数の画像に対して物体検出モデルを実行しているシナリオの場合、Azure Machine Learning の並列ジョブを使用すると、特定のコンピューティング クラスター上でカスタム コードを並列で実行するために画像を簡単に分散できます。 並列化により、時間コストを大幅に削減できます。 Azure Machine Learning の並列ジョブでは、プロセスを簡略化して自動化し、ジョブの効率をさらに上げることもできます。

前提条件

並列ジョブ ステップを持つパイプラインを作成して実行する

Azure Machine Learning の並列ジョブは、パイプライン ジョブのステップとしてのみ使用できます。

並列化を準備する

この並列ジョブ ステップには準備が必要です。 定義済みの関数を実装するエントリ スクリプトが必要です。 また、並列ジョブ定義で次の属性を設定する必要があります。

  • 入力データを定義してバインドします。
  • データの分割方法を設定します。
  • コンピューティング リソースを構成します。
  • エントリ スクリプトを呼び出します。

次のセクションでは、並列ジョブを準備する方法について説明します。

入力とデータ分割の設定を宣言する

並列ジョブでは、1 つの主要な入力を分割して並列に処理する必要があります。 主要な入力データ形式は、表形式データまたはファイルのリストのいずれかです。

データ形式が異なると、入力の種類、入力モード、データの分割方法が異なります。 次の表では、このオプションについて説明します。

データ形式 入力型 入力モード データ分割の方法
ファイル一覧 mltable または uri_folder ro_mount または download サイズ別 (ファイルの数) またはパーティション別
表形式データ mltable direct サイズ別 (推定物理サイズ) またはパーティション別

Note

表形式の mltable を主要な入力データとして使用する場合は、次の手順を実行する必要があります。

  • この Conda ファイルの 9 行目にあるように、mltable ライブラリをご利用の環境内にインストールします。
  • 指定したパスの下に、transformations: - read_delimited: セクションを入力した MLTable 仕様ファイルを配置します。例については、「データ資産の作成と管理」を参照してください。

並列ジョブ YAML または Python で input_data 属性を使用して主要な入力データを宣言し、${{inputs.<input name>}} を使用して並列ジョブの定義済みの input にデータをバインドできます。 次に、データの分割方法に応じて、主要な入力のデータ分割属性を定義します。

データ分割の方法 属性名 属性の型 ジョブの例
サイズ別 mini_batch_size string アヤメのバッチ予測
パーティション別 partition_keys 文字列のリスト オレンジ ジュースの売上予測

並列化用にコンピューティング リソースを構成する

データ分割属性を定義したら、instance_count 属性と max_concurrency_per_instance 属性を設定して、並列化のためのコンピューティング リソースを構成します。

Attribute name タイプ 説明 既定値
instance_count 整数 (integer) ジョブに使用するノードの数。 1
max_concurrency_per_instance 整数 (integer) 各ノードのプロセッサの数。 GPU コンピューティングの場合: 1。 CPU コンピューティングの場合: コアの数。

これらの属性は、次の図に示すように、指定したコンピューティング クラスターと連携します。

並列ジョブでの分散データの動作を示す図。

エントリ スクリプトを呼び出す

エントリ スクリプトは、カスタム コードを使用して次の 3 つの定義済み関数を実装する 1 つの Python ファイルです。

関数名 必須 Description 入力 戻り値
Init() ミニバッチの実行を開始する前の一般的な準備を実施します。 たとえば、この関数を使用して、モデルをグローバル オブジェクトに読み込みます。 -- --
Run(mini_batch) ミニバッチ用のメイン実行ロジックを実装します。 mini_batch は、入力データが表形式データの場合は Pandas DataFrame であり、入力データがディレクトリの場合はファイル パス リストです。 データフレーム、リスト、またはタプル。
Shutdown() N コンピューティングをプールに返す前にカスタム クリーンアップを実行する省略可能な関数。 -- --

重要

Init() 関数または Run(mini_batch) 関数で引数を解析するときに例外を回避するには、parse_args の代わりに parse_known_args を使用します。 引数パーサーを含むエントリ スクリプトについては、iris_score の例をご覧ください。

重要

Run(mini_batch) 関数は、データフレーム、リスト、またはタプル項目のいずれかを返す必要があります。 この並列ジョブでは、その戻り値の数を使って、そのミニバッチで成功した項目の数を測定します。 ミニバッチ数は、すべての項目が処理されている場合、戻り値リストの数と等しくなる必要があります。

並列ジョブは、次の図に示すように、各プロセッサで関数を実行します。

並列ジョブでのエントリ スクリプトの動作を示す図。

次のエントリ スクリプトの例を参照してください。

エントリ スクリプトを呼び出すには、並列ジョブ定義で次の 2 つの属性を設定します。

Attribute name タイプ 説明
code string ジョブにアップロードして使用するためのソース コード ディレクトリへのローカル パス。
entry_script string 定義済みの並列関数の実装を含む Python ファイル。

並列ジョブ ステップの例

次の並列ジョブ ステップでは、入力の種類、モード、データの分割方法を宣言し、入力をバインドし、コンピューティングを構成し、エントリ スクリプトを呼び出します。

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml

自動化の設定を検討する

Azure Machine Learning の並列ジョブでは、手動による介入なしでジョブを自動的に制御できるオプションの設定が多数公開されています。 次の表にこれらの設定を示します。

キー Type 説明 使用できる値 規定値 属性またはプログラム引数で設定する
mini_batch_error_threshold integer この並列ジョブで無視する失敗したミニバッチの数。 失敗したミニバッチの数がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。

ミニバッチは、次の場合に失敗としてマークされます。
- run() からの戻り値の数が、ミニバッチの入力数未満である場合。
- 例外がカスタム run() コードで捕捉された場合。
[-1, int.max] -1 は、失敗したすべてのミニバッチを無視することを示します 属性 mini_batch_error_threshold
mini_batch_max_retries integer ミニバッチが失敗またはタイムアウトしたときの再試行回数。すべての再試行が失敗した場合、ミニバッチは mini_batch_error_threshold 計算に従って失敗としてマークされます。 [0, int.max] 2 属性 retry_settings.max_retries
mini_batch_timeout integer カスタム run() 関数を実行するためのタイムアウト (秒単位)。 実行時間がこのしきい値を超えた場合、そのミニバッチは中止され、失敗としてマークされて再試行がトリガーされます。 (0, 259200] 60 属性 retry_settings.timeout
item_error_threshold integer 失敗した項目数のしきい値。 失敗した項目の数は、入力数と各ミニバッチから返された数の間のギャップによってカウントされます。 失敗した項目の合計がこのしきい値を超えた場合、並列ジョブは失敗としてマークされます。 [-1, int.max] -1 は、並列ジョブ中のすべての失敗を無視することを示します プログラム引数
--error_threshold
allowed_failed_percent integer mini_batch_error_threshold に似ていますが、回数でなく、失敗したミニバッチの割合を使います。 [0, 100] 100 プログラム引数
--allowed_failed_percent
overhead_timeout integer 各ミニバッチの初期化のタイムアウト (秒)。 たとえば、ミニバッチのデータを読み込んで run() 関数に渡します。 (0, 259200] 600 プログラム引数
--task_overhead_timeout
progress_update_timeout integer ミニバッチの実行の進行状況を監視するためのタイムアウト (秒)。 このタイムアウト設定内で進行状況の更新が受け取られない場合、並列ジョブは失敗としてマークされます。 (0, 259200] 他の設定によって動的に計算 プログラム引数
--progress_update_timeout
first_task_creation_timeout integer ジョブの開始から最初のミニバッチの実行までの時間を監視するためのタイムアウト (秒単位)。 (0, 259200] 600 プログラム引数
--first_task_creation_timeout
logging_level string ユーザー ログ ファイルにダンプするログのレベル。 INFOWARNING、または DEBUG INFO 属性 logging_level
append_row_to string ミニバッチの各実行から戻されたすべての値を集約して、このファイルに出力します。 ${{outputs.<output_name>}} 式を使用して、並列ジョブの出力のいずれかを参照できます 属性 task.append_row_to
copy_logs_to_parent string ジョブの進行状況、概要、ログを親パイプライン ジョブにコピーするかどうかを示すブール値オプション。 True または False False プログラム引数
--copy_logs_to_parent
resource_monitor_interval integer ノード リソースの使用状況 (CPU やメモリなど) を "logs/sys/perf" パスの下のログ フォルダーにダンプする時間間隔 (秒)。

注: ダンプ リソース ログが頻繁に記録されると、実行速度が若干遅くなります。 リソース使用状況のダンプを停止するには、この値を 0 に設定します。
[0, int.max] 600 プログラム引数
--resource_monitor_interval

次のサンプル コードは、これらの設定を更新します。

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interva 20
    append_row_to: ${{outputs.job_output_file}}

並列ジョブ ステップを持つパイプラインを作成する

次の例は、並列ジョブ ステップをインラインで使用した完全なパイプライン ジョブを示しています。

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interva 20
      append_row_to: ${{outputs.job_output_file}}

パイプライン ジョブを送信する

az ml job create CLI コマンドを使うことにより、並列ステップを持つパイプライン ジョブを送信します。

az ml job create --file pipeline.yml

スタジオ UI で並列ステップを確認する

パイプライン ジョブを送信すると、SDK または CLI ウィジェットによって、Azure Machine Learning スタジオ UI のパイプライン グラフへの Web URL リンクが表示されます。

並列ジョブの結果を表示するには、パイプライン グラフで並列ステップをダブルクリックし、詳細パネルで [設定] タブを選択し、[実行設定] を展開し、[並列] セクションを展開します。

並列ジョブの設定を示す Azure Machine Learning スタジオのスクリーンショット。

並列ジョブの失敗をデバッグするには、[出力とログ] タブを選択し、logs フォルダーを展開して、並列ジョブが失敗した理由を job_result.txt から確認します。 並列ジョブのログ記録構造については、同じフォルダー内の readme.txt を参照してください。

並列ジョブの結果を示す Azure Machine Learning スタジオの [ジョブ] タブのスクリーンショット。