다음을 통해 공유


xgboost.spark를 사용하여 XGBoost 모델의 분산 학습

Important

이 기능은 공개 미리 보기 상태입니다.

Python 패키지 xgboost>=1.7에는 새 모듈 xgboost.spark가 포함되어 있습니다. 이 모듈에는 xgboost PySpark 추정기 xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier, xgboost.spark.SparkXGBRanker가 포함됩니다. 이러한 새 클래스는 SparkML 파이프라인에 XGBoost 추정기를 포함할 수 있습니다. API 세부 정보는 XGBoost python spark API 문서를 참조하세요.

요구 사항

Databricks Runtime 12.0 ML 이상

xgboost.spark 매개 변수

xgboost.spark 모듈에 정의된 추정기는 표준 XGBoost에 사용되는 대부분의 동일한 매개 변수 및 인수를 지원합니다.

  • 클래스 생성자, fit 메서드 및 predict 메서드에 대한 매개 변수는 xgboost.sklearn 모듈의 매개 변수와 크게 동일합니다.
  • 이름 지정, 값 및 기본값은 XGBoost 매개 변수설명된 것과 대부분 동일합니다.
  • 예외는 지원되지 않는 몇 가지 매개 변수(예: gpu_id, nthread, sample_weight, eval_set) 및 추가된 pyspark 추정기별 매개 변수(예: featuresCol, labelCol, use_gpu, validationIndicatorCol)입니다. 자세한 내용은 XGBoost Python Spark API 설명서를 참조하세요.

분산 학습

xgboost.spark 모듈에 정의된 PySpark 예측 도구는 num_workers 매개 변수를 사용하여 분산 XGBoost 학습을 지원합니다. 분산 학습을 사용하려면 분류자 또는 회귀자를 만들고 분산 학습 중에 동시에 실행 중인 Spark 작업의 수로 num_workers 설정합니다. 모든 Spark 작업 슬롯을 사용하려면 num_workers=sc.defaultParallelism설정합니다.

예시:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

참고 항목

  • 분산 XGBoost와 함께 mlflow.xgboost.autolog을 사용할 수 없습니다. MLflow를 사용하여 xgboost Spark 모델을 기록하려면 mlflow.spark.log_model(spark_xgb_model, artifact_path)를 사용합니다.
  • 자동 스케일링이 사용하도록 설정된 클러스터에서는 분산 XGBoost를 사용할 수 없습니다. 이 탄력적 스케일링 패러다임에서 시작하는 새 작업자 노드는 새 작업 집합을 수신할 수 없으며 유휴 상태를 유지할 수 없습니다. 자동 스케일링을 사용하지 않도록 설정하는 방법은, 자동 스케일링 사용을 참조하세요.

스파스 기능 데이터 세트 학습에 최적화 사용

xgboost.spark 모듈에 정의된 PySpark 예측 도구는 스파스 기능이 있는 데이터 세트 학습을 위한 최적화를 지원합니다. 스파스 기능 집합의 최적화를 사용하도록 설정하려면 fit 형식의 값으로 구성된 기능 열을 포함하는 pyspark.ml.linalg.SparseVector 메서드에 데이터 세트를 제공하고 추정기 매개 변수 enable_sparse_data_optimTrue설정해야 합니다. 또한 missing 매개 변수를 0.0로 설정해야 한다.

예시:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU 교육

xgboost.spark 모듈에 정의된 PySpark 예측 도구는 GPU에 대한 학습을 지원합니다. 매개변수 use_gpuTrue로 설정하여 GPU 학습을 활성화합니다.

참고 항목

use_gpu 인수가 True로 설정될 때, XGBoost 분산 학습에 사용되는 각 Spark 태스크에서는 학습에 GPU가 하나만 사용됩니다. Databricks는 Spark 클러스터 구성 1에 대해 기본값 spark.task.resource.gpu.amount을 사용하는 것이 좋습니다. 그렇지 않으면 이 Spark 작업에 할당된 추가 GPU가 유휴 상태가 됩니다.

예시:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

문제 해결

다중 노드 학습 중에 NCCL failure: remote process exited or there was a network error 메시지가 발생하면, 일반적으로 GPU 간의 네트워크 통신에 문제가 있음을 나타냅니다. 이 문제는 NCCL(NVIDIA 집단 통신 라이브러리)이 GPU 통신에 특정 네트워크 인터페이스를 사용할 수 없는 경우에 발생합니다.

이 문제를 해결하려면 spark.executorEnv.NCCL_SOCKET_IFNAME 클러스터의 sparkConf를 eth로 설정해야 합니다. 이는 기본적으로 노드의 모든 작업자에 대해 환경 변수 NCCL_SOCKET_IFNAMEeth로 설정합니다.

예제 Notebook

이 Notebook은 Spark MLlib와 함께 xgboost.spark Python 패키지를 사용하는 것을 보여줍니다.

PySpark-XGBoost Notebook

노트북 가져오기

사용되지 않는 sparkdl.xgboost 모듈에 대한 마이그레이션 가이드

  • from sparkdl.xgboost import XgboostRegressorfrom xgboost.spark import SparkXGBRegressor로, from sparkdl.xgboost import XgboostClassifierfrom xgboost.spark import SparkXGBClassifier로 바꿉니다.
  • 예측 도구 생성자의 모든 매개 변수 이름을 camelCase 스타일에서 snake_case 스타일로 변경합니다. 예를 들어 XgboostRegressor(featuresCol=XXX)SparkXGBRegressor(features_col=XXX)로 변경합니다.
  • use_external_storageexternal_storage_precision 매개 변수가 제거되었습니다. xgboost.spark 예측 도구는 DMatrix 데이터 반복 API를 사용하여 메모리를 보다 효율적으로 사용합니다. 더 이상 비효율적인 외부 스토리지 모드를 사용할 필요가 없습니다. 매우 큰 데이터 세트의 경우 Databricks는 num_workers 매개 변수를 늘려 각 학습 태스크가 데이터를 더 작고 관리하기 쉬운 데이터 파티션으로 분할하는 것이 좋습니다. 클러스터의 총 Spark 작업 슬롯 수로 num_workers = sc.defaultParallelism를설정하는, num_workers의 설정을 고려합니다.
  • xgboost.spark에 정의된 예측 도구의 경우 num_workers=1을 설정하면 단일 Spark 작업을 사용하여 모델 학습이 실행됩니다. 이는 기본적으로 1인 Spark 클러스터 구성 설정 spark.task.cpus에서 지정한 CPU 코어 수를 활용합니다. 더 많은 CPU 코어를 사용하여 모델을 학습하려면, num_workers이나 spark.task.cpus를 늘리세요. nthread로 정의된 추정기의 n_jobs 또는 xgboost.spark 매개변수를 설정할 수 없습니다. 이 동작은 사용되지 않는 sparkdl.xgboost 패키지에 정의된 예측 도구의 이전 동작과 다릅니다.

sparkdl.xgboost모델을 xgboost.spark모델로 변환

sparkdl.xgboost모델은 xgboost.spark 모델과 다른 형식 로 저장되며 매개 변수 설정이 다릅니다. 다음 유틸리티 함수를 사용하여 모델을 변환합니다:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

pyspark.ml.PipelineModel 모델을 마지막 단계로 포함하는 sparkdl.xgboost 모델이 있는 경우, sparkdl.xgboost 모델 스테이지 를 변환된 xgboost.spark 모델로 바꿀 수 있습니다.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)