Partager via


Entraînement distribué de modèles XGBoost à l’aide de xgboost.spark

Important

Cette fonctionnalité est disponible en préversion publique.

Le package Python xgboost>=1.7 contient un nouveau module xgboost.spark. Ce module comprend les estimateurs PySpark xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier et xgboost.spark.SparkXGBRanker. Ces nouvelles classes prennent en charge l’inclusion des estimateurs XGBoost dans les pipelines SparkML. Pour plus d’informations sur l’API, consultez la documentation de l’API XGBoost Python Spark.

Spécifications

Databricks Runtime 12.0 ML et versions ultérieures.

Paramètres xgboost.spark

Les estimateurs définis dans le module xgboost.spark prennent en charge la plupart des paramètres et arguments utilisés dans le XGBoost standard.

  • Les paramètres du constructeur de classe, de la méthode fit et de la méthode predict sont en grande partie identiques à ceux du module xgboost.sklearn.
  • Les noms, les valeurs et les paramètres par défaut sont pour la plupart identiques à ceux décrits dans Paramètres XGBoost.
  • Les exceptions concernent quelques paramètres non pris en charge (par exemple gpu_id, nthread, sample_weight, eval_set) ainsi que les paramètres spécifiques à l’estimateur pyspark, qui ont été ajoutés (par exemple featuresCol, labelCol, use_gpu, validationIndicatorCol). Pour plus d’informations, consultez la documentation de l’API XGBoost Python Spark.

Entraînement distribué

Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’entraînement XGBoost distribué à l’aide du paramètre num_workers. Pour utiliser l’entraînement distribué, créez un classifieur ou un régresseur, puis affectez à num_workers le nombre de tâches Spark s’exécutant simultanément durant l’entraînement distribué. Pour utiliser tous les emplacements de tâches Spark, définissez num_workers=sc.defaultParallelism.

Par exemple :

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Notes

  • Vous ne pouvez pas utiliser mlflow.xgboost.autolog avec XGBoost distribué. Pour journaliser un modèle Spark xgboost à l’aide de MLflow, utilisez mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Vous ne pouvez pas utiliser de XGBoost distribué sur un cluster pour lequel la mise à l’échelle automatique est activée. Les nouveaux nœuds Worker qui démarrent dans ce paradigme de mise à l’échelle élastique ne peuvent pas recevoir de nouveaux ensembles de tâches, et restent inactifs. Pour obtenir des instructions sur la désactivation de la mise à l’échelle automatique, consultez Activer la mise à l’échelle automatique.

Activer l’optimisation de l’entraînement sur un jeu de données aux caractéristiques éparses

Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’optimisation de l’entraînement sur les jeux de données ayant des caractéristiques éparses. Pour permettre l’optimisation des jeux de données aux caractéristiques éparses, vous devez fournir un jeu de données à la méthode fit, qui contient une colonne de caractéristiques composée des valeurs de type pyspark.ml.linalg.SparseVector, et affecter au paramètre enable_sparse_data_optim de l’estimateur la valeur True. De plus, vous devez affecter au paramètre missing la valeur 0.0.

Par exemple :

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Formation du GPU

Les estimateurs PySpark définis dans le module xgboost.spark prennent en charge l’entraînement sur les GPU. Affectez au paramètre use_gpu la valeur True pour activer l’entraînement basé sur les GPU.

Notes

Pour chaque tâche Spark utilisée dans l’entraînement distribué XGBoost, un seul GPU est utilisé dans l’entraînement quand l’argument use_gpu a la valeur True. Databricks recommande d’utiliser la valeur par défaut 1 pour la configuration de cluster Spark spark.task.resource.gpu.amount. Sinon, les GPU supplémentaires alloués à cette tâche Spark sont inactifs.

Par exemple :

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Dépannage

Lors d’un entraînement à plusieurs nœuds, si vous rencontrez un message NCCL failure: remote process exited or there was a network error, cela indique généralement un problème de communication réseau entre les GPU. Ce problème survient lorsque la bibliothèque NCCL (NVIDIA Collective Communications Library) ne peut pas utiliser certaines interfaces réseau pour la communication avec le GPU.

Pour résoudre ce problème, définissez le sparkConf du groupement pour spark.executorEnv.NCCL_SOCKET_IFNAME sur eth. Cela définit essentiellement la variable d’environnement NCCL_SOCKET_IFNAME sur eth pour tous les workers d’un nœud.

Exemple de bloc-notes

Ce notebook montre l’utilisation du package Python xgboost.spark avec Spark MLlib.

Notebook PySpark-XGBoost

Obtenir le notebook

Guide de migration du module déprécié sparkdl.xgboost

  • Remplacez from sparkdl.xgboost import XgboostRegressor par from xgboost.spark import SparkXGBRegressor, puis from sparkdl.xgboost import XgboostClassifier par from xgboost.spark import SparkXGBClassifier.
  • Changez tous les noms de paramètres dans le constructeur de l’estimateur en passant de la casse camelCase à la casse snake_case. Par exemple, remplacez XgboostRegressor(featuresCol=XXX) par SparkXGBRegressor(features_col=XXX).
  • Les paramètres use_external_storage et external_storage_precision ont été supprimés. Les estimateurs xgboost.spark utilisent l’API d’itération des données DMatrix pour utiliser la mémoire de manière plus efficace. Il n’est plus nécessaire d’utiliser le mode de stockage externe inefficace. Pour les jeux de données extrêmement volumineux, Databricks vous recommande d’augmenter le paramètre num_workers, ce qui oblige chaque tâche d’entraînement à partitionner les données en partitions de données plus petites, plus faciles à gérer. Envisagez de définir num_workers = sc.defaultParallelism, qui définit num_workers au nombre total d’emplacements de tâches Spark dans le cluster.
  • Pour les estimateurs définis dans xgboost.spark, le paramètre num_workers=1 permet d’exécuter l’entraînement du modèle à l’aide d’une seule tâche Spark. Cela implique l’utilisation du nombre de cœurs de processeur spécifié par le paramètre de configuration de cluster Spark spark.task.cpus, dont la valeur est 1 par défaut. Pour utiliser davantage de cœurs d’unité centrale pour entraîner le modèle, augmentez num_workers ou spark.task.cpus. Vous ne pouvez pas définir le paramètre nthread ou n_jobs pour les estimateurs définis dans xgboost.spark. Ce comportement est différent du comportement précédent des estimateurs définis dans le package sparkdl.xgboost déprécié.

Convertir un modèle sparkdl.xgboost en modèle xgboost.spark

Les modèles sparkdl.xgboost sont enregistrés dans un format différent de celui des modèles xgboost.spark et ont des paramètres différents. Utilisez la fonction utilitaire suivante pour convertir le modèle :

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Si vous avez un modèle pyspark.ml.PipelineModel contenant un modèle sparkdl.xgboost comme dernière étape, vous pouvez remplacer l’étape du modèle sparkdl.xgboost par le modèle converti xgboost.spark.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)