Udostępnij za pośrednictwem


Rozproszone trenowanie modeli XGBoost przy użyciu xgboost.spark

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Pakiet języka Python xgboost>=1.7 zawiera nowy moduł xgboost.spark. Ten moduł zawiera narzędzia do szacowania xgboost.spark.SparkXGBRegressorXgboost PySpark, xgboost.spark.SparkXGBClassifieri xgboost.spark.SparkXGBRanker. Te nowe klasy obsługują dołączanie narzędzi do szacowania XGBoost w potokach SparkML. Aby uzyskać szczegółowe informacje o interfejsie API, zobacz dokumentację interfejsu API platformy Spark języka Python XGBoost.

Wymagania

Databricks Runtime 12.0 ML i nowsze.

xgboost.spark Parametry

Narzędzia do szacowania zdefiniowane w xgboost.spark module obsługują większość tych samych parametrów i argumentów używanych w standardowej biblioteki XGBoost.

  • Parametry konstruktora klasy, fit metody i predict metody są w dużej mierze identyczne z parametrami w xgboost.sklearn module.
  • Nazewnictwo, wartości i wartości domyślne są w większości identyczne z nazwami opisanymi w parametrach XGBoost.
  • Wyjątki to kilka nieobsługiwanych parametrów (takich jak , , , ) i pyspark parametry specyficzne dla narzędzia do szacowania, które zostały dodane (takie jak featuresCol, labelCol, use_gpu, ). validationIndicatorColeval_setsample_weightnthreadgpu_id Aby uzyskać szczegółowe informacje, zobacz dokumentację interfejsu API platformy Spark języka Python XGBoost.

Szkolenie rozproszone

Narzędzia do szacowania PySpark zdefiniowane w xgboost.spark module obsługują rozproszone trenowanie XGBoost przy użyciu parametru num_workers . Aby użyć trenowania rozproszonego, utwórz klasyfikator lub regresję i ustaw num_workers liczbę współbieżnych uruchomionych zadań platformy Spark podczas trenowania rozproszonego. Aby użyć wszystkich miejsc zadań platformy Spark, ustaw wartość num_workers=sc.defaultParallelism.

Na przykład:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Uwaga

  • Nie można używać z mlflow.xgboost.autolog rozproszoną biblioteką XGBoost. Aby zarejestrować model xgboost Spark przy użyciu biblioteki MLflow, użyj polecenia mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Nie można użyć rozproszonej biblioteki XGBoost w klastrze z włączonym skalowaniem automatycznym. Nowe węzły robocze, które rozpoczynają się w tym modelu elastycznego skalowania, nie mogą odbierać nowych zestawów zadań i pozostają bezczynne. Aby uzyskać instrukcje dotyczące wyłączania skalowania automatycznego, zobacz Włączanie skalowania automatycznego.

Włączanie optymalizacji na potrzeby trenowania w zestawie danych funkcji rozrzednia

Narzędzia do szacowania PySpark zdefiniowane w xgboost.spark module obsługują optymalizację trenowania zestawów danych z funkcjami rozrzednymi. Aby włączyć optymalizację zestawów funkcji rozrzedzona, należy podać zestaw danych fit metodzie zawierającej kolumnę funkcji składającą się z wartości typu pyspark.ml.linalg.SparseVector i ustawić parametr enable_sparse_data_optim narzędzia do szacowania na Truewartość . Ponadto należy ustawić missing parametr na 0.0wartość .

Na przykład:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Trenowanie procesora GPU

Narzędzia do szacowania PySpark zdefiniowane w module obsługi trenowania xgboost.spark na procesorach GPU. Ustaw parametr use_gpu na wartość , aby True włączyć trenowanie procesora GPU.

Uwaga

Dla każdego zadania platformy Spark używanego w trenowaniu rozproszonym XGBoost tylko jeden procesor GPU jest używany podczas trenowania, gdy use_gpu argument jest ustawiony na Truewartość . Usługa Databricks zaleca użycie wartości domyślnej 1 dla konfiguracji spark.task.resource.gpu.amountklastra Spark. W przeciwnym razie dodatkowe jednostki GPU przydzielone do tego zadania platformy Spark są bezczynne.

Na przykład:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Rozwiązywanie problemów

Podczas trenowania z wieloma węzłami, jeśli wystąpi NCCL failure: remote process exited or there was a network error komunikat, zazwyczaj wskazuje problem z komunikacją sieciową między procesorami GPU. Ten problem pojawia się, gdy NCCL (biblioteka NVIDIA Collective Communications Library) nie może używać niektórych interfejsów sieciowych do komunikacji z procesorem GPU.

Aby rozwiązać ten problem, ustaw parametr sparkConf klastra na spark.executorEnv.NCCL_SOCKET_IFNAME ethwartość . Zasadniczo ustawia zmienną środowiskową NCCL_SOCKET_IFNAME na eth dla wszystkich procesów roboczych w węźle.

Przykładowy notes

W tym notesie przedstawiono użycie pakietu xgboost.spark języka Python z biblioteką MLlib platformy Spark.

Notes PySpark-XGBoost

Pobierz notes

Przewodnik migracji dla przestarzałego sparkdl.xgboost modułu

  • Zastąp ciągiem from sparkdl.xgboost import XgboostRegressor i zastąp from xgboost.spark import SparkXGBRegressor ciąg from sparkdl.xgboost import XgboostClassifier .from xgboost.spark import SparkXGBClassifier
  • Zmień wszystkie nazwy parametrów w konstruktorze narzędzia do szacowania z stylu camelCase na styl snake_case. Na przykład zmień wartość XgboostRegressor(featuresCol=XXX) na SparkXGBRegressor(features_col=XXX).
  • Parametry use_external_storage i external_storage_precision zostały usunięte. xgboost.spark Narzędzia do szacowania używają interfejsu API iteracji danych DMatrix, aby wydajniej używać pamięci. Nie ma już potrzeby korzystania z nieefektywnego trybu przechowywania zewnętrznego. W przypadku bardzo dużych zestawów danych usługa Databricks zaleca zwiększenie parametru num_workers , co sprawia, że każde zadanie szkoleniowe dzieli dane na mniejsze, bardziej zarządzane partycje danych. Rozważ ustawienie num_workers = sc.defaultParallelism, które ustawia num_workers łączną liczbę miejsc zadań platformy Spark w klastrze.
  • W przypadku narzędzi do szacowania zdefiniowanych w programie xgboost.sparkustawienie num_workers=1 wykonuje trenowanie modelu przy użyciu pojedynczego zadania platformy Spark. Korzysta to z liczby rdzeni procesora CPU określonych przez ustawienie spark.task.cpuskonfiguracji klastra Spark , które jest domyślnie 1. Aby użyć większej liczby rdzeni procesora CPU do trenowania modelu, zwiększ num_workers lub spark.task.cpus. Nie można ustawić parametru nthread lub n_jobs dla narzędzi do szacowania zdefiniowanych w pliku xgboost.spark. To zachowanie różni się od poprzedniego zachowania narzędzia do szacowania zdefiniowanego w przestarzałym sparkdl.xgboost pakiecie.

Konwertowanie sparkdl.xgboost modelu na xgboost.spark model

sparkdl.xgboost modele są zapisywane w innym formacie niż xgboost.spark modele i mają różne ustawienia parametrów. Użyj następującej funkcji narzędzia, aby przekonwertować model:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Jeśli masz pyspark.ml.PipelineModel model zawierający sparkdl.xgboost model jako ostatni etap, możesz zastąpić etap sparkdl.xgboost modelu przekonwertowanym xgboost.spark modelem.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)