Distribuerad träning av XGBoost-modeller med hjälp av xgboost.spark
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Python-paketet xgboost>=1.7 innehåller en ny modul xgboost.spark
. Den här modulen innehåller xgboost PySpark-skattarna xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
och xgboost.spark.SparkXGBRanker
. Dessa nya klasser stöder införandet av XGBoost-skattningar i SparkML-pipelines. Api-information finns i XGBoost python spark API-dokumentet.
Krav
Databricks Runtime 12.0 ML och senare.
xgboost.spark
Parametrar
De skattningar som definieras i modulen xgboost.spark
stöder de flesta av samma parametrar och argument som används i standard XGBoost.
- Parametrarna för klasskonstruktorn,
fit
-metoden ochpredict
-metoden är i stort sett identiska med dem i modulenxgboost.sklearn
. - Namngivning, värden och standardvärden är mestadels identiska med de som beskrivs i XGBoost-parametrar.
- Undantag är några parametrar som inte stöds (till exempel , , , ), och de
gpu_id
beräkningsspecifika parametrar som har lagts till (till exempelnthread
,sample_weight
,eval_set
,pyspark
).featuresCol
labelCol
use_gpu
validationIndicatorCol
Mer information finns i dokumentationen om XGBoost Python Spark API.
Distribuerad träning
PySpark-skattare som definierats i modulen xgboost.spark
stöder distribuerad XGBoost-träning med hjälp av parametern num_workers
. Om du vill använda distribuerad träning skapar du en klassificerare eller regressor och anger num_workers
antalet samtidiga Spark-aktiviteter som körs under distribuerad träning. Om du vill använda alla Spark-aktivitetsfack anger du num_workers=sc.defaultParallelism
.
Till exempel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Kommentar
- Du kan inte använda
mlflow.xgboost.autolog
med distribuerad XGBoost. Om du vill logga en xgboost Spark-modell med MLflow använder dumlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Du kan inte använda distribuerad XGBoost i ett kluster som har automatisk skalning aktiverat. Nya arbetsnoder som startar i det här elastiska skalningsparadigmet kan inte ta emot nya uppsättningar uppgifter och förbli inaktiva. Instruktioner för att inaktivera automatisk skalning finns i Aktivera automatisk skalning.
Aktivera optimering för träning på datauppsättning med glesa funktioner
PySpark-skattningar som definierats i xgboost.spark
modulen stöder optimering för träning av datauppsättningar med glesa funktioner.
Om du vill aktivera optimering av glesa funktionsuppsättningar måste du ange en datauppsättning för metoden fit
som innehåller en funktionskolumn som består av värden av typen pyspark.ml.linalg.SparseVector
och ange parametern enable_sparse_data_optim
estimator till True
. Dessutom måste du ange parametern missing
till 0.0
.
Till exempel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU-utbildning
PySpark-skattningar som definierats i modulen xgboost.spark
stöder utbildning på GPU:er. Ange parametern use_gpu
till True
för att aktivera GPU-träning.
Kommentar
För varje Spark-uppgift som används i distribuerad XGBoost-träning används endast en GPU i träning när use_gpu
argumentet är inställt på True
. Databricks rekommenderar att du använder standardvärdet 1
för för Spark-klusterkonfigurationen spark.task.resource.gpu.amount
. I annat fall är de ytterligare GPU:er som allokerats till den här Spark-aktiviteten inaktiva.
Till exempel:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Felsökning
Om du stöter på ett NCCL failure: remote process exited or there was a network error
meddelande under träning med flera noder indikerar det vanligtvis ett problem med nätverkskommunikation mellan GPU:er. Det här problemet uppstår när NCCL (NVIDIA Collective Communications Library) inte kan använda vissa nätverksgränssnitt för GPU-kommunikation.
Lös problemet genom att ange klustrets sparkConf för spark.executorEnv.NCCL_SOCKET_IFNAME
till eth
. Detta anger i princip miljövariabeln NCCL_SOCKET_IFNAME
till eth
för alla arbetare i en nod.
Exempelnotebook-fil
Den här notebook-filen visar användningen av Python-paketet xgboost.spark
med Spark MLlib.
PySpark-XGBoost Notebook
Migreringsguide för den inaktuella sparkdl.xgboost
modulen
- Ersätt
from sparkdl.xgboost import XgboostRegressor
medfrom xgboost.spark import SparkXGBRegressor
och ersättfrom sparkdl.xgboost import XgboostClassifier
medfrom xgboost.spark import SparkXGBClassifier
. - Ändra alla parameternamn i estimatorkonstruktorn från camelCase-formatmallen till snake_case formatmall. Ändra
XgboostRegressor(featuresCol=XXX)
till exempel tillSparkXGBRegressor(features_col=XXX)
. - Parametrarna
use_external_storage
ochexternal_storage_precision
har tagits bort.xgboost.spark
estimatorer använder DMatrix data iteration API för att använda minne mer effektivt. Det finns inte längre något behov av att använda ineffektivt externt lagringsläge. För extremt stora datauppsättningar rekommenderar Databricks att du ökar parameternnum_workers
, vilket gör att varje träningsuppgift partitioneras till mindre, mer hanterbara datapartitioner. Överväg att angenum_workers = sc.defaultParallelism
, som angernum_workers
det totala antalet Spark-aktivitetsfack i klustret. - För skattare som definieras i
xgboost.spark
kör inställningennum_workers=1
modellträning med hjälp av en enda Spark-uppgift. Detta använder det antal CPU-kärnor som anges av konfigurationsinställningenspark.task.cpus
för Spark-klustret , som är 1 som standard. Om du vill använda fler CPU-kärnor för att träna modellen ökarnum_workers
du ellerspark.task.cpus
. Du kan inte ange parameternnthread
ellern_jobs
för skattningar som definierats ixgboost.spark
. Det här beteendet skiljer sig från det tidigare beteendet för skattningar som definierats i det inaktuellasparkdl.xgboost
paketet.
Konvertera sparkdl.xgboost
modell till xgboost.spark
modell
sparkdl.xgboost
modeller sparas i ett annat format än xgboost.spark
modeller och har olika parameterinställningar. Använd följande verktygsfunktion för att konvertera modellen:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Om du har en pyspark.ml.PipelineModel
modell som innehåller en sparkdl.xgboost
modell som den sista fasen kan du ersätta modellsteget sparkdl.xgboost
med den konverterade xgboost.spark
modellen.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)