Entrenamiento distribuido de modelos XGBoost mediante xgboost.spark
Importante
Esta característica está en versión preliminar pública.
El paquete de Python xgboost>=1.7 contiene un nuevo módulo, xgboost.spark
. Este módulo incluye los estimadores de PySpark para xgboost xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
y xgboost.spark.SparkXGBRanker
. Estas nuevas clases admiten la inclusión de estimadores XGBoost en canalizaciones de SparkML. Para más información sobre la API, consulte la documentación de la API de Spark de Python de XGBoost.
Requisitos
Databricks Runtime 12.0 ML y versiones posteriores.
Parámetros xgboost.spark
Los estimadores definidos en el módulo xgboost.spark
admiten la mayoría de los mismos parámetros y argumentos usados en el XGBoost estándar.
- Los parámetros del constructor de clase y los métodos
fit
ypredict
son prácticamente idénticos a los del móduloxgboost.sklearn
. - La nomenclatura, los valores y los valores predeterminados son casi idénticos a los descritos en los parámetros XGBoost.
- Las excepciones son algunos parámetros no admitidos (como
gpu_id
,nthread
,sample_weight
yeval_set
) y los parámetros específicos del estimadorpyspark
que se han agregado (comofeaturesCol
,labelCol
,use_gpu
yvalidationIndicatorCol
). Para más información, consulte la documentación de la API de Spark para Python de XGBoost.
Entrenamiento distribuido
Los estimadores de PySpark definidos en el módulo xgboost.spark
admiten el entrenamiento XGBoost distribuido mediante el parámetro num_workers
. Para usar el entrenamiento distribuido, cree un clasificador o un regresor y establezca num_workers
en el número de tareas de Spark en ejecución simultáneas durante el entrenamiento distribuido. Para usar todas las ranuras de tareas de Spark, establezca num_workers=sc.defaultParallelism
.
Por ejemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Nota:
- No se puede usar
mlflow.xgboost.autolog
con XGBoost distribuido. Para registrar un modelo de Spark para xgboost mediante MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - No puede usar XGBoost distribuido en un clúster con el escalado automático habilitado. Los nuevos nodos de trabajo que comienzan en este paradigma de escalado elástico no pueden recibir nuevos conjuntos de tareas y permanecer inactivos. Consulte Habilitar autoescalado para obtener instrucciones para deshabilitar el autoescalado.
Habilitación de la optimización para el entrenamiento en el conjunto de datos de características dispersas
Estimadores de PySpark definidos en la optimización de compatibilidad del módulo xgboost.spark
para el entrenamiento en conjuntos de datos con características dispersas.
Para habilitar la optimización de conjuntos de características dispersos, debe proporcionar un conjunto de datos al método fit
que contiene una columna de características que consta de valores de tipo pyspark.ml.linalg.SparseVector
y establecer el parámetro estimador enable_sparse_data_optim
en True
. Además, debe establecer el parámetro missing
en 0.0
.
Por ejemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Entrenamiento de GPU
Los estimadores de PySpark definidos en el módulo xgboost.spark
admiten el entrenamiento en GPU. Establezca el parámetro use_gpu
en True
para habilitar el entrenamiento de GPU.
Nota:
Para cada tarea de Spark usada en el entrenamiento distribuido XGBoost, solo se usa una GPU en el entrenamiento cuando el argumento use_gpu
se establece en True
. Databricks recomienda usar el valor predeterminado de 1
para la configuración del clúster de Spark spark.task.resource.gpu.amount
. De lo contrario, las GPU adicionales asignadas a esta tarea de Spark están inactivas.
Por ejemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solución de problemas
Durante el entrenamiento de varios nodos, si aparece un mensaje NCCL failure: remote process exited or there was a network error
, normalmente indica un problema con la comunicación de red entre GPU. Este problema se produce cuando NCCL (biblioteca de comunicaciones colectiva de NVIDIA) no puede usar determinadas interfaces de red para la comunicación con GPU.
Para resolverlo, establezca sparkConf del clúster para spark.executorEnv.NCCL_SOCKET_IFNAME
a eth
. De esta manera, se establece básicamente la variable NCCL_SOCKET_IFNAME
a eth
de entorno para todos los trabajos de un nodo.
Cuaderno de ejemplo
En este cuaderno se muestra el uso del paquete de Python xgboost.spark
con Spark MLlib.
Cuaderno de PySpark-XGBoost
Guía de migración del módulo en desuso sparkdl.xgboost
- Reemplace
from sparkdl.xgboost import XgboostRegressor
porfrom xgboost.spark import SparkXGBRegressor
y reemplacefrom sparkdl.xgboost import XgboostClassifier
porfrom xgboost.spark import SparkXGBClassifier
. - Cambie todos los nombres de parámetro del constructor estimador del estilo camelCase a estilo snake_case. Por ejemplo, cambie
XgboostRegressor(featuresCol=XXX)
aSparkXGBRegressor(features_col=XXX)
. - Los parámetros
use_external_storage
yexternal_storage_precision
se han eliminado. Los estimadoresxgboost.spark
usan la API de iteración de datos DMatrix para usar la memoria de forma más eficaz. Ya no es necesario usar el modo de almacenamiento externo ineficaz. En el caso de conjuntos de datos extremadamente grandes, Databricks recomienda aumentar el parámetronum_workers
, lo que hace que cada tarea de entrenamiento particione los datos en particiones de datos más pequeñas y fáciles de administrar. Considere la posibilidad de establecernum_workers = sc.defaultParallelism
, que establecenum_workers
en el número total de ranuras de tareas de Spark en el clúster. - En el caso de los estimadores definidos en
xgboost.spark
, la configuraciónnum_workers=1
ejecuta el entrenamiento del modelo mediante una sola tarea de Spark. Esto utiliza el número de núcleos de CPU especificados por el valorspark.task.cpus
de configuración del clúster de Spark, que es 1 de forma predeterminada. Para usar más núcleos de CPU para entrenar el modelo, aumentenum_workers
ospark.task.cpus
. No se puede establecer el parámetronthread
on_jobs
para los estimadores definidos enxgboost.spark
. Este comportamiento es diferente del comportamiento anterior de los estimadores definidos en el paquete en desusosparkdl.xgboost
.
Conversión del modelo sparkdl.xgboost
en modelo xgboost.spark
Los modelos sparkdl.xgboost
se guardan en un formato diferente al de los modelos xgboost.spark
y tienen diferentes configuraciones de parámetros. Use la siguiente función de utilidad para convertir el modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Si tiene un modelo pyspark.ml.PipelineModel
que contiene un modelo sparkdl.xgboost
como última etapa, puede reemplazar la etapa del modelo sparkdl.xgboost
por el modelo xgboost.spark
convertido.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)