Running XGBoost on Azure HDInsight

XGBoost is a popular open-source distributed gradient boosting library used by many companies in production. Azure HDInsight is a fully managed Hadoop and Spark service on which you can easily create highly extensible Spark clusters. In this blog post, we walk you through the detailed steps to compile and run XGBoost on HDInsight Spark. We have also published all the resources to the HDInsight GitHub page.

XGBoost

XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible, and portable. It implements machine learning algorithms under the Gradient Boosting framework. XGBoost provides parallel tree boosting (also known as GBDT or GBM) that solves many data science problems in a fast and accurate way. The same code runs in major distributed environments (Hadoop, SGE, MPI) and can solve problems with billions of examples and beyond.

It is not designed as a generic machine learning framework; it is a library highly specialized in the boosting tree algorithm, and it is widely used in everything from production systems to experimental projects.

For more details on XGBoost, please visit the XGBoost GitHub page.

XGBoost with Spark

The following figure illustrates the new pipeline architecture with the latest XGBoost4J-Spark.

With XGBoost4J-Spark, users can work with both the low- and high-level memory abstractions in Spark, i.e. RDD and DataFrame/Dataset. The DataFrame/Dataset abstraction allows users to manipulate structured datasets and to apply Spark's built-in routines or User Defined Functions (UDFs) to explore the value distribution in columns before feeding the data into the machine learning phase of the pipeline. In the following example, structured sales records saved in a JSON file are parsed into a DataFrame through Spark's API and fed into XGBoost training in two lines of Scala code.
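Here is a rough sketch of that flow (the salesRecords.json path, its columns, and the parameters are hypothetical; the XGBoost imports are covered later in this post):

import org.apache.spark.ml.feature.VectorAssembler
import ml.dmlc.xgboost4j.scala.spark.XGBoost

// hypothetical structured sales records with a numeric "label" column
val salesDF = spark.read.json("wasb:///salesRecords.json")

// assemble the raw numeric columns into the "features" vector column
// that XGBoost4J-Spark expects
val assembled = new VectorAssembler()
  .setInputCols(Array("price", "quantity"))  // hypothetical feature columns
  .setOutputCol("features")
  .transform(salesDF)

// the two lines that matter: set the parameters and train
val paramMap = Map("eta" -> 0.1, "objective" -> "binary:logistic")
val salesModel = XGBoost.trainWithDataFrame(assembled, paramMap, 10, nWorkers = 4)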

Compiling XGBoost

The first step is of course to compile XGBoost. You need to either SSH into your HDInsight cluster (this connects you to the head node, typically via the CLUSTERNAME-ssh.azurehdinsight.net endpoint; see more details here), or use the Jupyter Notebook in the HDInsight repository, which also executes on the head node.

You might see something like the following test output when building XGBoost with the script shown below. This is expected and is part of a test case; the final test should pass.

Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.0.0.15, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=4}
17/08/14 22:41:34 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.RuntimeException: Worker exception.
        at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:72)
        at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:66)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
#!/bin/bash
# install the build dependencies
sudo apt-get update
sudo apt-get install -y maven git build-essential cmake python-setuptools

# clone XGBoost together with its submodules
git clone --recursive https://github.com/dmlc/xgboost

# build the XGBoost JVM packages using Maven
cd xgboost/jvm-packages
mvn -DskipTests=true install

# put the compiled packages to shared storage
# (the root folder, for simplicity)
hadoop fs -put -f xgboost4j-spark/target/xgboost4j-spark-0.7.jar /
hadoop fs -put -f xgboost4j/target/xgboost4j-0.7.jar /
hadoop fs -put -f xgboost4j-example/target/xgboost4j-example-0.7.jar /

# put the sample data to shared storage
hadoop fs -put -f ../demo/data/agaricus.txt* /

Start a Spark session with the XGBoost4J-Spark library loaded

After putting the jars and the sample files into Azure Storage, which is shared across all the HDInsight nodes, the next step is to start a Spark session that loads the XGBoost libraries. We will use a magic cell in the Jupyter Notebook: first, we load the jar files into the Spark session so that we can use the XGBoost APIs from the notebook.

%%configure -f
{ "jars": ["wasb:///xgboost4j-spark-0.7.jar", "wasb:///xgboost4j-0.7.jar", "wasb:///xgboost4j-example-0.7.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect:2.11.8,org.scala-lang:scala-compiler:2.11.8,org.scala-lang:scala-library:2.11.8"
  }
}

The key thing above is that we load three jar files. xgboost4j-spark and xgboost4j are required; xgboost4j-example is optional, but we include it because it has a few utilities that we will use later.

We also need to exclude three Scala packages, namely scala-reflect, scala-compiler, and scala-library. The reason is a conflict between the XGBoost package we compiled and Livy, the REST service used to submit Spark applications. There is a GitHub issue discussing a similar problem here.
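To quickly verify that the session picked up the jars, a simple smoke test in the next cell is enough; if the configuration worked, this import resolves without error:

// sanity check: resolves only if the XGBoost jars are on the session classpath
import ml.dmlc.xgboost4j.scala.spark.XGBoost
println(XGBoost.getClass.getName)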

Import XGBoost and Spark Packages

We then import a few packages and train a very simple model:

import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.{XGBoost, XGBoostEstimator}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vector
// paths for the training data, test data, and model output
val inputTrainPath = "wasb:///agaricus.txt.train"
val inputTestPath = "wasb:///agaricus.txt.test"
val outputModelPath = "wasb:///XGBoostModelOutput"

// number of distributed XGBoost workers
val numWorkers = 4

// number of boosting iterations
val numRound = 100

// build the training and testing dataframes from the libsvm sample data
val trainDF = spark.read.format("libsvm").load(inputTrainPath)
val testDF = spark.read.format("libsvm").load(inputTestPath)
// training parameters
val paramMap = List(
  "eta" -> 0.1f,
  "max_depth" -> 6,
  "objective" -> "binary:logistic").toMap

// start training; external memory lowers the per-executor memory footprint
val xgboostModel = XGBoost.trainWithDataFrame(
  trainDF, paramMap, numRound, nWorkers = numWorkers, useExternalMemory = true)
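Once trained, scoring follows the usual Spark ML pattern. A brief sketch, assuming the returned XGBoostModel exposes the standard transform method (the pipeline example below relies on the same behavior):

// score the held-out test set; transform appends the model's
// prediction columns to the original DataFrame
val predictions = xgboostModel.transform(testDF)
predictions.show(10)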


XGBoost can also integrate with Spark Pipelines, so we can alternatively use a Spark Pipeline to train the model:

// construct a pipeline whose single stage is the XGBoost estimator
val pipeline = new Pipeline().setStages(
  Array(new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100))))

// use the transformed dataframe as the training dataset
val xgboostModelPipeLine = pipeline.fit(trainDF)

// predict with the trained model
val xgBoostModelPipelineTransform = xgboostModelPipeLine.transform(testDF)
xgBoostModelPipelineTransform.show()


The result is the test DataFrame with the model's prediction column(s) appended to the original label and features columns.
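To put a number on those predictions, a standard Spark evaluator can be used. A hedged sketch, assuming the model appends the conventional "prediction" column next to the "label" column from the libsvm data:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// accuracy of the pipeline model over the scored test set
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(s"Test accuracy: ${evaluator.evaluate(xgBoostModelPipelineTransform)}")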

XGBoost can also use Spark for hyperparameter tuning: you specify the parameter ranges, and the best parameter set is selected automatically:

import scala.collection.mutable
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation.RegressionEvaluator

// fixed XGBoost parameters
val xgboostParam = new mutable.HashMap[String, Any]()
xgboostParam += "eta" -> 0.1
xgboostParam += "max_depth" -> 6
xgboostParam += "silent" -> 1
xgboostParam += "ntreelimit" -> 1000
xgboostParam += "objective" -> "reg:linear"
xgboostParam += "subsample" -> 0.8
xgboostParam += "num_round" -> 100

val xgbEstimator = new XGBoostEstimator(xgboostParam.toMap)
  .setFeaturesCol("features")
  .setLabelCol("label")

// the grid of parameter values to search over
val paramGrid = new ParamGridBuilder()
  .addGrid(xgbEstimator.round, Array(20, 50))
  .addGrid(xgbEstimator.eta, Array(0.1, 0.4))
  .build()

// train on 80% of the data, validate each candidate on the remaining 20%
val tv = new TrainValidationSplit()
  .setEstimator(xgbEstimator)
  .setEvaluator(new RegressionEvaluator().setLabelCol("label"))
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)

val bestModel = tv.fit(trainDF)
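The winning model can then be applied to the test set and scored with the same evaluator type used during the search, for example:

// TrainValidationSplitModel.transform delegates to the best model found
val tunedPredictions = bestModel.transform(testDF)
val rmse = new RegressionEvaluator().setLabelCol("label").evaluate(tunedPredictions)
println(s"Best model test RMSE: $rmse")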


You can also save the model to Azure Storage with code similar to the following. The caveat is that the saveModelAsHadoopFile API requires an implicit SparkContext value (sc), which we obtain from the default spark object (a SparkSession).


implicit val sc = spark.sparkContext
xgboostModel.saveModelAsHadoopFile(outputModelPath)
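To read the model back in a later session, XGBoost4J-Spark provides a matching loader; a hedged sketch, assuming the 0.7 API's loadModelFromHadoopFile (which also takes the SparkContext implicitly):

// reload the saved model from shared storage; the implicit sc above applies
val loadedModel = XGBoost.loadModelFromHadoopFile(outputModelPath)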


Summary

In this blog post, we demonstrated how you can run XGBoost on HDInsight Spark from Jupyter Notebooks, tune hyperparameters, and save the final model to the attached Azure Storage account. Most of the code above is adapted from the XGBoost repository, notably the SparkWithDataFrame example and the SparkModelTuningTool example. All the code above is available in the HDInsight repository.

Acknowledgement

Thanks to Nan Zhu (zhna@microsoft.com), Software Engineer at Microsoft, for helping out and identifying the potential Scala conflict between Livy and XGBoost.