Verteiltes Training von XGBoost-Modellen mit xgboost.spark
Wichtig
Dieses Feature befindet sich in der Public Preview.
Das Python-Paket „xgboost>=1.7“ enthält das neue Modul xgboost.spark
. Dieses Modul enthält die PySpark-Schätzer xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
und xgboost.spark.SparkXGBRanker
für XGBoost. Diese neuen Klassen unterstützen die Einbindung von XGBoost-Schätzern in SparkML-Pipelines. Details zur API finden Sie in der Dokumentation zur Python-Spark-API für XGBoost.
Anforderungen
Databricks Runtime 12.0 ML und höher
xgboost.spark
-Parameter
Die im Modul xgboost.spark
definierten Schätzer unterstützen größtenteils dieselben Parameter und Argumente, die auch in standardmäßigem XGBoost verwendet werden.
- Die Parameter für den Klassenkonstruktor sowie für die Methoden
fit
undpredict
sind weitgehend identisch mit denen im Modulxgboost.sklearn
. - Benennung, Werte und Standardeinstellungen sind größtenteils identisch mit denen, die unter XGBoost-Parametern beschrieben sind.
- Ausnahmen sind einige nicht unterstützte Parameter (z. B.
gpu_id
,nthread
,sample_weight
,eval_set
) und die spezifischen Parameter, die für den Schätzerpyspark
hinzugefügt wurden (z. B.featuresCol
,labelCol
,use_gpu
,validationIndicatorCol
). Weitere Informationen finden Sie in der Dokumentation zur Python-Spark-API für XGBoost.
Verteiltes Training
Im Modul xgboost.spark
definierte PySpark-Schätzer unterstützen verteiltes XGBoost-Training mithilfe des Parameters num_workers
. Erstellen Sie für verteiltes Training einen Klassifizierer oder Regressor, und legen Sie num_workers
während des verteilten Trainings auf die Anzahl gleichzeitig ausgeführter Spark-Aufgaben fest. Um alle Spark-Aufgabenslots zu verwenden, legen Sie num_workers=sc.defaultParallelism
fest.
Zum Beispiel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Hinweis
- Sie können
mlflow.xgboost.autolog
nicht mit verteiltem XGBoost verwenden. Um ein Spark-Modell für XGBoost mit MLflow zu protokollieren, verwenden Siemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Verteiltes XGBoost ist in einem Cluster mit aktivierter automatischer Skalierung nicht möglich. Neue Workerknoten, die mit diesem Paradigma für elastische Skalierung beginnen, können keine neuen Aufgabengruppen empfangen und bleiben im Leerlauf. Anweisungen zum Deaktivieren der automatischen Skalierung finden Sie unter Aktivieren der automatischen Skalierung.
Aktivieren der Optimierung für das Training mit Datasets mit Sparsefeatures
Im xgboost.spark
Modul definierte PySpark-Schätzer unterstützen die Optimierung für das Training mit Datasets mit Sparsefeatures.
Um die Optimierung von Datasets mit Sparsefeatures zu ermöglichen, müssen Sie ein Dataset für die Methode fit
bereitstellen, das eine Featurespalte enthält, die aus Werten des Typs pyspark.ml.linalg.SparseVector
besteht, und den Schätzerparameter enable_sparse_data_optim
auf True
festlegt. Darüber hinaus müssen Sie den Parameter missing
auf 0.0
festlegen.
Zum Beispiel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU-Training
PySpark-Schätzer, die im Modul xgboost.spark
definiert sind, unterstützen das Training auf GPUs. Legen Sie den Parameter use_gpu
auf True
fest, um GPU-Training zu aktivieren.
Hinweis
Für jede Spark-Aufgabe, die im verteilten XGBoost-Training verwendet wird, wird nur eine GPU im Training genutzt, wenn das Argument use_gpu
auf True
festgelegt ist. Databricks empfiehlt für die Spark-Clusterkonfiguration 1
den Standardwert spark.task.resource.gpu.amount
. Andernfalls befinden sich die zusätzlichen GPUs, die dieser Spark-Aufgabe zugeteilt sind, im Leerlauf.
Zum Beispiel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Problembehandlung
Wenn während des Trainings mit mehreren Knoten die Meldung NCCL failure: remote process exited or there was a network error
angezeigt wird, weist sie in der Regel auf ein Problem mit der Netzwerkkommunikation zwischen GPUs hin. Dieses Problem tritt auf, wenn NCCL (NVIDIA Collective Communications Library) bestimmte Netzwerkschnittstellen für die GPU-Kommunikation nicht verwenden kann.
Stellen Sie zum Beheben des Problems den SparkConf-Wert des Clusters für spark.executorEnv.NCCL_SOCKET_IFNAME
auf eth
ein. Damit wird im Grunde die Umgebungsvariable NCCL_SOCKET_IFNAME
für alle Worker in einem Knoten auf eth
eingestellt.
Notebook mit Beispielen
Dieses Notebook zeigt die Verwendung des Python-Pakets xgboost.spark
mit Spark MLlib.
PySpark-XGBoost Notebook
Migrationsleitfaden für das veraltete Modul sparkdl.xgboost
- Ersetzen Sie
from sparkdl.xgboost import XgboostRegressor
durchfrom xgboost.spark import SparkXGBRegressor
und anschließendfrom sparkdl.xgboost import XgboostClassifier
durchfrom xgboost.spark import SparkXGBClassifier
. - Ändern Sie alle Parameternamen im Schätzerkonstruktor vom Stil camelCase in den Stil snake_case. Ändern Sie beispielsweise
XgboostRegressor(featuresCol=XXX)
inSparkXGBRegressor(features_col=XXX)
. - Die Parameter
use_external_storage
undexternal_storage_precision
wurden entfernt.xgboost.spark
-Schätzer verwenden die Dateniterations-API von DMatrix, um Arbeitsspeicher effizienter zu nutzen. Der ineffiziente externe Speichermodus ist nicht mehr nötig. Bei extrem großen Datasets empfiehlt Databricks das Erhöhen des Parametersnum_workers
, wodurch jede Trainingsaufgabe die Daten in kleinere, überschaubarere Datenpartitionen unterteilt. Erwägen Sie die Festlegung vonnum_workers = sc.defaultParallelism
, womitnum_workers
auf die Gesamtanzahl der Spark-Aufgabenslots im Cluster festgelegt wird. - Für in
xgboost.spark
definierte Schätzer führt die Einstellungnum_workers=1
das Modelltraining mit einer einzelnen Spark-Aufgabe aus. Dabei wird die Anzahl der CPU-Kerne genutzt, die von der Spark-Clusterkonfigurationseinstellungspark.task.cpus
angegeben wird (standardmäßig 1). Um mehr CPU-Kerne zum Trainieren des Modells zu verwenden, erhöhen Sienum_workers
oderspark.task.cpus
. Sie können den Parameternthread
odern_jobs
nicht für Schätzer festlegen, die inxgboost.spark
definiert sind. Dieses Verhalten unterscheidet sich vom vorherigen Verhalten von Schätzern, die im veralteten Paketsparkdl.xgboost
definiert waren.
Konvertieren eines sparkdl.xgboost
-Modells in ein xgboost.spark
-Modell
sparkdl.xgboost
-Modelle werden in einem anderen Format als xgboost.spark
-Modelle gespeichert und weisen unterschiedliche Parametereinstellungen auf. Verwenden Sie die folgende Hilfsfunktion, um das Modell zu konvertieren:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Wenn Sie über ein pyspark.ml.PipelineModel
-Modell verfügen, das ein sparkdl.xgboost
-Modell als letzte Phase enthält, können Sie die Phase des sparkdl.xgboost
-Modells durch das konvertierte xgboost.spark
-Modell ersetzen.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)