共用方式為


使用 xgboost.spark 之 XGBoost 模型的分散式訓練

重要

這項功能處於公開預覽狀態

Python 套件 xgboost>=1.7 包含新的模組 xgboost.spark。 本模組包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressorxgboost.spark.SparkXGBClassifierxgboost.spark.SparkXGBRanker。 這些新類別支援在 SparkML 管線中支援包含 XGBoost 估算器。 如需 API 詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。

需求

Databricks Runtime 12.0 ML 和更新版本。

xgboost.spark 參數

xgboost.spark 模組中定義的估算器支援標準 XGBoost 中使用的大部分相同參數和引數。

  • 類別建構函式、fit 方法和 predict 方法的參數,大部分會與 xgboost.sklearn 模組中的參數完全相同。
  • 命名、值和預設值大多與 XGBoost 參數中所述的相同。
  • 例外狀況是一些不受支援的參數 (例如 gpu_idnthreadsample_weighteval_set) 和已新增的 pyspark 估算器特定參數 (例如 featuresCollabelColuse_gpuvalidationIndicatorCol)。 如需詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。

分散式訓練

xgboost.spark 模組中定義的 PySpark 估算器支援使用 num_workers 參數的分散式 XGBoost 訓練。 若要使用分散式訓練,請建立分類器或迴歸輸入變數,並將 num_workers 設定為分散式訓練期間並行執行的 Spark 工作數量。 若要使用所有 Spark 工作插槽,請設定 num_workers=sc.defaultParallelism

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

注意

  • 您無法使用 mlflow.xgboost.autolog 搭配分散式 XGBoost。 若要使用 MLflow 記錄 xgboost Spark 模型,請使用 mlflow.spark.log_model(spark_xgb_model, artifact_path)
  • 您無法在已啟用自動調整功能的叢集上使用分散式 XGBoost。 在此彈性調整範例中啟動的新工作者節點無法接收新的工作集,並維持閒置。 如需停用自動調整的指示,請參閱啟用自動調整 (英文)。

啟用疏鬆功能資料集訓練的最佳化

xgboost.spark 模組中定義的 PySpark 估算器支援針對具有疏鬆功能之資料集的培訓最佳化。 若要啟用疏鬆功能集的最佳化,您必須提供資料集給包含類型值 pyspark.ml.linalg.SparseVector 之功能資料行的資料集給 fit 方法,並將估算器參數 enable_sparse_data_optim 設定為 True。 此外,您必須將 missing 參數設定為 0.0

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU 訓練

xgboost.spark 模組中定義的 PySpark 估算器支援 GPU 訓練。 將參數 use_gpu 設定為 True 以啟用 GPU 訓練。

注意

針對 XGBoost 分散式訓練中使用的每個 Spark 工作,use_gpu 引數設定為 True 時,只會在訓練中使用一個 GPU。 Databricks 建議針對 Spark 叢集組態 spark.task.resource.gpu.amount 使用預設值 1。 否則,配置給此 Spark 工作的其他 GPU 會閒置。

例如:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

疑難排解

在多節點訓練期間,如果您收到 NCCL failure: remote process exited or there was a network error 訊息,通常表示 GPU 之間的網路通訊發生問題。 NCCL (NVIDIA 集體通訊程式庫) 無法使用特定網路介面進行 GPU 通訊時,就會發生此問題。

若要解決,請將叢集的 sparkConf spark.executorEnv.NCCL_SOCKET_IFNAME 設定為 eth。 這基本上會將節點中所有工作者的環境變數 NCCL_SOCKET_IFNAME 設定為 eth

範例筆記本

此 Notebook 顯示搭配 Spark MLlib 使用 Python 套件 xgboost.spark

PySpark-XGBoost Notebook

取得筆記本

已取代 sparkdl.xgboost 模組的移轉指南

  • from sparkdl.xgboost import XgboostRegressor 取代為 from xgboost.spark import SparkXGBRegressor,並將 from sparkdl.xgboost import XgboostClassifier 取代為 from xgboost.spark import SparkXGBClassifier
  • 將估算器建構函式中的所有參數名稱從 camelCase 樣式變更為 snake_case 樣式。 例如,將 XgboostRegressor(featuresCol=XXX) 變更為 SparkXGBRegressor(features_col=XXX)
  • 參數 use_external_storageexternal_storage_precision 已移除。 xgboost.spark 估算器會使用 DMatrix 資料反覆項目 API,以更有效率地使用記憶體。 不再需要使用效率不佳的外部儲存模式。 對於極大型資料集,Databricks 建議您增加 num_workers 參數,讓每個訓練工作將資料分割成更小、更容易管理的資料分割區。 請考慮設定 num_workers = sc.defaultParallelism,這會將 num_workers 設定為叢集中 Spark 工作位置的總數。
  • 針對 xgboost.spark 中定義的估算器,設定 num_workers=1 會使用單一 Spark 工作執行模型訓練。 這會利用 Spark 叢集組態設定 spark.task.cpus 所指定的 CPU 核心數目,預設為 1。 若要使用更多 CPU 核心來訓練模型,請增加 num_workersspark.task.cpus。 您無法為 xgboost.spark 中定義的估算器設定 nthreadn_jobs 參數。 此行為與已取代之 sparkdl.xgboost 套件中定義的估算器先前行為不同。

sparkdl.xgboost 模型轉換成 xgboost.spark 模型

sparkdl.xgboost 模型會以不同於 xgboost.spark 模型的格式儲存,而且有不同的參數設定。 使用下列公用程式函數來轉換模型:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

如果您有包含 sparkdl.xgboost 模型作為最後一個階段的 pyspark.ml.PipelineModel 模型,就可以將 sparkdl.xgboost 模型的階段取代為已轉換的 xgboost.spark 模型。

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)