Freigeben über


Verteiltes Training von XGBoost-Modellen mit xgboost.spark

Wichtig

Dieses Feature befindet sich in der Public Preview.

Das Python-Paket „xgboost>=1.7“ enthält das neue Modul xgboost.spark. Dieses Modul enthält die PySpark-Schätzer xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier und xgboost.spark.SparkXGBRanker für XGBoost. Diese neuen Klassen unterstützen die Einbindung von XGBoost-Schätzern in SparkML-Pipelines. Details zur API finden Sie in der Dokumentation zur Python-Spark-API für XGBoost.

Anforderungen

Databricks Runtime 12.0 ML und höher

xgboost.spark-Parameter

Die im Modul xgboost.spark definierten Schätzer unterstützen größtenteils dieselben Parameter und Argumente, die auch in standardmäßigem XGBoost verwendet werden.

  • Die Parameter für den Klassenkonstruktor sowie für die Methoden fit und predict sind weitgehend identisch mit denen im Modul xgboost.sklearn.
  • Benennung, Werte und Standardeinstellungen sind größtenteils identisch mit denen, die unter XGBoost-Parametern beschrieben sind.
  • Ausnahmen sind einige nicht unterstützte Parameter (z. B. gpu_id, nthread, sample_weight, eval_set) und die spezifischen Parameter, die für den Schätzer pyspark hinzugefügt wurden (z. B. featuresCol, labelCol, use_gpu, validationIndicatorCol). Weitere Informationen finden Sie in der Dokumentation zur Python-Spark-API für XGBoost.

Verteiltes Training

Im Modul xgboost.spark definierte PySpark-Schätzer unterstützen verteiltes XGBoost-Training mithilfe des Parameters num_workers. Erstellen Sie für verteiltes Training einen Klassifizierer oder Regressor, und legen Sie num_workers während des verteilten Trainings auf die Anzahl gleichzeitig ausgeführter Spark-Aufgaben fest. Um alle Spark-Aufgabenslots zu verwenden, legen Sie num_workers=sc.defaultParallelism fest.

Zum Beispiel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Hinweis

  • Sie können mlflow.xgboost.autolog nicht mit verteiltem XGBoost verwenden. Um ein Spark-Modell für XGBoost mit MLflow zu protokollieren, verwenden Sie mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Verteiltes XGBoost ist in einem Cluster mit aktivierter automatischer Skalierung nicht möglich. Neue Workerknoten, die mit diesem Paradigma für elastische Skalierung beginnen, können keine neuen Aufgabengruppen empfangen und bleiben im Leerlauf. Anweisungen zum Deaktivieren der automatischen Skalierung finden Sie unter Aktivieren der automatischen Skalierung.

Aktivieren der Optimierung für das Training mit Datasets mit Sparsefeatures

Im xgboost.spark Modul definierte PySpark-Schätzer unterstützen die Optimierung für das Training mit Datasets mit Sparsefeatures. Um die Optimierung von Datasets mit Sparsefeatures zu ermöglichen, müssen Sie ein Dataset für die Methode fit bereitstellen, das eine Featurespalte enthält, die aus Werten des Typs pyspark.ml.linalg.SparseVector besteht, und den Schätzerparameter enable_sparse_data_optim auf True festlegt. Darüber hinaus müssen Sie den Parameter missing auf 0.0 festlegen.

Zum Beispiel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU-Training

PySpark-Schätzer, die im Modul xgboost.spark definiert sind, unterstützen das Training auf GPUs. Legen Sie den Parameter use_gpu auf True fest, um GPU-Training zu aktivieren.

Hinweis

Für jede Spark-Aufgabe, die im verteilten XGBoost-Training verwendet wird, wird nur eine GPU im Training genutzt, wenn das Argument use_gpu auf True festgelegt ist. Databricks empfiehlt für die Spark-Clusterkonfiguration 1 den Standardwert spark.task.resource.gpu.amount. Andernfalls befinden sich die zusätzlichen GPUs, die dieser Spark-Aufgabe zugeteilt sind, im Leerlauf.

Zum Beispiel:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Problembehandlung

Wenn während des Trainings mit mehreren Knoten die Meldung NCCL failure: remote process exited or there was a network error angezeigt wird, weist sie in der Regel auf ein Problem mit der Netzwerkkommunikation zwischen GPUs hin. Dieses Problem tritt auf, wenn NCCL (NVIDIA Collective Communications Library) bestimmte Netzwerkschnittstellen für die GPU-Kommunikation nicht verwenden kann.

Stellen Sie zum Beheben des Problems den SparkConf-Wert des Clusters für spark.executorEnv.NCCL_SOCKET_IFNAME auf eth ein. Damit wird im Grunde die Umgebungsvariable NCCL_SOCKET_IFNAME für alle Worker in einem Knoten auf eth eingestellt.

Notebook mit Beispielen

Dieses Notebook zeigt die Verwendung des Python-Pakets xgboost.spark mit Spark MLlib.

PySpark-XGBoost Notebook

Notebook abrufen

Migrationsleitfaden für das veraltete Modul sparkdl.xgboost

  • Ersetzen Sie from sparkdl.xgboost import XgboostRegressor durch from xgboost.spark import SparkXGBRegressor und anschließend from sparkdl.xgboost import XgboostClassifier durch from xgboost.spark import SparkXGBClassifier.
  • Ändern Sie alle Parameternamen im Schätzerkonstruktor vom Stil camelCase in den Stil snake_case. Ändern Sie beispielsweise XgboostRegressor(featuresCol=XXX) in SparkXGBRegressor(features_col=XXX).
  • Die Parameter use_external_storage und external_storage_precision wurden entfernt. xgboost.spark-Schätzer verwenden die Dateniterations-API von DMatrix, um Arbeitsspeicher effizienter zu nutzen. Der ineffiziente externe Speichermodus ist nicht mehr nötig. Bei extrem großen Datasets empfiehlt Databricks das Erhöhen des Parameters num_workers, wodurch jede Trainingsaufgabe die Daten in kleinere, überschaubarere Datenpartitionen unterteilt. Erwägen Sie die Festlegung von num_workers = sc.defaultParallelism, womit num_workers auf die Gesamtanzahl der Spark-Aufgabenslots im Cluster festgelegt wird.
  • Für in xgboost.spark definierte Schätzer führt die Einstellung num_workers=1 das Modelltraining mit einer einzelnen Spark-Aufgabe aus. Dabei wird die Anzahl der CPU-Kerne genutzt, die von der Spark-Clusterkonfigurationseinstellung spark.task.cpusangegeben wird (standardmäßig 1). Um mehr CPU-Kerne zum Trainieren des Modells zu verwenden, erhöhen Sie num_workers oder spark.task.cpus. Sie können den Parameter nthread oder n_jobs nicht für Schätzer festlegen, die in xgboost.spark definiert sind. Dieses Verhalten unterscheidet sich vom vorherigen Verhalten von Schätzern, die im veralteten Paket sparkdl.xgboost definiert waren.

Konvertieren eines sparkdl.xgboost-Modells in ein xgboost.spark-Modell

sparkdl.xgboost-Modelle werden in einem anderen Format als xgboost.spark-Modelle gespeichert und weisen unterschiedliche Parametereinstellungen auf. Verwenden Sie die folgende Hilfsfunktion, um das Modell zu konvertieren:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Wenn Sie über ein pyspark.ml.PipelineModel-Modell verfügen, das ein sparkdl.xgboost-Modell als letzte Phase enthält, können Sie die Phase des sparkdl.xgboost-Modells durch das konvertierte xgboost.spark-Modell ersetzen.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)