Sdílet prostřednictvím


Distribuované trénování modelů XGBoost pomocí xgboost.spark

Důležité

Tato funkce je ve verzi Public Preview.

Balíček Python xgboost>=1.7 obsahuje nový modul xgboost.spark. Tento modul zahrnuje estimátory xgboost.spark.SparkXGBRegressorxgboost PySpark , xgboost.spark.SparkXGBClassifiera xgboost.spark.SparkXGBRanker. Tyto nové třídy podporují zahrnutí estimátorů XGBoost do kanálů SparkML. Podrobnosti o rozhraní API najdete v dokumentaci k rozhraní PYTHON SPARK API XGBoost.

Požadavky

Databricks Runtime 12.0 ML a vyšší

xgboost.spark parameters

Estimátory definované v modulu xgboost.spark podporují většinu stejných parameters a argumentů použitých ve standardním XGBoostu.

  • parameters pro konstruktor třídy, metodu fit a metodu predict jsou z velké části identické s metodami v modulu xgboost.sklearn.
  • Názvy, valuesa výchozí hodnoty jsou většinou totožné s těmi popsanými v XGBoost parameters.
  • Výjimky tvoří několik nepodporovaných parameters (například gpu_id, nthread, sample_weight, eval_set) a specifické odhady pysparkparameters, které byly přidány (například featuresCol, labelCol, use_gpu, validationIndicatorCol). Podrobnosti najdete v dokumentaci k rozhraní XGBoost Python Spark API.

Distribuované trénování

Estimátory PySpark definované v xgboost.spark modulu podporují distribuované trénování XGBoost pomocí parametru num_workers . Pokud chcete použít distribuované trénování, vytvořte klasifikátor nebo regresor a setnum_workers na počet souběžných spuštěných úloh Sparku během distribuovaného trénování. Pokud chcete použít všechny sloty úloh Sparku, setnum_workers=sc.defaultParallelism.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Poznámka:

  • Nelze použít mlflow.xgboost.autolog s distribuovaným XGBoostem. Pokud chcete protokolovat model Spark xgboost pomocí MLflow, použijte mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • Distribuovaný XGBoost nelze použít v clusteru s povoleným automatickým škálováním. Nové pracovní uzly, které začínají v tomto paradigmatu elastického škálování, nemůžou přijímat nové sady úloh a zůstat nečinné. Pokyny k zakázání automatického škálování najdete v tématu Povolení automatického škálování.

Povolení optimalizace pro trénování u řídkých funkcí – datová sada

Estimátory PySpark definované v xgboost.spark modulu podporují optimalizaci pro trénování datových sad s řídkými funkcemi. Pokud chcete povolit optimalizaci řídkých sad vlastností, je nutné zadat datovou sadu metodě fit, která obsahuje vlastnosti column sestávající z values typu pyspark.ml.linalg.SparseVector a set odhadového parametru enable_sparse_data_optimTrue. Kromě toho je potřeba set parametr missing na 0.0.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

Trénování GPU

Estimátory PySpark definované v xgboost.spark modulu podporují trénování gpu. Set parametru use_gpu na True, aby se povolilo trénovat GPU.

Poznámka:

Pro každou úlohu Sparku, která se používá v distribuovaném tréninku XGBoost, se při argumentu use_gpu nastaveném na hodnotu mezi set a Truepoužívá ke školení pouze jedna GPU. Databricks doporučuje použít výchozí hodnotu konfigurace clusteru 1spark.task.resource.gpu.amountSpark . V opačném případě jsou další GPU přidělené této úloze Sparku nečinné.

Příklad:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Řešení problému

Pokud během trénování s více uzly narazíte na NCCL failure: remote process exited or there was a network error zprávu, obvykle to značí problém se síťovými komunikacemi mezi grafickými procesory. K tomuto problému dochází, když NCCL (NVIDIA Collective Communications Library) nemůže pro komunikaci s GPU používat určitá síťová rozhraní.

Pro konfiguraci nastav sparkConf clusteru od set do spark.executorEnv.NCCL_SOCKET_IFNAME na eth. Tím se v podstatě nastaví proměnná NCCL_SOCKET_IFNAMEeth prostředí pro všechny pracovní procesy v uzlu.

Příklad poznámkového bloku

Tento poznámkový blok ukazuje použití balíčku xgboost.spark Pythonu se sparkem MLlib.

Poznámkový blok PySpark-XGBoost

Get poznámkového bloku

Průvodce migrací pro zastaralý sparkdl.xgboost modul

  • Nahraďte from sparkdl.xgboost import XgboostRegressor ho from xgboost.spark import SparkXGBRegressor a nahraďte from sparkdl.xgboost import XgboostClassifier ho .from xgboost.spark import SparkXGBClassifier
  • Změňte všechny názvy parametrů v konstruktoru estimátoru z stylu camelCase na snake_case styl. Například změňte XgboostRegressor(featuresCol=XXX) na SparkXGBRegressor(features_col=XXX).
  • Odebrali jsme parametersuse_external_storage a external_storage_precision. xgboost.spark Estimátory používají rozhraní API iterace dat DMatrix k efektivnějšímu využití paměti. Už není potřeba používat neefektivní externí režim úložiště. U extrémně velkých datových sad doporučuje Databricks zvýšit num_workers parametr, díky kterému každý trénovací úkol partition data do menších a lépe spravovatelných datových oddílů. Zvažte nastavení num_workers = sc.defaultParallelism, které nastaví num_workers celkový počet slotů úloh Sparku v clusteru.
  • U odhadců definovaných v xgboost.sparknastavení num_workers=1 se provádí trénování modelu pomocí jediné úlohy Sparku. To využívá počet jader procesoru určených nastavením spark.task.cpuskonfigurace clusteru Spark, což je ve výchozím nastavení 1. Pokud chcete k trénování modelu použít více jader procesoru, zvyšte num_workers nebo spark.task.cpus. Nelze set parametr nthread nebo n_jobs pro odhadátory definované v xgboost.spark. Toto chování se liší od předchozího chování odhadců definovaných v zastaralém sparkdl.xgboost balíčku.

Převod sparkdl.xgboost modelu na xgboost.spark model

sparkdl.xgboost modely se ukládají v jiném formátu než xgboost.spark modely a mají různá nastavení parametrů. K převodu modelu použijte následující funkci nástroje:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Pokud máte pyspark.ml.PipelineModel model obsahující sparkdl.xgboost model jako poslední fázi, můžete fázi sparkdl.xgboost modelu nahradit převedeným xgboost.spark modelem.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)