使用 xgboost.spark
之 XGBoost 模型的分散式訓練
重要
這項功能處於公開預覽狀態。
Python 套件 xgboost>=1.7 包含新的模組 xgboost.spark
。 本模組包含 xgboost PySpark 估算器 xgboost.spark.SparkXGBRegressor
、xgboost.spark.SparkXGBClassifier
和 xgboost.spark.SparkXGBRanker
。 這些新類別支援在 SparkML 管線中支援包含 XGBoost 估算器。 如需 API 詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。
需求
Databricks Runtime 12.0 ML 和更新版本。
xgboost.spark
參數
xgboost.spark
模組中定義的估算器支援標準 XGBoost 中使用的大部分相同參數和引數。
- 類別建構函式、
fit
方法和predict
方法的參數,大部分會與xgboost.sklearn
模組中的參數完全相同。 - 命名、值和預設值大多與 XGBoost 參數中所述的相同。
- 例外狀況是一些不受支援的參數 (例如
gpu_id
、nthread
、sample_weight
、eval_set
) 和已新增的pyspark
估算器特定參數 (例如featuresCol
、labelCol
、use_gpu
、validationIndicatorCol
)。 如需詳細資料,請參閱 XGBoost Python Spark API 文件 (英文)。
分散式訓練
xgboost.spark
模組中定義的 PySpark 估算器支援使用 num_workers
參數的分散式 XGBoost 訓練。 若要使用分散式訓練,請建立分類器或迴歸輸入變數,並將 num_workers
設定為分散式訓練期間並行執行的 Spark 工作數量。 若要使用所有 Spark 工作插槽,請設定 num_workers=sc.defaultParallelism
。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
注意
- 您無法使用
mlflow.xgboost.autolog
搭配分散式 XGBoost。 若要使用 MLflow 記錄 xgboost Spark 模型,請使用mlflow.spark.log_model(spark_xgb_model, artifact_path)
。 - 您無法在已啟用自動調整功能的叢集上使用分散式 XGBoost。 在此彈性調整範例中啟動的新工作者節點無法接收新的工作集,並維持閒置。 如需停用自動調整的指示,請參閱啟用自動調整 (英文)。
啟用疏鬆功能資料集訓練的最佳化
xgboost.spark
模組中定義的 PySpark 估算器支援針對具有疏鬆功能之資料集的培訓最佳化。
若要啟用疏鬆功能集的最佳化,您必須提供資料集給包含類型值 pyspark.ml.linalg.SparseVector
之功能資料行的資料集給 fit
方法,並將估算器參數 enable_sparse_data_optim
設定為 True
。 此外,您必須將 missing
參數設定為 0.0
。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU 訓練
xgboost.spark
模組中定義的 PySpark 估算器支援 GPU 訓練。 將參數 use_gpu
設定為 True
以啟用 GPU 訓練。
注意
針對 XGBoost 分散式訓練中使用的每個 Spark 工作,use_gpu
引數設定為 True
時,只會在訓練中使用一個 GPU。 Databricks 建議針對 Spark 叢集組態 spark.task.resource.gpu.amount
使用預設值 1
。 否則,配置給此 Spark 工作的其他 GPU 會閒置。
例如:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
疑難排解
在多節點訓練期間,如果您收到 NCCL failure: remote process exited or there was a network error
訊息,通常表示 GPU 之間的網路通訊發生問題。 NCCL (NVIDIA 集體通訊程式庫) 無法使用特定網路介面進行 GPU 通訊時,就會發生此問題。
若要解決,請將叢集的 sparkConf spark.executorEnv.NCCL_SOCKET_IFNAME
設定為 eth
。 這基本上會將節點中所有工作者的環境變數 NCCL_SOCKET_IFNAME
設定為 eth
。
範例筆記本
此 Notebook 顯示搭配 Spark MLlib 使用 Python 套件 xgboost.spark
。
PySpark-XGBoost Notebook
已取代 sparkdl.xgboost
模組的移轉指南
- 將
from sparkdl.xgboost import XgboostRegressor
取代為from xgboost.spark import SparkXGBRegressor
,並將from sparkdl.xgboost import XgboostClassifier
取代為from xgboost.spark import SparkXGBClassifier
。 - 將估算器建構函式中的所有參數名稱從 camelCase 樣式變更為 snake_case 樣式。 例如,將
XgboostRegressor(featuresCol=XXX)
變更為SparkXGBRegressor(features_col=XXX)
。 - 參數
use_external_storage
和external_storage_precision
已移除。xgboost.spark
估算器會使用 DMatrix 資料反覆項目 API,以更有效率地使用記憶體。 不再需要使用效率不佳的外部儲存模式。 對於極大型資料集,Databricks 建議您增加num_workers
參數,讓每個訓練工作將資料分割成更小、更容易管理的資料分割區。 請考慮設定num_workers = sc.defaultParallelism
,這會將num_workers
設定為叢集中 Spark 工作位置的總數。 - 針對
xgboost.spark
中定義的估算器,設定num_workers=1
會使用單一 Spark 工作執行模型訓練。 這會利用 Spark 叢集組態設定spark.task.cpus
所指定的 CPU 核心數目,預設為 1。 若要使用更多 CPU 核心來訓練模型,請增加num_workers
或spark.task.cpus
。 您無法為xgboost.spark
中定義的估算器設定nthread
或n_jobs
參數。 此行為與已取代之sparkdl.xgboost
套件中定義的估算器先前行為不同。
將 sparkdl.xgboost
模型轉換成 xgboost.spark
模型
sparkdl.xgboost
模型會以不同於 xgboost.spark
模型的格式儲存,而且有不同的參數設定。 使用下列公用程式函數來轉換模型:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
如果您有包含 sparkdl.xgboost
模型作為最後一個階段的 pyspark.ml.PipelineModel
模型,就可以將 sparkdl.xgboost
模型的階段取代為已轉換的 xgboost.spark
模型。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)