Rozproszone trenowanie modeli XGBoost przy użyciu xgboost.spark
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Pakiet języka Python xgboost>=1.7 zawiera nowy moduł xgboost.spark
. Ten moduł zawiera narzędzia do szacowania xgboost.spark.SparkXGBRegressor
Xgboost PySpark, xgboost.spark.SparkXGBClassifier
i xgboost.spark.SparkXGBRanker
. Te nowe klasy obsługują dołączanie narzędzi do szacowania XGBoost w potokach SparkML. Aby uzyskać szczegółowe informacje o interfejsie API, zobacz dokumentację interfejsu API platformy Spark języka Python XGBoost.
Wymagania
Databricks Runtime 12.0 ML i nowsze.
xgboost.spark
Parametry
Narzędzia do szacowania zdefiniowane w xgboost.spark
module obsługują większość tych samych parametrów i argumentów używanych w standardowej biblioteki XGBoost.
- Parametry konstruktora klasy,
fit
metody ipredict
metody są w dużej mierze identyczne z parametrami wxgboost.sklearn
module. - Nazewnictwo, wartości i wartości domyślne są w większości identyczne z nazwami opisanymi w parametrach XGBoost.
- Wyjątki to kilka nieobsługiwanych parametrów (takich jak , , , ) i
pyspark
parametry specyficzne dla narzędzia do szacowania, które zostały dodane (takie jakfeaturesCol
,labelCol
,use_gpu
, ).validationIndicatorCol
eval_set
sample_weight
nthread
gpu_id
Aby uzyskać szczegółowe informacje, zobacz dokumentację interfejsu API platformy Spark języka Python XGBoost.
Szkolenie rozproszone
Narzędzia do szacowania PySpark zdefiniowane w xgboost.spark
module obsługują rozproszone trenowanie XGBoost przy użyciu parametru num_workers
. Aby użyć trenowania rozproszonego, utwórz klasyfikator lub regresję i ustaw num_workers
liczbę współbieżnych uruchomionych zadań platformy Spark podczas trenowania rozproszonego. Aby użyć wszystkich miejsc zadań platformy Spark, ustaw wartość num_workers=sc.defaultParallelism
.
Na przykład:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Uwaga
- Nie można używać z
mlflow.xgboost.autolog
rozproszoną biblioteką XGBoost. Aby zarejestrować model xgboost Spark przy użyciu biblioteki MLflow, użyj poleceniamlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Nie można użyć rozproszonej biblioteki XGBoost w klastrze z włączonym skalowaniem automatycznym. Nowe węzły robocze, które rozpoczynają się w tym modelu elastycznego skalowania, nie mogą odbierać nowych zestawów zadań i pozostają bezczynne. Aby uzyskać instrukcje dotyczące wyłączania skalowania automatycznego, zobacz Włączanie skalowania automatycznego.
Włączanie optymalizacji na potrzeby trenowania w zestawie danych funkcji rozrzednia
Narzędzia do szacowania PySpark zdefiniowane w xgboost.spark
module obsługują optymalizację trenowania zestawów danych z funkcjami rozrzednymi.
Aby włączyć optymalizację zestawów funkcji rozrzedzona, należy podać zestaw danych fit
metodzie zawierającej kolumnę funkcji składającą się z wartości typu pyspark.ml.linalg.SparseVector
i ustawić parametr enable_sparse_data_optim
narzędzia do szacowania na True
wartość . Ponadto należy ustawić missing
parametr na 0.0
wartość .
Na przykład:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Trenowanie procesora GPU
Narzędzia do szacowania PySpark zdefiniowane w module obsługi trenowania xgboost.spark
na procesorach GPU. Ustaw parametr use_gpu
na wartość , aby True
włączyć trenowanie procesora GPU.
Uwaga
Dla każdego zadania platformy Spark używanego w trenowaniu rozproszonym XGBoost tylko jeden procesor GPU jest używany podczas trenowania, gdy use_gpu
argument jest ustawiony na True
wartość . Usługa Databricks zaleca użycie wartości domyślnej 1
dla konfiguracji spark.task.resource.gpu.amount
klastra Spark. W przeciwnym razie dodatkowe jednostki GPU przydzielone do tego zadania platformy Spark są bezczynne.
Na przykład:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Rozwiązywanie problemów
Podczas trenowania z wieloma węzłami, jeśli wystąpi NCCL failure: remote process exited or there was a network error
komunikat, zazwyczaj wskazuje problem z komunikacją sieciową między procesorami GPU. Ten problem pojawia się, gdy NCCL (biblioteka NVIDIA Collective Communications Library) nie może używać niektórych interfejsów sieciowych do komunikacji z procesorem GPU.
Aby rozwiązać ten problem, ustaw parametr sparkConf klastra na spark.executorEnv.NCCL_SOCKET_IFNAME
eth
wartość . Zasadniczo ustawia zmienną środowiskową NCCL_SOCKET_IFNAME
na eth
dla wszystkich procesów roboczych w węźle.
Przykładowy notes
W tym notesie przedstawiono użycie pakietu xgboost.spark
języka Python z biblioteką MLlib platformy Spark.
Notes PySpark-XGBoost
Przewodnik migracji dla przestarzałego sparkdl.xgboost
modułu
- Zastąp ciągiem
from sparkdl.xgboost import XgboostRegressor
i zastąpfrom xgboost.spark import SparkXGBRegressor
ciągfrom sparkdl.xgboost import XgboostClassifier
.from xgboost.spark import SparkXGBClassifier
- Zmień wszystkie nazwy parametrów w konstruktorze narzędzia do szacowania z stylu camelCase na styl snake_case. Na przykład zmień wartość
XgboostRegressor(featuresCol=XXX)
naSparkXGBRegressor(features_col=XXX)
. - Parametry
use_external_storage
iexternal_storage_precision
zostały usunięte.xgboost.spark
Narzędzia do szacowania używają interfejsu API iteracji danych DMatrix, aby wydajniej używać pamięci. Nie ma już potrzeby korzystania z nieefektywnego trybu przechowywania zewnętrznego. W przypadku bardzo dużych zestawów danych usługa Databricks zaleca zwiększenie parametrunum_workers
, co sprawia, że każde zadanie szkoleniowe dzieli dane na mniejsze, bardziej zarządzane partycje danych. Rozważ ustawienienum_workers = sc.defaultParallelism
, które ustawianum_workers
łączną liczbę miejsc zadań platformy Spark w klastrze. - W przypadku narzędzi do szacowania zdefiniowanych w programie
xgboost.spark
ustawienienum_workers=1
wykonuje trenowanie modelu przy użyciu pojedynczego zadania platformy Spark. Korzysta to z liczby rdzeni procesora CPU określonych przez ustawieniespark.task.cpus
konfiguracji klastra Spark , które jest domyślnie 1. Aby użyć większej liczby rdzeni procesora CPU do trenowania modelu, zwiększnum_workers
lubspark.task.cpus
. Nie można ustawić parametrunthread
lubn_jobs
dla narzędzi do szacowania zdefiniowanych w plikuxgboost.spark
. To zachowanie różni się od poprzedniego zachowania narzędzia do szacowania zdefiniowanego w przestarzałymsparkdl.xgboost
pakiecie.
Konwertowanie sparkdl.xgboost
modelu na xgboost.spark
model
sparkdl.xgboost
modele są zapisywane w innym formacie niż xgboost.spark
modele i mają różne ustawienia parametrów. Użyj następującej funkcji narzędzia, aby przekonwertować model:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Jeśli masz pyspark.ml.PipelineModel
model zawierający sparkdl.xgboost
model jako ostatni etap, możesz zastąpić etap sparkdl.xgboost
modelu przekonwertowanym xgboost.spark
modelem.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)