xgboost.spark
を使用した XGBoost モデルの分散トレーニング
重要
この機能はパブリック プレビュー段階にあります。
Python パッケージ xgboost>=1.7 には、新しいモジュール xgboost.spark
が含まれています。 このモジュールには、xgboost PySpark 推定器 xgboost.spark.SparkXGBRegressor
、xgboost.spark.SparkXGBClassifier
および xgboost.spark.SparkXGBRanker
が含まれています。 これらの新しいクラスは、SparkML パイプラインに XGBoost 推定器を含めることをサポートします。 API の詳細については、XGBoost Python Spark API ドキュメントを参照してください。
必要条件
Databricks Runtime 12.0 ML 以降。
xgboost.spark
パラメーター
xgboost.spark
モジュールで定義されている推定器は、標準の XGBoost で使用されるのと同じパラメーターと引数のほとんどをサポートしています。
- クラス コンストラクター、
fit
メソッド、およびpredict
メソッドのパラメーターは、xgboost.sklearn
モジュールのものとほとんど同じです。 - 名前付け、値、既定値は、「XGBoost パラメーター」で説明されているものとほぼ同じです。
- 例外は、サポートされていないいくつかのパラメーター (
gpu_id
、nthread
、sample_weight
、eval_set
など) と、追加されたpyspark
推定器固有のパラメーター (featuresCol
、labelCol
、use_gpu
、validationIndicatorCol
など) です。 詳細については、XGBoost Python Spark API ドキュメントを参照してください。
分散トレーニング
xgboost.spark
モジュールで定義されている PySpark 推定器は、num_workers
パラメーターを使用した分散 XGBoost トレーニングをサポートします。 分散トレーニングを使用するには、分類子またはリグレッサーを作成し、分散トレーニング中に同時に実行される Spark タスクの数を num_workers
に設定します。 すべての Spark タスク スロットを使用するには、num_workers=sc.defaultParallelism
を設定します。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 分散 XGBoost では
mlflow.xgboost.autolog
を使用できません。 MLflow を使用して xgboost Spark モデルをログに記録するには、mlflow.spark.log_model(spark_xgb_model, artifact_path)
を使用します。 - 自動スケールが有効になっているクラスターで分散 XGBoost を使用することはできません。 このエラスティック スケーリング パラダイムで開始される新しいワーカー ノードは、新しいタスク セットを受け取ることができず、アイドル状態のままになります。 自動スケールを無効にする手順については、自動スケールを有効にする を参照してください。
スパース特徴データセットのトレーニングの最適化を有効にする
xgboost.spark
モジュールで定義されている PySpark 推定器は、スパース特徴を持つデータセットに対するトレーニングの最適化をサポートします。
スパース特徴セットの最適化を有効にするには、型 fit
の値で構成される特徴列を含むデータセットを pyspark.ml.linalg.SparseVector
メソッドに指定し、推定器パラメーター enable_sparse_data_optim
を True
に設定する必要があります。 さらに、missing
パラメーターを 0.0
に設定する必要があります。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU トレーニング
xgboost.spark
モジュールで定義されている PySpark 推定器は、GPU のトレーニングをサポートします。 GPU トレーニングを有効にするには、パラメーター use_gpu
を True
に設定します。
注意
XGBoost 分散トレーニングで使用される Spark タスクごとに、use_gpu
引数が True
に設定されている場合、トレーニングで使用される GPU は 1 つだけです。 Databricks では、Spark クラスター構成 1
に既定値の spark.task.resource.gpu.amount
を使用することを推奨しています。 それ以外の場合、この Spark タスクに割り当てられた追加の GPU はアイドル状態となります。
次に例を示します。
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
トラブルシューティング
マルチノード トレーニング中に NCCL failure: remote process exited or there was a network error
メッセージが表示された場合、通常、GPU 間のネットワーク通信に問題があることを示します。 この問題は、NCCL (NVIDIA Collective Communications Library) が GPU 通信に特定のネットワーク インターフェイスを使用できない場合に発生します。
解決するには、クラスターの sparkConf for spark.executorEnv.NCCL_SOCKET_IFNAME
を eth
に設定します。 これにより、基本的に環境変数 NCCL_SOCKET_IFNAME
がノード内のすべてのワーカーに対して eth
に設定されます。
ノートブックの例
このノートブックは、Spark MLlib での Python パッケージ xgboost.spark
の使用を示しています。
PySpark-XGBoost ノートブック
非推奨の sparkdl.xgboost
モジュールの移行ガイド
from sparkdl.xgboost import XgboostRegressor
をfrom xgboost.spark import SparkXGBRegressor
に置き換え、from sparkdl.xgboost import XgboostClassifier
をfrom xgboost.spark import SparkXGBClassifier
に置き換えます。- 推定器コンストラクターのすべてのパラメーター名を camelCase スタイルから snake_case スタイルに変更します。 たとえば、
XgboostRegressor(featuresCol=XXX)
をSparkXGBRegressor(features_col=XXX)
に変更します。 - パラメーター
use_external_storage
とexternal_storage_precision
が削除されました。xgboost.spark
推定器では、DMatrix データ イテレーション API を使用してメモリをより効率的に使用することができます。 非効率的な外部ストレージ モードを使用する必要がなくなりました。 非常に大規模なデータセットの場合、Databricks ではnum_workers
パラメーターを増やすことを推奨しています。これにより、各トレーニング タスクでデータが小さく、管理しやすいデータ パーティションにパーティション分割されます。 クラスター内の Spark タスク スロットの合計数にnum_workers = sc.defaultParallelism
を設定するnum_workers
を設定することを検討してください。 xgboost.spark
で定義されている推定器に対してnum_workers=1
を設定すると、1 つの Spark タスクでモデル トレーニングが実行されます。 これにより、Spark クラスター構成設定spark.task.cpus
で指定された CPU コア数 (既定では 1) が使用されます。 より多くの CPU コアを使用してモデルをトレーニングするには、num_workers
またはspark.task.cpus
を増やします。nthread
で定義されている推定器に対してn_jobs
またはxgboost.spark
パラメーターを設定することはできません。 この動作は、非推奨のsparkdl.xgboost
パッケージで定義されている推定器の以前の動作とは異なります。
sparkdl.xgboost
モデルを xgboost.spark
モデルに変換する
sparkdl.xgboost
モデルは、xgboost.spark
モデルとは異なる形式で保存され、パラメーター設定が異なります。 モデルを変換するには、次のユーティリティ関数を使用します。
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
pyspark.ml.PipelineModel
モデルを最後のステージとして含む sparkdl.xgboost
モデルがある場合は、sparkdl.xgboost
モデルのステージを変換された xgboost.spark
モデルに置き換えることができます。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)