Treinamento distribuído de modelos XGBoost usando xgboost.spark
Importante
Esta funcionalidade está em Pré-visualização Pública.
O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark
. Este módulo inclui os estimadores xgboost.spark.SparkXGBRegressor
xgboost PySpark, xgboost.spark.SparkXGBClassifier
e xgboost.spark.SparkXGBRanker
. Essas novas classes suportam a inclusão de estimadores XGBoost em pipelines SparkML. Para obter detalhes da API, consulte o documento da API XGBoost python spark.
Requisitos
Databricks Runtime 12.0 ML e superior.
xgboost.spark
Parâmetros
Os estimadores definidos no módulo suportam a xgboost.spark
maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros para o construtor de classe,
fit
método epredict
método são em grande parte idênticos aos doxgboost.sklearn
módulo. - Nomenclatura, valores e padrões são basicamente idênticos aos descritos em parâmetros XGBoost.
- As exceções são alguns parâmetros sem suporte (como
gpu_id
, ,sample_weight
nthread
,eval_set
) e os parâmetros específicos dopyspark
estimador que foram adicionados (comofeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Para obter detalhes, consulte a documentação da API XGBoost Python Spark.
Preparação distribuída
Os estimadores PySpark definidos no módulo suportam treinamento xgboost.spark
XGBoost distribuído usando o num_workers
parâmetro. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers
o número de tarefas simultâneas do Spark em execução durante o treinamento distribuído. Para usar todos os slots de tarefas do Spark, defina num_workers=sc.defaultParallelism
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Nota
- Você não pode usar
mlflow.xgboost.autolog
com XGBoost distribuído. Para registrar um modelo xgboost Spark usando MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Não é possível usar XGBoost distribuído em um cluster que tenha o dimensionamento automático habilitado. Novos nós de trabalho que começam nesse paradigma de dimensionamento elástico não podem receber novos conjuntos de tarefas e permanecem ociosos. Para obter instruções sobre como desativar o dimensionamento automático, consulte Habilitar o dimensionamento automático.
Habilite a otimização para treinamento em conjuntos de dados de recursos esparsos
Os estimadores PySpark definidos no xgboost.spark
módulo suportam otimização para treinamento em conjuntos de dados com recursos esparsos.
Para habilitar a otimização de conjuntos de recursos esparsos, você precisa fornecer um conjunto de dados para o fit
método que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector
e definir o parâmetro enable_sparse_data_optim
do estimador como True
. Além disso, você precisa definir o missing
parâmetro como 0.0
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Treinamento de GPU
Os estimadores PySpark definidos no módulo suportam o xgboost.spark
treinamento em GPUs. Defina o parâmetro use_gpu
para True
habilitar o treinamento da GPU.
Nota
Para cada tarefa do Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o use_gpu
argumento é definido como True
. O Databricks recomenda o uso do valor padrão de para a configuração spark.task.resource.gpu.amount
do cluster Spark1
. Caso contrário, as GPUs adicionais alocadas para essa tarefa do Spark ficarão ociosas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Resolução de Problemas
Durante o treinamento de vários nós, se você encontrar uma NCCL failure: remote process exited or there was a network error
mensagem, ela geralmente indica um problema com a comunicação de rede entre GPUs. Esse problema surge quando NCCL (NVIDIA Collective Communications Library) não pode usar determinadas interfaces de rede para comunicação GPU.
Para resolver, defina sparkConf do cluster como spark.executorEnv.NCCL_SOCKET_IFNAME
eth
. Isso essencialmente define a variável NCCL_SOCKET_IFNAME
de ambiente para eth
todos os trabalhadores em um nó.
Bloco de notas de exemplo
Este bloco de anotações mostra o uso do pacote xgboost.spark
Python com o Spark MLlib.
Notebook PySpark-XGBoost
Guia de migração para o módulo preterido sparkdl.xgboost
- Substitua
from sparkdl.xgboost import XgboostRegressor
porfrom xgboost.spark import SparkXGBRegressor
e substituafrom sparkdl.xgboost import XgboostClassifier
porfrom xgboost.spark import SparkXGBClassifier
. - Altere todos os nomes de parâmetros no construtor do estimador de camelCase style para snake_case style. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)
paraSparkXGBRegressor(features_col=XXX)
. - Os parâmetros
use_external_storage
eexternal_storage_precision
foram removidos.xgboost.spark
os estimadores usam a API de iteração de dados DMatrix para usar a memória de forma mais eficiente. Não há mais necessidade de usar o modo de armazenamento externo ineficiente. Para conjuntos de dados extremamente grandes, o Databricks recomenda que você aumente o parâmetro, onum_workers
que faz com que cada tarefa de treinamento particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism
, que definenum_workers
o número total de slots de tarefas do Spark no cluster. - Para estimadores definidos no
xgboost.spark
, a configuraçãonum_workers=1
executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela definiçãospark.task.cpus
de configuração do cluster Spark, que é 1 por padrão. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workers
ouspark.task.cpus
. Não é possível definir onthread
parâmetro oun_jobs
para estimadores definidos emxgboost.spark
. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote preteridosparkdl.xgboost
.
Converter sparkdl.xgboost
modelo em xgboost.spark
modelo
sparkdl.xgboost
Os modelos são salvos em um formato diferente dos xgboost.spark
modelos e têm configurações de parâmetros diferentes. Use a seguinte função de utilitário para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um pyspark.ml.PipelineModel
modelo contendo um sparkdl.xgboost
modelo como o último estágio, poderá substituir o estágio do sparkdl.xgboost
modelo pelo modelo convertido xgboost.spark
.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)