Training distribuito dei modelli XGBoost con xgboost.spark
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Il pacchetto Python xgboost>=1.7 contiene un nuovo modulo xgboost.spark
. Questo modulo include le stime di PySpark xgboost xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
e xgboost.spark.SparkXGBRanker
. Queste nuove classi supportano l'inclusione di strumenti di stima XGBoost nelle pipeline SparkML. Per informazioni dettagliate sull'API, vedere la documentazione dell'API Spark per Python XGBoost.
Requisiti
Databricks Runtime 12.0 ML e versioni successive.
parametri xgboost.spark
Gli estimator definiti nel modulo xgboost.spark
supportano la maggior parte degli stessi parametri e argomenti usati in XGBoost standard.
- I parametri per il costruttore della classe, il metodo
fit
e il metodopredict
sono in gran parte identici a quelli del moduloxgboost.sklearn
. - I nomi, i valori e i valori predefiniti sono principalmente identici a quelli descritti nei parametri XGBoost .
- Le eccezioni sono alcuni parametri non supportati ( ad esempio
gpu_id
,nthread
,sample_weight
,eval_set
) e i parametri specifici dello strumento di stimapyspark
aggiunti , ad esempiofeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Per informazioni dettagliate, vedere la documentazione dell'API Spark per XGBoost Python.
Training distribuito
Le stime PySpark definite nel modulo xgboost.spark
supportano il training XGBoost distribuito usando il parametro num_workers
. Per usare il training distribuito, creare un classificatore o un regressore e impostare num_workers
sul numero di attività Spark simultanee in esecuzione durante il training distribuito. Per utilizzare tutti gli slot delle attività Spark, imposta num_workers=sc.defaultParallelism
.
Ad esempio:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Nota
- Non è possibile usare
mlflow.xgboost.autolog
con XGBoost distribuito. Per registrare un modello Spark xgboost usando MLflow, usaremlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Non è possibile usare XGBoost distribuito in un cluster con scalabilità automatica abilitata. I nuovi nodi di lavoro che iniziano in questo paradigma di ridimensionamento elastico non possono ricevere nuovi set di attività e rimanere inattivi. Per istruzioni su come disabilitare la scalabilità automatica, vedere Abilitare la scalabilità automatica.
Abilitare l'ottimizzazione per il training sul set di dati delle funzionalità di tipo sparse
Gli estimatori PySpark definiti nel modulo supportano l'ottimizzazione xgboost.spark
per il training sui set di dati con funzionalità di tipo sparse.
Per abilitare l'ottimizzazione dei set di funzionalità di tipo sparse, è necessario fornire un set di dati al metodo fit
che contiene una colonna di funzionalità costituita da valori di tipo pyspark.ml.linalg.SparseVector
e impostare il parametro estimator enable_sparse_data_optim
su True
. Inoltre, è necessario impostare il parametro missing
su 0.0
.
Ad esempio:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Training della GPU
Le stime PySpark definite nel modulo xgboost.spark
supportano il training sulle GPU. Impostare il parametro use_gpu
su True
per abilitare l'allenamento GPU.
Nota
Per ogni attività Spark usata nel training distribuito XGBoost, viene usata una sola GPU nel training quando l'argomento use_gpu
è impostato su True
. Databricks consiglia di usare il valore predefinito di 1
per la configurazione spark.task.resource.gpu.amount
del cluster Spark. In caso contrario, le GPU aggiuntive allocate a questa attività Spark sono inattive.
Ad esempio:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Risoluzione dei problemi
Durante il training a più nodi, se viene visualizzato un messaggio NCCL failure: remote process exited or there was a network error
, in genere indica un problema con la comunicazione di rete tra GPU. Questo problema si verifica quando NCCL (NVIDIA Collective Communications Library) non può usare determinate interfacce di rete per la comunicazione GPU.
Per risolvere il problema, impostare sparkConf del cluster per spark.executorEnv.NCCL_SOCKET_IFNAME
su eth
. In pratica, la variabile di ambiente NCCL_SOCKET_IFNAME
viene impostata su eth
per tutti i ruoli di lavoro in un nodo.
Notebook di esempio
Questo notebook illustra l'uso del pacchetto Python xgboost.spark
con Spark MLlib.
Notebook PySpark-XGBoost
Ottieni il notebook
Guida alla migrazione per il modulo deprecato sparkdl.xgboost
- Sostituire
from sparkdl.xgboost import XgboostRegressor
confrom xgboost.spark import SparkXGBRegressor
e sostituirefrom sparkdl.xgboost import XgboostClassifier
confrom xgboost.spark import SparkXGBClassifier
. - Modificare tutti i nomi dei parametri nel costruttore stime dallo stile camelCase allo stile snake_case. Puoi ad esempio modificare
XgboostRegressor(featuresCol=XXX)
inSparkXGBRegressor(features_col=XXX)
. - I parametri
use_external_storage
eexternal_storage_precision
sono stati rimossi. Le stimexgboost.spark
usano l'API di iterazione dei dati DMatrix per usare la memoria in modo più efficiente. Non è più necessario usare la modalità di archiviazione esterna inefficiente. Per set di dati estremamente grandi, Databricks consiglia di aumentare il parametronum_workers
in modo che ogni attività di training partizionerà i dati in partizioni più piccole e più gestibili. Prendere in considerazione l'impostazione dinum_workers = sc.defaultParallelism
, che impostanum_workers
sul numero totale di slot di attività Spark nel cluster. - Per le stime definite in
xgboost.spark
, l'impostazionenum_workers=1
esegue il training del modello usando una singola attività Spark. In questo modo viene utilizzato il numero di core CPU specificati dal set di configurazionespark.task.cpus
del cluster Spark, ovvero 1 per impostazione predefinita. Per usare più core CPU per eseguire il training del modello, aumentarenum_workers
ospark.task.cpus
. Non è possibile impostare il parametronthread
on_jobs
per gli estimator definiti inxgboost.spark
. Questo comportamento è diverso dal comportamento precedente delle stime definite nel pacchetto deprecatosparkdl.xgboost
.
Convertire il modello sparkdl.xgboost
nel modello xgboost.spark
I modelli sparkdl.xgboost
vengono salvati in un formato diverso rispetto ai modelli xgboost.spark
e hanno impostazioni dei parametri diverse. Usare la funzione dell'utilità seguente per convertire il modello:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se si dispone di un modello pyspark.ml.PipelineModel
contenente un modello sparkdl.xgboost
come ultima fase, è possibile sostituire la fase del modello sparkdl.xgboost
con il modello convertito xgboost.spark
.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)