xgboost.spark
를 사용하여 XGBoost 모델의 분산 학습
Important
이 기능은 공개 미리 보기 상태입니다.
Python 패키지 xgboost>=1.7에는 새 모듈 xgboost.spark
가 포함되어 있습니다. 이 모듈에는 xgboost PySpark 추정기 xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
, xgboost.spark.SparkXGBRanker
가 포함됩니다. 이러한 새 클래스는 SparkML 파이프라인에 XGBoost 추정기를 포함할 수 있습니다. API 세부 정보는 XGBoost python spark API 문서를 참조하세요.
요구 사항
Databricks Runtime 12.0 ML 이상
xgboost.spark
매개 변수
xgboost.spark
모듈에 정의된 추정기는 표준 XGBoost에 사용되는 대부분의 동일한 매개 변수 및 인수를 지원합니다.
- 클래스 생성자,
fit
메서드 및predict
메서드에 대한 매개 변수는xgboost.sklearn
모듈의 매개 변수와 크게 동일합니다. - 이름 지정, 값 및 기본값은 XGBoost 매개 변수설명된 것과 대부분 동일합니다.
- 예외는 지원되지 않는 몇 가지 매개 변수(예:
gpu_id
,nthread
,sample_weight
,eval_set
) 및 추가된pyspark
추정기별 매개 변수(예:featuresCol
,labelCol
,use_gpu
,validationIndicatorCol
)입니다. 자세한 내용은 XGBoost Python Spark API 설명서를 참조하세요.
분산 학습
xgboost.spark
모듈에 정의된 PySpark 예측 도구는 num_workers
매개 변수를 사용하여 분산 XGBoost 학습을 지원합니다. 분산 학습을 사용하려면 분류자 또는 회귀자를 만들고 분산 학습 중에 동시에 실행 중인 Spark 작업의 수로 num_workers
설정합니다. 모든 Spark 작업 슬롯을 사용하려면 num_workers=sc.defaultParallelism
설정합니다.
예시:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
참고 항목
- 분산 XGBoost와 함께
mlflow.xgboost.autolog
을 사용할 수 없습니다. MLflow를 사용하여 xgboost Spark 모델을 기록하려면mlflow.spark.log_model(spark_xgb_model, artifact_path)
를 사용합니다. - 자동 스케일링이 사용하도록 설정된 클러스터에서는 분산 XGBoost를 사용할 수 없습니다. 이 탄력적 스케일링 패러다임에서 시작하는 새 작업자 노드는 새 작업 집합을 수신할 수 없으며 유휴 상태를 유지할 수 없습니다. 자동 스케일링을 사용하지 않도록 설정하는 방법은, 자동 스케일링 사용을 참조하세요.
스파스 기능 데이터 세트 학습에 최적화 사용
xgboost.spark
모듈에 정의된 PySpark 예측 도구는 스파스 기능이 있는 데이터 세트 학습을 위한 최적화를 지원합니다.
스파스 기능 집합의 최적화를 사용하도록 설정하려면 fit
형식의 값으로 구성된 기능 열을 포함하는 pyspark.ml.linalg.SparseVector
메서드에 데이터 세트를 제공하고 추정기 매개 변수 enable_sparse_data_optim
True
설정해야 합니다. 또한 missing
매개 변수를 0.0
로 설정해야 한다.
예시:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU 교육
xgboost.spark
모듈에 정의된 PySpark 예측 도구는 GPU에 대한 학습을 지원합니다. 매개변수 use_gpu
를 True
로 설정하여 GPU 학습을 활성화합니다.
참고 항목
use_gpu
인수가 True
로 설정될 때, XGBoost 분산 학습에 사용되는 각 Spark 태스크에서는 학습에 GPU가 하나만 사용됩니다. Databricks는 Spark 클러스터 구성 1
에 대해 기본값 spark.task.resource.gpu.amount
을 사용하는 것이 좋습니다. 그렇지 않으면 이 Spark 작업에 할당된 추가 GPU가 유휴 상태가 됩니다.
예시:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
문제 해결
다중 노드 학습 중에 NCCL failure: remote process exited or there was a network error
메시지가 발생하면, 일반적으로 GPU 간의 네트워크 통신에 문제가 있음을 나타냅니다. 이 문제는 NCCL(NVIDIA 집단 통신 라이브러리)이 GPU 통신에 특정 네트워크 인터페이스를 사용할 수 없는 경우에 발생합니다.
이 문제를 해결하려면 spark.executorEnv.NCCL_SOCKET_IFNAME
클러스터의 sparkConf를 eth
로 설정해야 합니다. 이는 기본적으로 노드의 모든 작업자에 대해 환경 변수 NCCL_SOCKET_IFNAME
을 eth
로 설정합니다.
예제 Notebook
이 Notebook은 Spark MLlib와 함께 xgboost.spark
Python 패키지를 사용하는 것을 보여줍니다.
PySpark-XGBoost Notebook
사용되지 않는 sparkdl.xgboost
모듈에 대한 마이그레이션 가이드
-
from sparkdl.xgboost import XgboostRegressor
를from xgboost.spark import SparkXGBRegressor
로,from sparkdl.xgboost import XgboostClassifier
를from xgboost.spark import SparkXGBClassifier
로 바꿉니다. - 예측 도구 생성자의 모든 매개 변수 이름을 camelCase 스타일에서 snake_case 스타일로 변경합니다. 예를 들어
XgboostRegressor(featuresCol=XXX)
를SparkXGBRegressor(features_col=XXX)
로 변경합니다. -
use_external_storage
및external_storage_precision
매개 변수가 제거되었습니다.xgboost.spark
예측 도구는 DMatrix 데이터 반복 API를 사용하여 메모리를 보다 효율적으로 사용합니다. 더 이상 비효율적인 외부 스토리지 모드를 사용할 필요가 없습니다. 매우 큰 데이터 세트의 경우 Databricks는num_workers
매개 변수를 늘려 각 학습 태스크가 데이터를 더 작고 관리하기 쉬운 데이터 파티션으로 분할하는 것이 좋습니다. 클러스터의 총 Spark 작업 슬롯 수로num_workers = sc.defaultParallelism
를설정하는,num_workers
의 설정을 고려합니다. -
xgboost.spark
에 정의된 예측 도구의 경우num_workers=1
을 설정하면 단일 Spark 작업을 사용하여 모델 학습이 실행됩니다. 이는 기본적으로 1인 Spark 클러스터 구성 설정spark.task.cpus
에서 지정한 CPU 코어 수를 활용합니다. 더 많은 CPU 코어를 사용하여 모델을 학습하려면,num_workers
이나spark.task.cpus
를 늘리세요.nthread
로 정의된 추정기의n_jobs
또는xgboost.spark
매개변수를 설정할 수 없습니다. 이 동작은 사용되지 않는sparkdl.xgboost
패키지에 정의된 예측 도구의 이전 동작과 다릅니다.
sparkdl.xgboost
모델을 xgboost.spark
모델로 변환
sparkdl.xgboost
모델은 xgboost.spark
모델과 다른 형식 로 저장되며 매개 변수 설정이 다릅니다. 다음 유틸리티 함수를 사용하여 모델을 변환합니다:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
pyspark.ml.PipelineModel
모델을 마지막 단계로 포함하는 sparkdl.xgboost
모델이 있는 경우, sparkdl.xgboost
모델 스테이지 를 변환된 xgboost.spark
모델로 바꿀 수 있습니다.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)