Azure Databricks で Ray クラスターを起動する
Azure Databricks では、クラスターとジョブの構成を Apache Spark ジョブの場合と同じように処理することで、Ray クラスターを開始するプロセスが簡略化されます。 これは、Ray クラスターが実際にはマネージド Apache Spark クラスター上で起動されるためです。
ローカル コンピューターで Ray を実行する
import ray
ray.init()
Azure Databricks で Ray を実行する
from ray.util.spark import setup_ray_cluster
import ray
# If the cluster has four workers with 8 CPUs each as an example
setup_ray_cluster(num_worker_nodes=4, num_cpus_per_worker=8)
# Pass any custom configuration to ray.init
ray.init(ignore_reinit_error=True)
このアプローチは、数個から数百個のノードまで、任意のクラスター スケールで機能します。 Azure Databricks 上の Ray クラスターでは、自動スケーリングもサポートされています。
Ray クラスター作成後、Azure Databricks ノートブック内で任意の Ray アプリケーション コードを実行できます。
重要
Databricks は、%pip install <your-library-dependency>
を使用してアプリケーションに必要なすべてのライブラリをインストールすることで、Ray クラスターとアプリケーションがライブラリを使用できるようにすることを推奨しています。 Ray init 関数呼び出しで依存関係を指定すると、Apache Spark ワーカー ノードがアクセスできない場所に依存関係がインストールされ、バージョンの非互換性とインポート エラーが発生します。
たとえば、以下のようにして、Azure Databricks ノートブック内で単純な Ray アプリケーションを実行できます。
import ray
import random
import time
from fractions import Fraction
ray.init()
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
Ray クラスターをシャットダウンする
Ray クラスターは、次の状況で自動的にシャットダウンされます。
- Azure Databricks クラスターから対話型ノートブックをデタッチする。
- Azure Databricks ジョブが完了する。
- Azure Databricks クラスターが再起動または終了される。
- 指定されたアイドル時間の間アクティビティが存在しない。
Azure Databricks で実行されている Ray クラスターをシャットダウンするために、ray.utils.spark.shutdown_ray_cluster
API を呼び出すことができます。
from ray.utils.spark import shutdown_ray_cluster
import ray
shutdown_ray_cluster()
ray.shutdown()