Distribuované trénování modelů XGBoost pomocí xgboost.spark
Důležité
Tato funkce je ve verzi Public Preview.
Balíček Python xgboost>=1.7 obsahuje nový modul xgboost.spark
. Tento modul zahrnuje estimátory xgboost.spark.SparkXGBRegressor
xgboost PySpark , xgboost.spark.SparkXGBClassifier
a xgboost.spark.SparkXGBRanker
. Tyto nové třídy podporují zahrnutí estimátorů XGBoost do kanálů SparkML. Podrobnosti o rozhraní API najdete v dokumentaci k rozhraní PYTHON SPARK API XGBoost.
Požadavky
Databricks Runtime 12.0 ML a vyšší
xgboost.spark
parameters
Estimátory definované v modulu xgboost.spark
podporují většinu stejných parameters a argumentů použitých ve standardním XGBoostu.
-
parameters pro konstruktor třídy, metodu
fit
a metodupredict
jsou z velké části identické s metodami v moduluxgboost.sklearn
. - Názvy, valuesa výchozí hodnoty jsou většinou totožné s těmi popsanými v XGBoost parameters.
- Výjimky tvoří několik nepodporovaných parameters (například
gpu_id
,nthread
,sample_weight
,eval_set
) a specifické odhadypyspark
parameters, které byly přidány (napříkladfeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Podrobnosti najdete v dokumentaci k rozhraní XGBoost Python Spark API.
Distribuované trénování
Estimátory PySpark definované v xgboost.spark
modulu podporují distribuované trénování XGBoost pomocí parametru num_workers
. Pokud chcete použít distribuované trénování, vytvořte klasifikátor nebo regresor a setnum_workers
na počet souběžných spuštěných úloh Sparku během distribuovaného trénování. Pokud chcete použít všechny sloty úloh Sparku, setnum_workers=sc.defaultParallelism
.
Příklad:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Poznámka:
- Nelze použít
mlflow.xgboost.autolog
s distribuovaným XGBoostem. Pokud chcete protokolovat model Spark xgboost pomocí MLflow, použijtemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - Distribuovaný XGBoost nelze použít v clusteru s povoleným automatickým škálováním. Nové pracovní uzly, které začínají v tomto paradigmatu elastického škálování, nemůžou přijímat nové sady úloh a zůstat nečinné. Pokyny k zakázání automatického škálování najdete v tématu Povolení automatického škálování.
Povolení optimalizace pro trénování u řídkých funkcí – datová sada
Estimátory PySpark definované v xgboost.spark
modulu podporují optimalizaci pro trénování datových sad s řídkými funkcemi.
Pokud chcete povolit optimalizaci řídkých sad vlastností, je nutné zadat datovou sadu metodě fit
, která obsahuje vlastnosti column sestávající z values typu pyspark.ml.linalg.SparseVector
a set odhadového parametru enable_sparse_data_optim
True
. Kromě toho je potřeba set parametr missing
na 0.0
.
Příklad:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
Trénování GPU
Estimátory PySpark definované v xgboost.spark
modulu podporují trénování gpu.
Set parametru use_gpu
na True
, aby se povolilo trénovat GPU.
Poznámka:
Pro každou úlohu Sparku, která se používá v distribuovaném tréninku XGBoost, se při argumentu use_gpu
nastaveném na hodnotu mezi set a True
používá ke školení pouze jedna GPU. Databricks doporučuje použít výchozí hodnotu konfigurace clusteru 1
spark.task.resource.gpu.amount
Spark . V opačném případě jsou další GPU přidělené této úloze Sparku nečinné.
Příklad:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Řešení problému
Pokud během trénování s více uzly narazíte na NCCL failure: remote process exited or there was a network error
zprávu, obvykle to značí problém se síťovými komunikacemi mezi grafickými procesory. K tomuto problému dochází, když NCCL (NVIDIA Collective Communications Library) nemůže pro komunikaci s GPU používat určitá síťová rozhraní.
Pro konfiguraci nastav sparkConf clusteru od set do spark.executorEnv.NCCL_SOCKET_IFNAME
na eth
. Tím se v podstatě nastaví proměnná NCCL_SOCKET_IFNAME
eth
prostředí pro všechny pracovní procesy v uzlu.
Příklad poznámkového bloku
Tento poznámkový blok ukazuje použití balíčku xgboost.spark
Pythonu se sparkem MLlib.
Poznámkový blok PySpark-XGBoost
Průvodce migrací pro zastaralý sparkdl.xgboost
modul
- Nahraďte
from sparkdl.xgboost import XgboostRegressor
hofrom xgboost.spark import SparkXGBRegressor
a nahraďtefrom sparkdl.xgboost import XgboostClassifier
ho .from xgboost.spark import SparkXGBClassifier
- Změňte všechny názvy parametrů v konstruktoru estimátoru z stylu camelCase na snake_case styl. Například změňte
XgboostRegressor(featuresCol=XXX)
naSparkXGBRegressor(features_col=XXX)
. - Odebrali jsme parameters
use_external_storage
aexternal_storage_precision
.xgboost.spark
Estimátory používají rozhraní API iterace dat DMatrix k efektivnějšímu využití paměti. Už není potřeba používat neefektivní externí režim úložiště. U extrémně velkých datových sad doporučuje Databricks zvýšitnum_workers
parametr, díky kterému každý trénovací úkol partition data do menších a lépe spravovatelných datových oddílů. Zvažte nastavenínum_workers = sc.defaultParallelism
, které nastavínum_workers
celkový počet slotů úloh Sparku v clusteru. - U odhadců definovaných v
xgboost.spark
nastavenínum_workers=1
se provádí trénování modelu pomocí jediné úlohy Sparku. To využívá počet jader procesoru určených nastavenímspark.task.cpus
konfigurace clusteru Spark, což je ve výchozím nastavení 1. Pokud chcete k trénování modelu použít více jader procesoru, zvyštenum_workers
nebospark.task.cpus
. Nelze set parametrnthread
nebon_jobs
pro odhadátory definované vxgboost.spark
. Toto chování se liší od předchozího chování odhadců definovaných v zastaralémsparkdl.xgboost
balíčku.
Převod sparkdl.xgboost
modelu na xgboost.spark
model
sparkdl.xgboost
modely se ukládají v jiném formátu než xgboost.spark
modely a mají různá nastavení parametrů. K převodu modelu použijte následující funkci nástroje:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Pokud máte pyspark.ml.PipelineModel
model obsahující sparkdl.xgboost
model jako poslední fázi, můžete fázi sparkdl.xgboost
modelu nahradit převedeným xgboost.spark
modelem.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)