Delen via


Gedistribueerde training van XGBoost-modellen met behulp van xgboost.spark

Belangrijk

Deze functie is beschikbaar als openbare preview.

Het Python-pakket xgboost>=1.7 bevat een nieuwe module xgboost.spark. Deze module bevat de xgboost PySpark-estimators xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifieren xgboost.spark.SparkXGBRanker. Deze nieuwe klassen ondersteunen het opnemen van XGBoost-schattingen in SparkML-pijplijnen. Zie het XGBoost Python Spark API-document voor API-details voor XGBoost.

Vereisten

Databricks Runtime 12.0 ML en hoger.

parameters voor xgboost.spark

De schattingen die zijn gedefinieerd in de xgboost.spark module ondersteunen de meeste van dezelfde parameters en argumenten die worden gebruikt in standaard XGBoost.

  • De parameters voor de klasseconstructor, fit methode en predict methode zijn grotendeels identiek aan de parameters in de xgboost.sklearn module.
  • Naamgeving, waarden en standaardinstellingen zijn grotendeels identiek aan die worden beschreven in XGBoost-parameters.
  • Uitzonderingen zijn enkele niet-ondersteunde parameters (zoals gpu_id, nthread, sample_weight, eval_set) en de pyspark-schattingsparameters die zijn toegevoegd (zoals featuresCol, labelCol, use_gpu, validationIndicatorCol). Zie de documentatie voor XGBoost Python Spark API voor meer informatie.

Gedistribueerde training

PySpark-estimators die zijn gedefinieerd in de xgboost.spark module ondersteunen gedistribueerde XGBoost-training met behulp van de num_workers parameter. Als u gedistribueerde training wilt gebruiken, maakt u een classificatie of regressor en stelt u num_workers in op het aantal gelijktijdige actieve Spark-taken tijdens gedistribueerde training. Als u alle Spark-taakslots wilt gebruiken, stelt u num_workers=sc.defaultParallelismals instelling in.

Bijvoorbeeld:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Notitie

  • U kunt niet gebruiken mlflow.xgboost.autolog met gedistribueerde XGBoost. Als u een xgboost Spark-model wilt registreren met behulp van MLflow, gebruikt u mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • U kunt gedistribueerde XGBoost niet gebruiken op een cluster waarvoor automatisch schalen is ingeschakeld. Nieuwe werkknooppunten die beginnen in dit paradigma voor elastisch schalen, kunnen geen nieuwe sets taken ontvangen en blijven inactief. Zie Automatisch schalen inschakelen voor instructies voor het uitschakelen van automatisch schalen.

Optimalisatie inschakelen voor training over gegevensset met sparsefuncties

PySpark Estimators die zijn gedefinieerd in xgboost.spark moduleondersteuningsoptimalisatie voor training over gegevenssets met sparse-functies. Als u de optimalisatie van sparse-functie sets wilt inschakelen, moet u een gegevensset opgeven voor de fit methode die een kenmerkenkolom bevat die bestaat uit waarden van het type pyspark.ml.linalg.SparseVector en de parameter enable_sparse_data_optim van de estimator instellen op True. Daarnaast moet u de parameter missing instellen op 0.0.

Bijvoorbeeld:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU-training

PySpark-schattingen die zijn gedefinieerd in de xgboost.spark moduleondersteuningstraining over GPU's. Stel de parameter use_gpu in op True om GPU-training in te schakelen.

Notitie

Voor elke Spark-taak die wordt gebruikt in gedistribueerde XGBoost-training, wordt er slechts één GPU gebruikt in de training wanneer het argument use_gpu is ingesteld op True. Databricks raadt aan de standaardwaarde van 1 de Spark-clusterconfiguratie spark.task.resource.gpu.amountte gebruiken. Anders zijn de extra GPU's die aan deze Spark-taak zijn toegewezen, niet actief.

Bijvoorbeeld:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Probleemoplossing

Tijdens training met meerdere knooppunten, als u een NCCL failure: remote process exited or there was a network error bericht tegenkomt, duidt dit meestal op een probleem met netwerkcommunicatie tussen GPU's. Dit probleem treedt op wanneer NCCL (NVIDIA Collective Communications Library) bepaalde netwerkinterfaces voor GPU-communicatie niet kan gebruiken.

U kunt dit oplossen door de sparkConf van het cluster voor spark.executorEnv.NCCL_SOCKET_IFNAME in te stellen op eth. Hiermee stelt u de omgevingsvariabele NCCL_SOCKET_IFNAMEeth in feite in voor alle werkrollen in een knooppunt.

Voorbeeld van notebook

Dit notebook toont het gebruik van het Python-pakket xgboost.spark met Spark MLlib.

PySpark-XGBoost-notebook

Notitieblok ophalen

Migratiehandleiding voor de afgeschafte sparkdl.xgboost module

  • Vervangen from sparkdl.xgboost import XgboostRegressor door from xgboost.spark import SparkXGBRegressor en vervangen door from sparkdl.xgboost import XgboostClassifierfrom xgboost.spark import SparkXGBClassifier .
  • Wijzig alle parameternamen in de estimatorconstructor van de camelCase-stijl in snake_case stijl. Wijzig bijvoorbeeld XgboostRegressor(featuresCol=XXX) in SparkXGBRegressor(features_col=XXX).
  • De parameters use_external_storage en external_storage_precision zijn verwijderd. xgboost.spark schattingen maken gebruik van de DMatrix-gegevensiteratie-API om efficiënter geheugen te gebruiken. U hoeft de inefficiënte externe opslagmodus niet meer te gebruiken. Voor extreem grote gegevenssets raadt Databricks u aan de parameter num_workers te verhogen, waardoor elke trainingstaak de gegevens partitioneert in kleinere, beter beheerbare gegevenspartities. Overweeg het instellen num_workers = sc.defaultParallelism, waarmee het totale aantal Spark-taaksites in het cluster wordt ingesteld num_workers .
  • Voor schattingen die zijn gedefinieerd in xgboost.spark, voert de instelling num_workers=1 modeltraining uit met één Spark-taak. Dit maakt gebruik van het aantal CPU-kernen dat is opgegeven door de configuratie-instelling spark.task.cpusvan het Spark-cluster. Dit is standaard 1. Als u meer CPU-kernen wilt gebruiken om het model te trainen, verhoogt num_workers u of spark.task.cpus. U kunt de parameter nthread of n_jobs niet instellen voor schattingen die zijn gedefinieerd in xgboost.spark. Dit gedrag verschilt van het vorige gedrag van estimators die zijn gedefinieerd in het afgeschafte sparkdl.xgboost pakket.

Model converteren sparkdl.xgboost naar xgboost.spark model

sparkdl.xgboost modellen worden opgeslagen in een andere indeling dan xgboost.spark modellen en hebben verschillende parameterinstellingen. Gebruik de volgende functie om het model te converteren:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Als u een pyspark.ml.PipelineModel model met een sparkdl.xgboost model als laatste fase hebt, kunt u de fase van sparkdl.xgboost het model vervangen door het geconverteerde xgboost.spark model.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)