次の方法で共有


Azure Databricks で Ray クラスターを作成して接続する

Azure Databricks で Ray コンピューティング クラスターを作成、構成、実行する方法について説明します

要件

Ray クラスターを作成するには、次の設定で Databricks All-Purpose Compute リソースにアクセスできる必要があります。

  • Databricks Runtime 12.2 LTS ML 以降。
  • アクセス モードは、単一ユーザーまたは分離なし共有である必要があります。

Note

現在、Ray クラスターはサーバーレス コンピューティングではサポートされていません。

Ray をインストールする

Databricks Runtime ML 15.0 以降では、Ray は Azure Databricks クラスターにプレインストールされています。

15.0 より前にリリースされたランタイムの場合は、pip を使って Ray をクラスターにインストールします。

%pip install ray[default]>=2.3.0

Azure Databricks クラスターにユーザー固有の Ray クラスターを作成する

Ray クラスターを作成するには、ray.util.spark.setup_ray_cluster API を使用します。

Note

ノートブックで Ray クラスターを作成すると、現在のノートブック ユーザーのみが使用できます。 Ray クラスターは、ノートブックがクラスターからデタッチされるか、非アクティブな (Ray にタスクが送信されない) 状態で 30 分経過すると、自動的にシャットダウンされます。 すべてのユーザーによって共有されていて、アクティブに実行されているノートブックの対象ではない Ray クラスターを作成する場合は、代わりに ray.util.spark.setup_global_ray_cluster API を使います。

固定サイズ Ray クラスター

固定サイズ Ray クラスターは、Azure Databricks クラスターにアタッチされている Azure Databricks ノートブックで次のコマンドを実行して開始できます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

自動スケーリング Ray クラスター

自動スケーリング Ray クラスターを開始する方法については、「Azure Databricks で Ray クラスターをスケーリングする」をご覧ください。

グローバル モード Ray クラスターの開始

Ray 2.9.0 以降を使うと、Azure Databricks クラスター上にグローバル モード Ray クラスターを作成できます。 グローバル モード Ray クラスターを使うと、Azure Databricks クラスターにアタッチされているすべてのユーザーも、Ray クラスターを使用できます。 Ray クラスターのこの実行モードには、シングルユーザー の Ray クラスター インスタンスを実行するときにシングルユーザー クラスターが持つアクティブなタイムアウト機能はありません。

複数のユーザーが Ray タスクにアタッチして実行できるグローバル Ray クラスターを開始するには、まず Azure Databricks ノートブック ジョブを作成し、それを共有モード Azure Databricks クラスターにアタッチしてから、次のコマンドを実行します。

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

これはブロック呼び出しであり、ユーザーがノートブックのコマンド セルの [割り込み] ボタンをクリックして呼び出しを中断するか、ノートブックを Azure Databricks クラスターからデタッチするか、Azure Databricks クラスターを終了するまで、アクティブ状態のままになります。 これらを行わない場合、グローバル モードの Ray クラスターは引き続き実行され、承認されたユーザーによるタスクの送信に使用できます。 グローバル モード クラスターの詳細については、Ray API のドキュメントを参照してください。

グローバル モード クラスターには、次のプロパティがあります。

  • Azure Databricks クラスターに作成できるでアクティブなグローバル モード Ray クラスターは、一度に 1 つだけです。
  • Azure Databricks クラスターでは、すべてのユーザーが、アタッチされているすべての Azure Databricks ノートブックで、アクティブなグローバル モード Ray クラスターを使用できます。 ray.init() を実行して、アクティブなグローバル モードの Ray クラスターに接続できます。 複数のユーザーがこの Ray クラスターにアクセスできるため、リソースの競合が問題になる可能性があります。
  • グローバル モードの Ray クラスターは、setup_ray_cluster 呼び出しが中断されるまで稼働します。 シングル ユーザーの Ray クラスターが持つ自動シャットダウン タイムアウト機能はありません。

Ray GPU クラスターを作成する

GPU クラスターの場合、次の方法でこれらのリソースを Ray クラスターに追加できます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray クライアントを使用してリモート Ray クラスターに接続する

Ray バージョン 2.3.0 以降では、setup_ray_cluster API を使って Ray クラスターを作成でき、同じノートブック内で ray.init() API を呼び出してこの Ray クラスターに接続できます。 リモート接続文字列を取得するには、次の方法を使います。

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

その後、上のリモート接続文字列を使ってリモート クラスターに接続できます。

import ray
ray.init(remote_conn_str)

Ray クライアントでは、ray.data モジュールで定義されている Ray データセット API はサポートされていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内部でラップできます。

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

構成する必要がある値は、https:// で始まる Azure Databricks ワークスペースの URL と /driver-proxy/o/ の後に、Ray クラスターの起動後に表示される Ray ダッシュボードのプロキシ URL で示されている値を付けたものです。

Ray Job CLI は、外部システムから Ray クラスターにジョブを送信するために使われますが、Azure Databricks 上の Ray クラスターでジョブを送信する場合は必要ありません。 Azure Databricks ジョブを使ってジョブをデプロイし、アプリケーションごとに Ray クラスターを作成し、Azure Databricks アセット バンドルやワークフロー トリガーなどの既存の Azure Databricks ツールを使ってジョブをトリガーすることをお勧めします。

ログ出力の場所を設定する

引数 collect_log_to_path を設定して、Ray クラスター ログを収集する宛先パスを指定できます。 ログ収集は、Ray クラスターがシャットダウンされた後に実行されます。

Azure Databricks では、Apache Spark クラスターを終了した場合でもログを保持するため、/dbfs/ で始まるパスまたは Unity Catalog ボリューム パスを設定することをお勧めします。 そうしないと、クラスターのシャットダウン時にクラスター上のローカル ストレージが削除されるため、ログは復旧できません。

Ray クラスターを作成したら、任意の Ray アプリケーション コードをノートブック内で直接実行できます。 [新しいタブで Ray クラスター ダッシュボードを開く] をクリックすると、クラスターの Ray ダッシュボードを表示できます。

Ray ダッシュボード アクター ページでスタック トレースとフレーム グラフを有効にする

Ray の [ダッシュボード アクター] ページでは、アクティブな Ray アクターのスタック トレースとフレーム グラフを表示できます。 この情報を表示するには、Ray クラスターを起動する前に、次のコマンドを使って py-spy をインストールします。

%pip install py-spy

ベスト プラクティスを作成して構成する

このセクションでは、Ray クラスターの作成と構成に関するベスト プラクティスについて説明します。

GPU 以外のワークロード

Ray クラスターは Azure Databricks Spark クラスターの上で実行されます。 一般的なシナリオでは、Spark ジョブと Spark UDF を使って、GPU リソースを必要としない単純なデータ前処理タスクを行います。 その後、Ray を使って GPU の利点を生かした複雑な機械学習タスクを実行します。 この場合、Azure Databricks では、すべての Apache Spark DataFrame 変換と Apache Spark UDF 実行で GPU リソースが使われないように、Apache Spark クラスター レベルの構成パラメーター spark.task.resource.gpu.amount を 0 に設定することをお勧めします。

この構成の利点は次のとおりです。

  • 通常、GPU インスタンスの種類には GPU デバイスより多くの CPU コアがあるため、Apache Spark ジョブの並列性が向上します。
  • 複数のユーザーで Apache Spark クラスターを共有している場合、このように構成すると、Apache Spark ジョブでの Ray ワークロードの同時実行による GPU リソースの競合が発生しなくなります。

Ray タスクで使用する場合に transformers トレーナー MLflow 統合を無効にする

transformers トレーナー MLflow 統合は、transformers ライブラリ内から既定で有効にされます。 Ray トレーニングを使って transformers モデルを微調整した場合、資格情報の問題のため Ray タスクが失敗します。 ただし、トレーニングに MLflow を直接使用する場合は、この問題は発生しません。 この問題を回避するには、Apache Spark クラスターを起動するときに、Azure Databricks クラスター構成内から DISABLE_MLFLOW_INTEGRATION 環境変数を 'TRUE' に設定します。

Address Ray リモート関数のピクル化エラー

Ray タスクを実行するため、Ray はタスク関数を Pickle 化します。 Pickle 化が失敗する場合は、エラーが発生しているコードの部分を診断する必要があります。 Pickle 化エラーの一般的な原因は、外部参照、クロージャ、ステートフル オブジェクトの参照の処理です。 検証してすばやく修正できる最も簡単なエラーの 1 つは、import ステートメントをタスク関数宣言内に移動することで解決できます。

たとえば、幅広く使われる datasets.load_dataset 関数は、Azure Databricks Runtime ドライバー側でパッチされ、参照を Pickle 化できなくします。 これには、次のようにタスク関数を記述するだけで対処できます。

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Ray タスクがメモリ不足 (OOM) エラーで予期せず強制終了される場合に、Ray メモリ モニターを無効にする

Ray 2.9.3 の Ray メモリ モニターにいくつかの既知の問題があり、何の理由もなく Ray タスクが誤って停止される可能性があります。 この問題に対処するには、Apache Spark クラスターを起動するときに、Azure Databricks クラスター構成内で環境変数 RAY_memory_monitor_refresh_ms0 に設定して、Ray メモリ モニターを無効にできます。

Ray から Spark データを読み取る

一般的なユース ケースは、Spark DataFrame からデータを Ray に読み込んで、さらに処理することです。 Databricks Runtime 15.0 for ML 以降では、関数を使用して、Spark DataFrame に含まれるデータを直接 Ray に読み込むのを簡略化できます。

この機能を効果的に使用するには、Spark クラスター構成 spark.databricks.pyspark.dataFrameChunk.enabledtrue に設定されていることを確認してから、ray.init() を実行して Ray クラスターを起動します。

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray はデータを一時的に書き込む必要がなく、Spark DataFrame の内容を直接フェッチし、直接処理できるようにします。

Spark から Ray データを読み取る

Ray から Spark データを読み取るのと同様に、Ray タスクの結果を Spark DataFrame として再び読み取る機能も、Unity Catalog を使用してサポートされています。 この機能を使用するには、Unity Catalog 対応のワークスペース内から Databricks Runtime 15.0 for ML 以降を実行している必要があります。

この機能を使用するには、環境変数 "_RAY_UC_VOLUMES_FUST_TEMP_DIR" が有効でアクセス可能な Unity Catalog ボリューム パス ("/Volumes/MyCatalog/MySchema/MyVolume/MyRayData" など) に設定されていることを確認します。

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

15.0 for ML より前の Databricks Runtime バージョンでは、ray.data モジュールから Ray Parquet Writer (ray_dataset.write_parquet()) を使用して、オブジェクト ストアの場所に直接書き込むことができます。 Spark では、ネイティブ リーダーでこの Parquet データを読み取ることができます。

データのバッチへの変換関数の適用

データをバッチ処理するときは、Ray Data API と共に map_batches 関数を使うことをお勧めします。 この方法を使うと、バッチ処理にメリットがある大規模なデータセットや複雑な計算の場合に特に、効率とスケーラビリティが向上します。 ray.data.from_spark API を使って、任意の Spark DataFrame を Ray データセットに変換できます。 この変換 API の呼び出しで処理された出力を、API ray.data.write_databricks_table を使って Azure Databricks の UC テーブルに書き出すことができます。

Ray タスクでの MLflow の使用 Ray タスクで MLflow を使うには、次のようにする必要があります。

  • Ray タスク内で Azure Databricks MLflow の資格情報を定義します。
  • Apache Spark ドライバー内で MLflow の実行を作成し、作成された run_id を Ray タスクに渡します。

次に示すのは、これを行うコードの例です。

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars( <Databricks>")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray タスクでノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用する

現在、Ray には、Ray タスクでノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用できない既知の問題があります。 Ray ジョブ内で追加の依存関係を利用するには、タスク内でこれらの依存関係を使う Ray-on-Spark クラスターを起動する前に、%pip マジック コマンドを使ってライブラリを手動でインストールする必要があります。 たとえば、Ray クラスターの起動に使われる Ray のバージョンを更新するには、ノートブックで次のコマンドを実行します。

%pip install ray==<The Ray version you want to use> --force-reinstall

その後、ノートブックで次のコマンドを実行して Python カーネルを再起動します。

dbutils.library.restartPython()

次のステップ