Treinamento distribuído de modelos XGBoost usando o xgboost.spark
Importante
Esse recurso está em uma versão prévia.
O pacote xgboost>=1.7 do Python contém um novo módulo xgboost.spark
. Este módulo inclui os avaliadores xgboost xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
e xgboost.spark.SparkXGBRanker
do PySpark. Essas novas classes dão suporte à inclusão de avaliadores XGBoost em Pipelines do SparkML. Para saber detalhes da API, confira a documentação da API XGBoost python spark.
Requisitos
Databricks Runtime 12.0 ML e superior.
Parâmetros xgboost.spark
Os avaliadores definidos no módulo xgboost.spark
dão suporte à maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros para o construtor de classe, o método
fit
e o métodopredict
são praticamente idênticos aos do móduloxgboost.sklearn
. - Nomenclatura, valores e padrões são praticamente idênticos aos descritos nos parâmetros do XGBoost.
- Exceções são alguns parâmetros sem suporte (como
gpu_id
,nthread
,sample_weight
,eval_set
) e os parâmetros específicos do avaliadorpyspark
que foram adicionados (comofeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Para obter detalhes, confira a documentação da API XGBoost Python Spark.
Treinamento distribuído
Os avaliadores do PySpark definidos no módulo xgboost.spark
dão suporte ao treinamento XGBoost distribuído usando o parâmetro num_workers
. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers
como o número de tarefas do Spark em execução simultâneas durante o treinamento distribuído. Para usar todos os slots de tarefa do Spark, defina num_workers=sc.defaultParallelism
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Observação
- Você não pode usar
mlflow.xgboost.autolog
com XGBoost distribuído. Para registrar um modelo xgboost do Spark usando o MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Você não pode usar o XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecer ociosos. Para obter instruções de como desabilitar o dimensionamento automático, consulte Habilitar dimensionamento automático.
Habilitar a otimização para treinamento no conjunto de dados de recursos esparsos
Os Avaliadores do PySpark definidos no módulo xgboost.spark
dão suporte à otimização para treinamento em conjuntos de dados com recursos esparsos.
Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o método fit
que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector
e definir o parâmetro do avaliador enable_sparse_data_optim
como True
. Além disso, você precisa definir o parâmetro missing
como 0.0
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Treinamento de GPU
Os avaliadores do PySpark definidos no módulo xgboost.spark
dão suporte ao treinamento em GPUs. Defina o parâmetro use_gpu
como True
para habilitar o treinamento de GPU.
Observação
Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu
é definido como True
. O Databricks recomenda usar o valor padrão de 1
para a configuração spark.task.resource.gpu.amount
do cluster do Spark. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solução de problemas
Durante o treinamento de vários nós, se você encontrar uma mensagem de NCCL failure: remote process exited or there was a network error
, geralmente isso indica um problema com a comunicação de rede entre GPUs. Esse problema ocorre quando a NCCL (Biblioteca de Comunicações Coletivas NVIDIA) não pode usar determinados adaptadores de rede para comunicação de GPU.
Para resolver isso, configure o sparkConf do cluster de spark.executorEnv.NCCL_SOCKET_IFNAME
para eth
. Isso basicamente ajusta a variável de ambiente NCCL_SOCKET_IFNAME
para eth
em todos os trabalhos em um node.
Caderno de exemplo
Este notebook mostra o uso do pacote xgboost.spark
do Python com o Spark MLlib.
Notebook PySpark-XGBoost
Guia de migração para o módulo preterido sparkdl.xgboost
- Substitua
from sparkdl.xgboost import XgboostRegressor
porfrom xgboost.spark import SparkXGBRegressor
e substituafrom sparkdl.xgboost import XgboostClassifier
porfrom xgboost.spark import SparkXGBClassifier
. - Altere todos os nomes de parâmetro no construtor do avaliador do estilo camelCase para o estilo snake_case. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)
paraSparkXGBRegressor(features_col=XXX)
. - Os parâmetros
use_external_storage
eexternal_storage_precision
foram removidos. Os avaliadoresxgboost.spark
usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetronum_workers
, fazendo com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism
, que definenum_workers
como o número total de slots de tarefa do Spark no cluster. - Para avaliadores definidos no
xgboost.spark
, a configuraçãonum_workers=1
executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificados pela definiçãospark.task.cpus
da configuração do cluster do Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workers
ouspark.task.cpus
. Não é possível definir o parâmetronthread
oun_jobs
para avaliadores definidos noxgboost.spark
. Esse comportamento é diferente do comportamento anterior dos avaliadores definidos no pacote preteridosparkdl.xgboost
.
Converter o modelo sparkdl.xgboost
em modelo xgboost.spark
Os modelos sparkdl.xgboost
são salvos em um formato diferente dos modelos xgboost.spark
e têm configurações de parâmetro diferentes. Use a seguinte função de utilitário para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um modelo pyspark.ml.PipelineModel
que contém um modelo sparkdl.xgboost
como o último estágio, poderá substituir o estágio do modelo sparkdl.xgboost
pelo modelo convertido xgboost.spark
.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)