Condividi tramite


Training distribuito dei modelli XGBoost con xgboost.spark

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Il pacchetto Python xgboost>=1.7 contiene un nuovo modulo xgboost.spark. Questo modulo include le stime di PySpark xgboost xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker. Queste nuove classi supportano l'inclusione di strumenti di stima XGBoost nelle pipeline SparkML. Per informazioni dettagliate sull'API, vedere la documentazione dell'API Spark per Python XGBoost.

Requisiti

Databricks Runtime 12.0 ML e versioni successive.

parametri xgboost.spark

Gli estimator definiti nel modulo xgboost.spark supportano la maggior parte degli stessi parametri e argomenti usati in XGBoost standard.

  • I parametri per il costruttore della classe, il metodo fit e il metodo predict sono in gran parte identici a quelli del modulo xgboost.sklearn.
  • I nomi, i valori e i valori predefiniti sono principalmente identici a quelli descritti nei parametri XGBoost .
  • Le eccezioni sono alcuni parametri non supportati ( ad esempio gpu_id, nthread, sample_weight, eval_set) e i parametri specifici dello strumento di stima pyspark aggiunti , ad esempio featuresCol, labelCol, use_gpu, validationIndicatorCol). Per informazioni dettagliate, vedere la documentazione dell'API Spark per XGBoost Python.

Training distribuito

Le stime PySpark definite nel modulo xgboost.spark supportano il training XGBoost distribuito usando il parametro num_workers. Per usare il training distribuito, creare un classificatore o un regressore e impostare num_workers sul numero di attività Spark simultanee in esecuzione durante il training distribuito. Per utilizzare tutti gli slot delle attività Spark, imposta num_workers=sc.defaultParallelism.

Ad esempio:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Nota

  • Non è possibile usare mlflow.xgboost.autolog con XGBoost distribuito. Per registrare un modello Spark xgboost usando MLflow, usare mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Non è possibile usare XGBoost distribuito in un cluster con scalabilità automatica abilitata. I nuovi nodi di lavoro che iniziano in questo paradigma di ridimensionamento elastico non possono ricevere nuovi set di attività e rimanere inattivi. Per istruzioni su come disabilitare la scalabilità automatica, vedere Abilitare la scalabilità automatica.

Abilitare l'ottimizzazione per il training sul set di dati delle funzionalità di tipo sparse

Gli estimatori PySpark definiti nel modulo supportano l'ottimizzazione xgboost.spark per il training sui set di dati con funzionalità di tipo sparse. Per abilitare l'ottimizzazione dei set di funzionalità di tipo sparse, è necessario fornire un set di dati al metodo fit che contiene una colonna di funzionalità costituita da valori di tipo pyspark.ml.linalg.SparseVector e impostare il parametro estimator enable_sparse_data_optim su True. Inoltre, è necessario impostare il parametro missing su 0.0.

Ad esempio:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Training della GPU

Le stime PySpark definite nel modulo xgboost.spark supportano il training sulle GPU. Impostare il parametro use_gpu su True per abilitare l'allenamento GPU.

Nota

Per ogni attività Spark usata nel training distribuito XGBoost, viene usata una sola GPU nel training quando l'argomento use_gpu è impostato su True. Databricks consiglia di usare il valore predefinito di 1 per la configurazione spark.task.resource.gpu.amount del cluster Spark. In caso contrario, le GPU aggiuntive allocate a questa attività Spark sono inattive.

Ad esempio:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Risoluzione dei problemi

Durante il training a più nodi, se viene visualizzato un messaggio NCCL failure: remote process exited or there was a network error, in genere indica un problema con la comunicazione di rete tra GPU. Questo problema si verifica quando NCCL (NVIDIA Collective Communications Library) non può usare determinate interfacce di rete per la comunicazione GPU.

Per risolvere il problema, impostare sparkConf del cluster per spark.executorEnv.NCCL_SOCKET_IFNAME su eth. In pratica, la variabile di ambiente NCCL_SOCKET_IFNAME viene impostata su eth per tutti i ruoli di lavoro in un nodo.

Notebook di esempio

Questo notebook illustra l'uso del pacchetto Python xgboost.spark con Spark MLlib.

Notebook PySpark-XGBoost

Ottieni il notebook

Guida alla migrazione per il modulo deprecato sparkdl.xgboost

  • Sostituire from sparkdl.xgboost import XgboostRegressor con from xgboost.spark import SparkXGBRegressor e sostituire from sparkdl.xgboost import XgboostClassifier con from xgboost.spark import SparkXGBClassifier.
  • Modificare tutti i nomi dei parametri nel costruttore stime dallo stile camelCase allo stile snake_case. Puoi ad esempio modificare XgboostRegressor(featuresCol=XXX) in SparkXGBRegressor(features_col=XXX).
  • I parametri use_external_storage e external_storage_precision sono stati rimossi. Le stime xgboost.spark usano l'API di iterazione dei dati DMatrix per usare la memoria in modo più efficiente. Non è più necessario usare la modalità di archiviazione esterna inefficiente. Per set di dati estremamente grandi, Databricks consiglia di aumentare il parametro num_workers in modo che ogni attività di training partizionerà i dati in partizioni più piccole e più gestibili. Prendere in considerazione l'impostazione di num_workers = sc.defaultParallelism, che imposta num_workers sul numero totale di slot di attività Spark nel cluster.
  • Per le stime definite in xgboost.spark, l'impostazione num_workers=1 esegue il training del modello usando una singola attività Spark. In questo modo viene utilizzato il numero di core CPU specificati dal set di configurazione spark.task.cpus del cluster Spark, ovvero 1 per impostazione predefinita. Per usare più core CPU per eseguire il training del modello, aumentare num_workers o spark.task.cpus. Non è possibile impostare il parametro nthread o n_jobs per gli estimator definiti in xgboost.spark. Questo comportamento è diverso dal comportamento precedente delle stime definite nel pacchetto deprecato sparkdl.xgboost.

Convertire il modello sparkdl.xgboost nel modello xgboost.spark

I modelli sparkdl.xgboost vengono salvati in un formato diverso rispetto ai modelli xgboost.spark e hanno impostazioni dei parametri diverse. Usare la funzione dell'utilità seguente per convertire il modello:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Se si dispone di un modello pyspark.ml.PipelineModel contenente un modello sparkdl.xgboost come ultima fase, è possibile sostituire la fase del modello sparkdl.xgboost con il modello convertito xgboost.spark.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)