Partilhar via


Treinamento distribuído de modelos XGBoost usando xgboost.spark

Importante

Esta funcionalidade está em Pré-visualização Pública.

O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark. Este módulo inclui os estimadores xgboost.spark.SparkXGBRegressorxgboost PySpark, xgboost.spark.SparkXGBClassifiere xgboost.spark.SparkXGBRanker. Essas novas classes suportam a inclusão de estimadores XGBoost em pipelines SparkML. Para obter detalhes da API, consulte o documento da API XGBoost python spark.

Requisitos

Databricks Runtime 12.0 ML e superior.

xgboost.spark Parâmetros

Os estimadores definidos no módulo suportam a xgboost.spark maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.

  • Os parâmetros para o construtor de classe, fit método e predict método são em grande parte idênticos aos do xgboost.sklearn módulo.
  • Nomenclatura, valores e padrões são basicamente idênticos aos descritos em parâmetros XGBoost.
  • As exceções são alguns parâmetros sem suporte (como gpu_id, , sample_weightnthread, eval_set) e os parâmetros específicos do pyspark estimador que foram adicionados (como featuresCol, labelCol, use_gpu, validationIndicatorCol). Para obter detalhes, consulte a documentação da API XGBoost Python Spark.

Preparação distribuída

Os estimadores PySpark definidos no módulo suportam treinamento xgboost.spark XGBoost distribuído usando o num_workers parâmetro. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers o número de tarefas simultâneas do Spark em execução durante o treinamento distribuído. Para usar todos os slots de tarefas do Spark, defina num_workers=sc.defaultParallelism.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Nota

  • Você não pode usar mlflow.xgboost.autolog com XGBoost distribuído. Para registrar um modelo xgboost Spark usando MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Não é possível usar XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecem ociosos. Para obter instruções sobre como desativar o dimensionamento automático, consulte Habilitar o dimensionamento automático.

Habilite a otimização para treinamento em conjuntos de dados de recursos esparsos

Os estimadores PySpark definidos no xgboost.spark módulo suportam otimização para treinamento em conjuntos de dados com recursos esparsos. Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o fit método que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro enable_sparse_data_optim do estimador como True. Além disso, você precisa definir o missing parâmetro como 0.0.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Treinamento de GPU

Os estimadores PySpark definidos no módulo suportam o xgboost.spark treinamento em GPUs. Defina o parâmetro use_gpu para True habilitar o treinamento da GPU.

Nota

Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o use_gpu argumento é definido como True. O Databricks recomenda o uso do valor padrão de para a configuração spark.task.resource.gpu.amountdo cluster Spark1. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Resolução de Problemas

Durante o treinamento de vários nós, se você encontrar uma NCCL failure: remote process exited or there was a network error mensagem, ela geralmente indica um problema com a comunicação de rede entre GPUs. Esse problema surge quando NCCL (NVIDIA Collective Communications Library) não pode usar determinadas interfaces de rede para comunicação GPU.

Para resolver, defina sparkConf do cluster como spark.executorEnv.NCCL_SOCKET_IFNAME eth. Isso essencialmente define a variável NCCL_SOCKET_IFNAME de ambiente para eth todos os trabalhadores em um nó.

Bloco de notas de exemplo

Este bloco de anotações mostra o uso do pacote xgboost.spark Python com o Spark MLlib.

Notebook PySpark-XGBoost

Obter o bloco de notas

Guia de migração para o módulo preterido sparkdl.xgboost

  • Substitua from sparkdl.xgboost import XgboostRegressor por from xgboost.spark import SparkXGBRegressor e substitua from sparkdl.xgboost import XgboostClassifier por from xgboost.spark import SparkXGBClassifier.
  • Altere todos os nomes de parâmetros no construtor do estimador de camelCase style para snake_case style. Por exemplo, altere XgboostRegressor(featuresCol=XXX) para SparkXGBRegressor(features_col=XXX).
  • Os parâmetros use_external_storage e external_storage_precision foram removidos. xgboost.spark os estimadores usam a API de iteração de dados DMatrix para usar a memória de forma mais eficiente. Não há mais necessidade de usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetro, o num_workers que faz com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuração num_workers = sc.defaultParallelism, que define num_workers o número total de slots de tarefas do Spark no cluster.
  • Para estimadores definidos no xgboost.spark, a configuração num_workers=1 executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela definição spark.task.cpusde configuração do cluster Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumente num_workers ou spark.task.cpus. Não é possível definir o nthread parâmetro ou n_jobs para estimadores definidos em xgboost.spark. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote preterido sparkdl.xgboost .

Converter sparkdl.xgboost modelo em xgboost.spark modelo

sparkdl.xgboost Os modelos são salvos em um formato diferente dos xgboost.spark modelos e têm configurações de parâmetros diferentes. Use a seguinte função de utilitário para converter o modelo:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Se você tiver um pyspark.ml.PipelineModel modelo contendo um sparkdl.xgboost modelo como o último estágio, poderá substituir o estágio do sparkdl.xgboost modelo pelo modelo convertido xgboost.spark .

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)