Compartilhar via


Treinamento distribuído de modelos XGBoost usando o xgboost.spark

Importante

Esse recurso está em uma versão prévia.

O pacote xgboost>=1.7 do Python contém um novo módulo xgboost.spark. Este módulo inclui os avaliadores xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker do PySpark. Essas novas classes dão suporte à inclusão de avaliadores XGBoost em Pipelines do SparkML. Para saber detalhes da API, confira a documentação da API XGBoost python spark.

Requisitos

Databricks Runtime 12.0 ML e superior.

Parâmetros xgboost.spark

Os avaliadores definidos no módulo xgboost.spark dão suporte à maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.

  • Os parâmetros para o construtor de classe, o método fit e o método predict são praticamente idênticos aos do módulo xgboost.sklearn.
  • Nomenclatura, valores e padrões são praticamente idênticos aos descritos nos parâmetros do XGBoost.
  • Exceções são alguns parâmetros sem suporte (como gpu_id, nthread, sample_weight, eval_set) e os parâmetros específicos do avaliador pyspark que foram adicionados (como featuresCol, labelCol, use_gpu, validationIndicatorCol). Para obter detalhes, confira a documentação da API XGBoost Python Spark.

Treinamento distribuído

Os avaliadores do PySpark definidos no módulo xgboost.spark dão suporte ao treinamento XGBoost distribuído usando o parâmetro num_workers. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers como o número de tarefas do Spark em execução simultâneas durante o treinamento distribuído. Para usar todos os slots de tarefa do Spark, defina num_workers=sc.defaultParallelism.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Observação

  • Você não pode usar mlflow.xgboost.autolog com XGBoost distribuído. Para registrar um modelo xgboost do Spark usando o MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Você não pode usar o XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecer ociosos. Para obter instruções de como desabilitar o dimensionamento automático, consulte Habilitar dimensionamento automático.

Habilitar a otimização para treinamento no conjunto de dados de recursos esparsos

Os Avaliadores do PySpark definidos no módulo xgboost.spark dão suporte à otimização para treinamento em conjuntos de dados com recursos esparsos. Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o método fit que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro do avaliador enable_sparse_data_optim como True. Além disso, você precisa definir o parâmetro missing como 0.0.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Treinamento de GPU

Os avaliadores do PySpark definidos no módulo xgboost.spark dão suporte ao treinamento em GPUs. Defina o parâmetro use_gpu como True para habilitar o treinamento de GPU.

Observação

Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu é definido como True. O Databricks recomenda usar o valor padrão de 1 para a configuração spark.task.resource.gpu.amount do cluster do Spark. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Solução de problemas

Durante o treinamento de vários nós, se você encontrar uma mensagem de NCCL failure: remote process exited or there was a network error, geralmente isso indica um problema com a comunicação de rede entre GPUs. Esse problema ocorre quando a NCCL (Biblioteca de Comunicações Coletivas NVIDIA) não pode usar determinados adaptadores de rede para comunicação de GPU.

Para resolver isso, configure o sparkConf do cluster de spark.executorEnv.NCCL_SOCKET_IFNAME para eth. Isso basicamente ajusta a variável de ambiente NCCL_SOCKET_IFNAME para eth em todos os trabalhos em um node.

Caderno de exemplo

Este notebook mostra o uso do pacote xgboost.spark do Python com o Spark MLlib.

Notebook PySpark-XGBoost

Obter notebook

Guia de migração para o módulo preterido sparkdl.xgboost

  • Substitua from sparkdl.xgboost import XgboostRegressor por from xgboost.spark import SparkXGBRegressor e substitua from sparkdl.xgboost import XgboostClassifier por from xgboost.spark import SparkXGBClassifier.
  • Altere todos os nomes de parâmetro no construtor do avaliador do estilo camelCase para o estilo snake_case. Por exemplo, altere XgboostRegressor(featuresCol=XXX) para SparkXGBRegressor(features_col=XXX).
  • Os parâmetros use_external_storage e external_storage_precision foram removidos. Os avaliadores xgboost.spark usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetro num_workers, fazendo com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuração num_workers = sc.defaultParallelism, que define num_workers como o número total de slots de tarefa do Spark no cluster.
  • Para avaliadores definidos no xgboost.spark, a configuração num_workers=1 executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificados pela definição spark.task.cpus da configuração do cluster do Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumente num_workers ou spark.task.cpus. Não é possível definir o parâmetro nthread ou n_jobs para avaliadores definidos no xgboost.spark. Esse comportamento é diferente do comportamento anterior dos avaliadores definidos no pacote preterido sparkdl.xgboost.

Converter o modelo sparkdl.xgboost em modelo xgboost.spark

Os modelos sparkdl.xgboost são salvos em um formato diferente dos modelos xgboost.spark e têm configurações de parâmetro diferentes. Use a seguinte função de utilitário para converter o modelo:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Se você tiver um modelo pyspark.ml.PipelineModel que contém um modelo sparkdl.xgboost como o último estágio, poderá substituir o estágio do modelo sparkdl.xgboost pelo modelo convertido xgboost.spark.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)