Compartir a través de


Entrenamiento distribuido de modelos XGBoost mediante xgboost.spark

Importante

Esta característica está en versión preliminar pública.

El paquete de Python xgboost>=1.7 contiene un nuevo módulo, xgboost.spark. Este módulo incluye los estimadores de PySpark para xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier y xgboost.spark.SparkXGBRanker. Estas nuevas clases admiten la inclusión de estimadores XGBoost en canalizaciones de SparkML. Para más información sobre la API, consulte la documentación de la API de Spark de Python de XGBoost.

Requisitos

Databricks Runtime 12.0 ML y versiones posteriores.

Parámetros xgboost.spark

Los estimadores definidos en el módulo xgboost.spark admiten la mayoría de los mismos parámetros y argumentos usados en el XGBoost estándar.

  • Los parámetros del constructor de clase y los métodos fit y predict son prácticamente idénticos a los del módulo xgboost.sklearn.
  • La nomenclatura, los valores y los valores predeterminados son casi idénticos a los descritos en los parámetros XGBoost.
  • Las excepciones son algunos parámetros no admitidos (como gpu_id, nthread, sample_weight y eval_set) y los parámetros específicos del estimador pyspark que se han agregado (como featuresCol, labelCol, use_gpu y validationIndicatorCol). Para más información, consulte la documentación de la API de Spark para Python de XGBoost.

Entrenamiento distribuido

Los estimadores de PySpark definidos en el módulo xgboost.spark admiten el entrenamiento XGBoost distribuido mediante el parámetro num_workers. Para usar el entrenamiento distribuido, cree un clasificador o un regresor y establezca num_workers en el número de tareas de Spark en ejecución simultáneas durante el entrenamiento distribuido. Para usar todas las ranuras de tareas de Spark, establezca num_workers=sc.defaultParallelism.

Por ejemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Nota:

  • No se puede usar mlflow.xgboost.autolog con XGBoost distribuido. Para registrar un modelo de Spark para xgboost mediante MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • No puede usar XGBoost distribuido en un clúster con el escalado automático habilitado. Los nuevos nodos de trabajo que comienzan en este paradigma de escalado elástico no pueden recibir nuevos conjuntos de tareas y permanecer inactivos. Consulte Habilitar autoescalado para obtener instrucciones para deshabilitar el autoescalado.

Habilitación de la optimización para el entrenamiento en el conjunto de datos de características dispersas

Estimadores de PySpark definidos en la optimización de compatibilidad del módulo xgboost.spark para el entrenamiento en conjuntos de datos con características dispersas. Para habilitar la optimización de conjuntos de características dispersos, debe proporcionar un conjunto de datos al método fit que contiene una columna de características que consta de valores de tipo pyspark.ml.linalg.SparseVector y establecer el parámetro estimador enable_sparse_data_optim en True. Además, debe establecer el parámetro missing en 0.0.

Por ejemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Entrenamiento de GPU

Los estimadores de PySpark definidos en el módulo xgboost.spark admiten el entrenamiento en GPU. Establezca el parámetro use_gpu en True para habilitar el entrenamiento de GPU.

Nota:

Para cada tarea de Spark usada en el entrenamiento distribuido XGBoost, solo se usa una GPU en el entrenamiento cuando el argumento use_gpu se establece en True. Databricks recomienda usar el valor predeterminado de 1 para la configuración del clúster de Spark spark.task.resource.gpu.amount. De lo contrario, las GPU adicionales asignadas a esta tarea de Spark están inactivas.

Por ejemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Solución de problemas

Durante el entrenamiento de varios nodos, si aparece un mensaje NCCL failure: remote process exited or there was a network error, normalmente indica un problema con la comunicación de red entre GPU. Este problema se produce cuando NCCL (biblioteca de comunicaciones colectiva de NVIDIA) no puede usar determinadas interfaces de red para la comunicación con GPU.

Para resolverlo, establezca sparkConf del clúster para spark.executorEnv.NCCL_SOCKET_IFNAME a eth. De esta manera, se establece básicamente la variable NCCL_SOCKET_IFNAME a eth de entorno para todos los trabajos de un nodo.

Cuaderno de ejemplo

En este cuaderno se muestra el uso del paquete de Python xgboost.spark con Spark MLlib.

Cuaderno de PySpark-XGBoost

Obtener el cuaderno

Guía de migración del módulo en desuso sparkdl.xgboost

  • Reemplace from sparkdl.xgboost import XgboostRegressor por from xgboost.spark import SparkXGBRegressor y reemplace from sparkdl.xgboost import XgboostClassifier por from xgboost.spark import SparkXGBClassifier.
  • Cambie todos los nombres de parámetro del constructor estimador del estilo camelCase a estilo snake_case. Por ejemplo, cambie XgboostRegressor(featuresCol=XXX) a SparkXGBRegressor(features_col=XXX).
  • Los parámetros use_external_storage y external_storage_precision se han eliminado. Los estimadores xgboost.spark usan la API de iteración de datos DMatrix para usar la memoria de forma más eficaz. Ya no es necesario usar el modo de almacenamiento externo ineficaz. En el caso de conjuntos de datos extremadamente grandes, Databricks recomienda aumentar el parámetro num_workers, lo que hace que cada tarea de entrenamiento particione los datos en particiones de datos más pequeñas y fáciles de administrar. Considere la posibilidad de establecer num_workers = sc.defaultParallelism, que establece num_workers en el número total de ranuras de tareas de Spark en el clúster.
  • En el caso de los estimadores definidos en xgboost.spark, la configuración num_workers=1 ejecuta el entrenamiento del modelo mediante una sola tarea de Spark. Esto utiliza el número de núcleos de CPU especificados por el valor spark.task.cpus de configuración del clúster de Spark, que es 1 de forma predeterminada. Para usar más núcleos de CPU para entrenar el modelo, aumente num_workers o spark.task.cpus. No se puede establecer el parámetro nthread o n_jobs para los estimadores definidos en xgboost.spark. Este comportamiento es diferente del comportamiento anterior de los estimadores definidos en el paquete en desuso sparkdl.xgboost.

Conversión del modelo sparkdl.xgboost en modelo xgboost.spark

Los modelos sparkdl.xgboost se guardan en un formato diferente al de los modelos xgboost.spark y tienen diferentes configuraciones de parámetros. Use la siguiente función de utilidad para convertir el modelo:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Si tiene un modelo pyspark.ml.PipelineModel que contiene un modelo sparkdl.xgboost como última etapa, puede reemplazar la etapa del modelo sparkdl.xgboost por el modelo xgboost.spark convertido.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)