Running XGBoost on Azure HDInsight
XGBoost is a popular open-source distributed gradient boosting library used by many companies in production. Azure HDInsight is a fully managed Hadoop and Spark service that makes it easy to create a Spark cluster and offers great extensibility. In this blog post, we walk through the detailed steps to compile and run XGBoost on HDInsight Spark. All the resources are also published on the HDInsight GitHub page.
XGBoost
XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible and portable. It implements machine learning algorithms under the Gradient Boosting framework. XGBoost provides parallel tree boosting (also known as GBDT or GBM) that solves many data science problems in a fast and accurate way. The same code runs on major distributed environments (Hadoop, SGE, MPI) and can solve problems beyond billions of examples.
It is not designed as a generic machine learning framework. It is a library specialized in boosted tree algorithms, and it is widely used in both production and experimental projects.
For more details on XGBoost, see the XGBoost GitHub page.
XGBoost with Spark
The following figure illustrates the new pipeline architecture with the latest XGBoost4J-Spark.
With XGBoost4J-Spark, users can work with both the low-level and the high-level memory abstractions in Spark, i.e. RDD and DataFrame/Dataset. The DataFrame/Dataset abstraction lets users manipulate structured datasets and use Spark's built-in routines or User Defined Functions (UDFs) to explore the value distribution in columns before feeding data into the machine learning phase of the pipeline. In the following example, the structured sales records can be saved in a JSON file, parsed as a DataFrame through Spark's API, and fed to XGBoost for training in two lines of Scala code.
Compiling XGBoost
The first step is to compile XGBoost. You can either SSH into your HDInsight cluster (which puts you on the head node; see more details here), or use the Jupyter Notebook in the HDInsight repository, which also executes on the head node.
You might see output like the following when building XGBoost. This is expected: it comes from one of the test cases, and the final test should pass.
    Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.0.0.15, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=4}
    17/08/14 22:41:34 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
    java.lang.RuntimeException: Worker exception.
        at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:72)
        at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:66)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
    #!/bin/bash
    sudo apt-get update
    sudo apt-get install -y maven git build-essential cmake python-setuptools
    git clone --recursive https://github.com/dmlc/xgboost

    # build XGBoost using Maven
    cd xgboost/jvm-packages
    mvn -DskipTests=true install

    # put the compiled packages on shared storage
    # (the root folder is used for simplicity)
    hadoop fs -put -f xgboost4j-spark/target/xgboost4j-spark-0.7.jar /
    hadoop fs -put -f xgboost4j/target/xgboost4j-0.7.jar /
    hadoop fs -put -f xgboost4j-example/target/xgboost4j-example-0.7.jar /

    # put the sample data on shared storage
    hadoop fs -put -f ../demo/data/agaricus.txt* /
Start a Spark session with XGBoost4J-Spark library loaded
After putting the jars and the data files on Azure Storage, which is shared across all the HDInsight nodes, the next step is to start a Spark session and call the XGBoost libraries. We use a magic cell in the Jupyter Notebook to load the jar files into the Spark session, so that the XGBoost APIs are available in the notebook.
    %%configure -f
    {
      "jars": [
        "wasb:///xgboost4j-spark-0.7.jar",
        "wasb:///xgboost4j-0.7.jar",
        "wasb:///xgboost4j-example-0.7.jar"
      ],
      "conf": {
        "spark.jars.excludes": "org.scala-lang:scala-reflect:2.11.8,org.scala-lang:scala-compiler:2.11.8,org.scala-lang:scala-library:2.11.8"
      }
    }
The key thing above is that we load three jar files. xgboost4j-spark and xgboost4j are required. xgboost4j-example is optional, but we include it because it has a few utilities that we will use later.
We also need to exclude three Scala packages, namely scala-reflect, scala-compiler, and scala-library. The reason is a conflict between the XGBoost package we compiled and Livy, which is the REST API for Spark applications. There is another GitHub issue discussing a similar problem here.
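The jar paths and the exclusion list in the cell above depend on only two version strings: the XGBoost version and the Scala version. As a minimal sketch, assuming you want to regenerate the cell body when either version changes, the helper below builds both pieces. ConfigureCell and its methods are hypothetical names introduced for illustration; they are not part of XGBoost, Livy, or HDInsight.

```scala
// Hypothetical helper for illustration only.
object ConfigureCell {
  // Artifact names as produced by the build script in this post.
  val artifacts: Seq[String] =
    Seq("xgboost4j-spark", "xgboost4j", "xgboost4j-example")

  // Scala modules excluded in this post's configuration.
  val scalaModules: Seq[String] =
    Seq("scala-reflect", "scala-compiler", "scala-library")

  // Jar locations in the root folder of the attached storage account.
  def jarPaths(xgbVersion: String): Seq[String] =
    artifacts.map(name => s"wasb:///$name-$xgbVersion.jar")

  // Comma-separated value for spark.jars.excludes.
  def excludes(scalaVersion: String): String =
    scalaModules.map(m => s"org.scala-lang:$m:$scalaVersion").mkString(",")

  // Full text of the magic cell.
  def render(xgbVersion: String, scalaVersion: String): String = {
    val jars = jarPaths(xgbVersion).map(p => "\"" + p + "\"").mkString(", ")
    s"""%%configure -f
       |{ "jars": [$jars],
       |  "conf": { "spark.jars.excludes": "${excludes(scalaVersion)}" } }""".stripMargin
  }
}
```

Calling ConfigureCell.render("0.7", "2.11.8") produces a cell equivalent to the one used in this post.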
Import XGBoost and Spark Packages
We then need to load a few packages, and train a very simple model:
    import ml.dmlc.xgboost4j.scala.Booster
    import ml.dmlc.xgboost4j.scala.spark.{XGBoost, XGBoostEstimator}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkConf
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.Row
    // create training and testing dataframes
    val inputTrainPath = "wasb:///agaricus.txt.train"
    val inputTestPath = "wasb:///agaricus.txt.test"
    val outputModelPath = "wasb:///XGBoostModelOutput"
    val numWorkers = 4

    // number of iterations
    val numRound = 100

    // build dataset
    val trainDF = spark.sqlContext.read.format("libsvm").load(inputTrainPath)
    val testDF = spark.sqlContext.read.format("libsvm").load(inputTestPath)

    // start training
    val paramMap = List(
      "eta" -> 0.1f,
      "max_depth" -> 6,
      "objective" -> "binary:logistic").toMap

    val xgboostModel = XGBoost.trainWithDataFrame(
      trainDF, paramMap, numRound, nWorkers = numWorkers, useExternalMemory = true)
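The training and test files above are read with the libsvm format, a sparse text format in which each line holds a label followed by index:value pairs. The sketch below parses one such line in plain Scala to show the structure. LibSvmLine is a hypothetical helper written for this illustration; in the notebook, Spark's libsvm reader does this work, and the sample line in the usage note is made up rather than copied from the dataset.

```scala
// Hypothetical helper for illustration; Spark's "libsvm" data source
// performs the real parsing in the notebook.
object LibSvmLine {
  // One training example: a label plus the non-zero features.
  final case class Example(label: Double, features: Map[Int, Double])

  // Parses "label index:value index:value ...".
  // Indices are kept exactly as written in the file.
  // Blank or malformed lines are not handled in this sketch.
  def parse(line: String): Example = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val features = tokens.tail.map { token =>
      val Array(index, value) = token.split(":", 2)
      index.toInt -> value.toDouble
    }.toMap
    Example(label, features)
  }
}
```

For instance, LibSvmLine.parse("1 3:1 10:1 11:1") yields a label of 1.0 and three non-zero features; every index not listed on the line is implicitly zero.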
XGBoost also integrates with Spark ML Pipelines, so we can train the model as a pipeline stage:
    // construct the pipeline
    // (the key is "num_round", matching the key used in the tuning example in this post)
    val pipeline = new Pipeline().setStages(
      Array(new XGBoostEstimator(Map[String, Any]("num_round" -> 100))))

    // use the training dataframe to fit the pipeline
    val xgboostModelPipeLine = pipeline.fit(trainDF)

    // predict with the trained model
    val xgBoostModelPipelineTransform = xgboostModelPipeLine.transform(testDF)
    xgBoostModelPipelineTransform.show()
The result looks similar to the following:
XGBoost can also use Spark for hyperparameter tuning: you specify the parameter ranges, and Spark selects the best parameter set:
    import scala.collection.mutable
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.ml.tuning._
    import org.apache.spark.ml.evaluation.RegressionEvaluator

    // base parameters shared by every candidate model
    val xgboostParam = new mutable.HashMap[String, Any]()
    xgboostParam += "eta" -> 0.1
    xgboostParam += "max_depth" -> 6
    xgboostParam += "silent" -> 1
    xgboostParam += "ntreelimit" -> 1000
    xgboostParam += "objective" -> "reg:linear"
    xgboostParam += "subsample" -> 0.8
    xgboostParam += "num_round" -> 100

    val xgbEstimator = new XGBoostEstimator(xgboostParam.toMap).
      setFeaturesCol("features").
      setLabelCol("label")

    // candidate values to search over
    val paramGrid = new ParamGridBuilder().
      addGrid(xgbEstimator.round, Array(20, 50)).
      addGrid(xgbEstimator.eta, Array(0.1, 0.4)).
      build()

    // train on 80% of the data and validate on the remaining 20%
    val tv = new TrainValidationSplit().
      setEstimator(xgbEstimator).
      setEvaluator(new RegressionEvaluator().setLabelCol("label")).
      setEstimatorParamMaps(paramGrid).
      setTrainRatio(0.8)

    val bestModel = tv.fit(trainDF)
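To make the search space concrete, the grid above combines two candidate values for the number of rounds with two for eta, giving four configurations that are each trained and scored on the validation split. The plain-Scala sketch below enumerates those combinations and picks the one with the lowest score. GridSketch and its score function are hypothetical and only mirror the idea; the actual search is done by ParamGridBuilder and TrainValidationSplit, with RegressionEvaluator using RMSE by default, where lower is better.

```scala
// Hypothetical illustration of what the parameter grid enumerates.
object GridSketch {
  // Candidate values taken from the paramGrid in this post.
  val rounds: Seq[Int] = Seq(20, 50)
  val etas: Seq[Double] = Seq(0.1, 0.4)

  // Cartesian product: one parameter map per candidate configuration.
  val candidates: Seq[Map[String, Any]] =
    for (r <- rounds; e <- etas)
      yield Map[String, Any]("num_round" -> r, "eta" -> e)

  // Returns the candidate with the lowest score, where score stands in
  // for a validation error such as RMSE.
  def best(score: Map[String, Any] => Double): Map[String, Any] =
    candidates.minBy(score)
}
```

Because the number of candidates is the product of the list sizes, every extra value added to a grid multiplies the number of models trained, which is worth keeping in mind on a small cluster.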
You can also save the model to Azure Storage with code similar to the following. The caveat is that the saveModelAsHadoopFile API requires an implicit sc value of type SparkContext, so we need to get it from the default spark object (which is of type SparkSession).
    implicit val sc = spark.sparkContext
    xgboostModel.saveModelAsHadoopFile(outputModelPath)
Summary
In this blog post, we demonstrated how to run XGBoost on HDInsight Spark from Jupyter Notebooks, tune hyperparameters, and save the final model to the attached Azure Storage account. Most of the code above is copied from the XGBoost repository, notably the SparkWithDataFrame example and the SparkModelTuningTool example. All the code above is available in the HDInsight repository.
Acknowledgement
Thanks to Nan Zhu (zhna@microsoft.com), Software Engineer at Microsoft, for helping out and identifying the potential Scala conflict between Livy and XGBoost.