Entraînement distribué de modèles XGBoost à l’aide de xgboost.spark
Important
Cette fonctionnalité est disponible en préversion publique.
Le package Python xgboost>=1.7 contient un nouveau module xgboost.spark
. Ce module comprend les estimateurs PySpark xgboost xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
et xgboost.spark.SparkXGBRanker
. Ces nouvelles classes prennent en charge l’inclusion des estimateurs XGBoost dans les pipelines SparkML. Pour plus d’informations sur l’API, consultez la documentation de l’API XGBoost Python Spark.
Spécifications
Databricks Runtime 12.0 ML et versions ultérieures.
Paramètres xgboost.spark
Les estimateurs définis dans le module xgboost.spark
prennent en charge la plupart des paramètres et arguments utilisés dans le XGBoost standard.
- Les paramètres du constructeur de classe, de la méthode
fit
et de la méthodepredict
sont en grande partie identiques à ceux du modulexgboost.sklearn
. - Les noms, les valeurs et les paramètres par défaut sont pour la plupart identiques à ceux décrits dans Paramètres XGBoost.
- Les exceptions concernent quelques paramètres non pris en charge (par exemple
gpu_id
,nthread
,sample_weight
,eval_set
) ainsi que les paramètres spécifiques à l’estimateurpyspark
, qui ont été ajoutés (par exemplefeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Pour plus d’informations, consultez la documentation de l’API XGBoost Python Spark.
Entraînement distribué
Les estimateurs PySpark définis dans le module xgboost.spark
prennent en charge l’entraînement XGBoost distribué à l’aide du paramètre num_workers
. Pour utiliser l’entraînement distribué, créez un classifieur ou un régresseur, puis affectez à num_workers
le nombre de tâches Spark s’exécutant simultanément durant l’entraînement distribué. Pour utiliser tous les emplacements de tâches Spark, définissez num_workers=sc.defaultParallelism
.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Notes
- Vous ne pouvez pas utiliser
mlflow.xgboost.autolog
avec XGBoost distribué. Pour journaliser un modèle Spark xgboost à l’aide de MLflow, utilisezmlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Vous ne pouvez pas utiliser de XGBoost distribué sur un cluster pour lequel la mise à l’échelle automatique est activée. Les nouveaux nœuds Worker qui démarrent dans ce paradigme de mise à l’échelle élastique ne peuvent pas recevoir de nouveaux ensembles de tâches, et restent inactifs. Pour obtenir des instructions sur la désactivation de la mise à l’échelle automatique, consultez Activer la mise à l’échelle automatique.
Activer l’optimisation de l’entraînement sur un jeu de données aux caractéristiques éparses
Les estimateurs PySpark définis dans le module xgboost.spark
prennent en charge l’optimisation de l’entraînement sur les jeux de données ayant des caractéristiques éparses.
Pour permettre l’optimisation des jeux de données aux caractéristiques éparses, vous devez fournir un jeu de données à la méthode fit
, qui contient une colonne de caractéristiques composée des valeurs de type pyspark.ml.linalg.SparseVector
, et affecter au paramètre enable_sparse_data_optim
de l’estimateur la valeur True
. De plus, vous devez affecter au paramètre missing
la valeur 0.0
.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Formation du GPU
Les estimateurs PySpark définis dans le module xgboost.spark
prennent en charge l’entraînement sur les GPU. Affectez au paramètre use_gpu
la valeur True
pour activer l’entraînement basé sur les GPU.
Notes
Pour chaque tâche Spark utilisée dans l’entraînement distribué XGBoost, un seul GPU est utilisé dans l’entraînement quand l’argument use_gpu
a la valeur True
. Databricks recommande d’utiliser la valeur par défaut 1
pour la configuration de cluster Spark spark.task.resource.gpu.amount
. Sinon, les GPU supplémentaires alloués à cette tâche Spark sont inactifs.
Par exemple :
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Dépannage
Lors d’un entraînement à plusieurs nœuds, si vous rencontrez un message NCCL failure: remote process exited or there was a network error
, cela indique généralement un problème de communication réseau entre les GPU. Ce problème survient lorsque la bibliothèque NCCL (NVIDIA Collective Communications Library) ne peut pas utiliser certaines interfaces réseau pour la communication avec le GPU.
Pour résoudre ce problème, définissez le sparkConf du groupement pour spark.executorEnv.NCCL_SOCKET_IFNAME
sur eth
. Cela définit essentiellement la variable d’environnement NCCL_SOCKET_IFNAME
sur eth
pour tous les workers d’un nœud.
Exemple de bloc-notes
Ce notebook montre l’utilisation du package Python xgboost.spark
avec Spark MLlib.
Notebook PySpark-XGBoost
Guide de migration du module déprécié sparkdl.xgboost
- Remplacez
from sparkdl.xgboost import XgboostRegressor
parfrom xgboost.spark import SparkXGBRegressor
, puisfrom sparkdl.xgboost import XgboostClassifier
parfrom xgboost.spark import SparkXGBClassifier
. - Changez tous les noms de paramètres dans le constructeur de l’estimateur en passant de la casse camelCase à la casse snake_case. Par exemple, remplacez
XgboostRegressor(featuresCol=XXX)
parSparkXGBRegressor(features_col=XXX)
. - Les paramètres
use_external_storage
etexternal_storage_precision
ont été supprimés. Les estimateursxgboost.spark
utilisent l’API d’itération des données DMatrix pour utiliser la mémoire de manière plus efficace. Il n’est plus nécessaire d’utiliser le mode de stockage externe inefficace. Pour les jeux de données extrêmement volumineux, Databricks vous recommande d’augmenter le paramètrenum_workers
, ce qui oblige chaque tâche d’entraînement à partitionner les données en partitions de données plus petites, plus faciles à gérer. Envisagez de définirnum_workers = sc.defaultParallelism
, qui définitnum_workers
au nombre total d’emplacements de tâches Spark dans le cluster. - Pour les estimateurs définis dans
xgboost.spark
, le paramètrenum_workers=1
permet d’exécuter l’entraînement du modèle à l’aide d’une seule tâche Spark. Cela implique l’utilisation du nombre de cœurs de processeur spécifié par le paramètre de configuration de cluster Sparkspark.task.cpus
, dont la valeur est 1 par défaut. Pour utiliser davantage de cœurs d’unité centrale pour entraîner le modèle, augmenteznum_workers
ouspark.task.cpus
. Vous ne pouvez pas définir le paramètrenthread
oun_jobs
pour les estimateurs définis dansxgboost.spark
. Ce comportement est différent du comportement précédent des estimateurs définis dans le packagesparkdl.xgboost
déprécié.
Convertir un modèle sparkdl.xgboost
en modèle xgboost.spark
Les modèles sparkdl.xgboost
sont enregistrés dans un format différent de celui des modèles xgboost.spark
et ont des paramètres différents. Utilisez la fonction utilitaire suivante pour convertir le modèle :
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Si vous avez un modèle pyspark.ml.PipelineModel
contenant un modèle sparkdl.xgboost
comme dernière étape, vous pouvez remplacer l’étape du modèle sparkdl.xgboost
par le modèle converti xgboost.spark
.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)