Gedistribueerde training van XGBoost-modellen met behulp van xgboost.spark
Belangrijk
Deze functie is beschikbaar als openbare preview.
Het Python-pakket xgboost>=1.7 bevat een nieuwe module xgboost.spark
. Deze module bevat de xgboost PySpark-estimators xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
en xgboost.spark.SparkXGBRanker
. Deze nieuwe klassen ondersteunen het opnemen van XGBoost-schattingen in SparkML-pijplijnen. Zie het XGBoost Python Spark API-document voor API-details voor XGBoost.
Vereisten
Databricks Runtime 12.0 ML en hoger.
parameters voor xgboost.spark
De schattingen die zijn gedefinieerd in de xgboost.spark
module ondersteunen de meeste van dezelfde parameters en argumenten die worden gebruikt in standaard XGBoost.
- De parameters voor de klasseconstructor,
fit
methode enpredict
methode zijn grotendeels identiek aan de parameters in dexgboost.sklearn
module. - Naamgeving, waarden en standaardinstellingen zijn grotendeels identiek aan die worden beschreven in XGBoost-parameters.
- Uitzonderingen zijn enkele niet-ondersteunde parameters (zoals
gpu_id
,nthread
,sample_weight
,eval_set
) en depyspark
-schattingsparameters die zijn toegevoegd (zoalsfeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Zie de documentatie voor XGBoost Python Spark API voor meer informatie.
Gedistribueerde training
PySpark-estimators die zijn gedefinieerd in de xgboost.spark
module ondersteunen gedistribueerde XGBoost-training met behulp van de num_workers
parameter. Als u gedistribueerde training wilt gebruiken, maakt u een classificatie of regressor en stelt u num_workers
in op het aantal gelijktijdige actieve Spark-taken tijdens gedistribueerde training. Als u alle Spark-taakslots wilt gebruiken, stelt u num_workers=sc.defaultParallelism
als instelling in.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Notitie
- U kunt niet gebruiken
mlflow.xgboost.autolog
met gedistribueerde XGBoost. Als u een xgboost Spark-model wilt registreren met behulp van MLflow, gebruikt umlflow.spark.log_model(spark_xgb_model, artifact_path)
. - U kunt gedistribueerde XGBoost niet gebruiken op een cluster waarvoor automatisch schalen is ingeschakeld. Nieuwe werkknooppunten die beginnen in dit paradigma voor elastisch schalen, kunnen geen nieuwe sets taken ontvangen en blijven inactief. Zie Automatisch schalen inschakelen voor instructies voor het uitschakelen van automatisch schalen.
Optimalisatie inschakelen voor training over gegevensset met sparsefuncties
PySpark Estimators die zijn gedefinieerd in xgboost.spark
moduleondersteuningsoptimalisatie voor training over gegevenssets met sparse-functies.
Als u de optimalisatie van sparse-functie sets wilt inschakelen, moet u een gegevensset opgeven voor de fit
methode die een kenmerkenkolom bevat die bestaat uit waarden van het type pyspark.ml.linalg.SparseVector
en de parameter enable_sparse_data_optim
van de estimator instellen op True
. Daarnaast moet u de parameter missing
instellen op 0.0
.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU-training
PySpark-schattingen die zijn gedefinieerd in de xgboost.spark
moduleondersteuningstraining over GPU's. Stel de parameter use_gpu
in op True
om GPU-training in te schakelen.
Notitie
Voor elke Spark-taak die wordt gebruikt in gedistribueerde XGBoost-training, wordt er slechts één GPU gebruikt in de training wanneer het argument use_gpu
is ingesteld op True
. Databricks raadt aan de standaardwaarde van 1
de Spark-clusterconfiguratie spark.task.resource.gpu.amount
te gebruiken. Anders zijn de extra GPU's die aan deze Spark-taak zijn toegewezen, niet actief.
Bijvoorbeeld:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Probleemoplossing
Tijdens training met meerdere knooppunten, als u een NCCL failure: remote process exited or there was a network error
bericht tegenkomt, duidt dit meestal op een probleem met netwerkcommunicatie tussen GPU's. Dit probleem treedt op wanneer NCCL (NVIDIA Collective Communications Library) bepaalde netwerkinterfaces voor GPU-communicatie niet kan gebruiken.
U kunt dit oplossen door de sparkConf van het cluster voor spark.executorEnv.NCCL_SOCKET_IFNAME
in te stellen op eth
. Hiermee stelt u de omgevingsvariabele NCCL_SOCKET_IFNAME
eth
in feite in voor alle werkrollen in een knooppunt.
Voorbeeld van notebook
Dit notebook toont het gebruik van het Python-pakket xgboost.spark
met Spark MLlib.
PySpark-XGBoost-notebook
Migratiehandleiding voor de afgeschafte sparkdl.xgboost
module
- Vervangen
from sparkdl.xgboost import XgboostRegressor
doorfrom xgboost.spark import SparkXGBRegressor
en vervangen doorfrom sparkdl.xgboost import XgboostClassifier
from xgboost.spark import SparkXGBClassifier
. - Wijzig alle parameternamen in de estimatorconstructor van de camelCase-stijl in snake_case stijl. Wijzig bijvoorbeeld
XgboostRegressor(featuresCol=XXX)
inSparkXGBRegressor(features_col=XXX)
. - De parameters
use_external_storage
enexternal_storage_precision
zijn verwijderd.xgboost.spark
schattingen maken gebruik van de DMatrix-gegevensiteratie-API om efficiënter geheugen te gebruiken. U hoeft de inefficiënte externe opslagmodus niet meer te gebruiken. Voor extreem grote gegevenssets raadt Databricks u aan de parameternum_workers
te verhogen, waardoor elke trainingstaak de gegevens partitioneert in kleinere, beter beheerbare gegevenspartities. Overweeg het instellennum_workers = sc.defaultParallelism
, waarmee het totale aantal Spark-taaksites in het cluster wordt ingesteldnum_workers
. - Voor schattingen die zijn gedefinieerd in
xgboost.spark
, voert de instellingnum_workers=1
modeltraining uit met één Spark-taak. Dit maakt gebruik van het aantal CPU-kernen dat is opgegeven door de configuratie-instellingspark.task.cpus
van het Spark-cluster. Dit is standaard 1. Als u meer CPU-kernen wilt gebruiken om het model te trainen, verhoogtnum_workers
u ofspark.task.cpus
. U kunt de parameternthread
ofn_jobs
niet instellen voor schattingen die zijn gedefinieerd inxgboost.spark
. Dit gedrag verschilt van het vorige gedrag van estimators die zijn gedefinieerd in het afgeschaftesparkdl.xgboost
pakket.
Model converteren sparkdl.xgboost
naar xgboost.spark
model
sparkdl.xgboost
modellen worden opgeslagen in een andere indeling dan xgboost.spark
modellen en hebben verschillende parameterinstellingen. Gebruik de volgende functie om het model te converteren:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Als u een pyspark.ml.PipelineModel
model met een sparkdl.xgboost
model als laatste fase hebt, kunt u de fase van sparkdl.xgboost
het model vervangen door het geconverteerde xgboost.spark
model.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)