Use Delta Lake in Azure HDInsight on AKS with Apache Spark™ cluster (Preview)
Azure HDInsight on AKS is a managed cloud-based service for big data analytics that helps organizations process large amounts of data. This tutorial shows how to use Delta Lake in Azure HDInsight on AKS with an Apache Spark™ cluster.
Run the Delta Lake scenario in a Jupyter notebook. Create a Jupyter notebook and select "Spark" when you create it, because the following example is in Scala.
- Read NYC Taxi data in Parquet format. The list of Parquet file URLs is provided by the NYC Taxi & Limousine Commission.
- For each URL (file), perform some transformations and store the result in Delta format.
- Compute the average distance, the average cost per mile, and the average cost from the Delta table using incremental load.
- Store the values computed in step 3 in Delta format in the KPI output folder.
- Create a Delta table on the Delta format output folder (auto refresh).
- The KPI output folder then holds multiple versions of the average distance and the average cost per mile for a trip.
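The incremental-load idea in the steps above can be sketched in plain Scala, without Spark. This is a minimal sketch under stated assumptions. The names `Trip`, `RunningAgg`, `Kpi`, and `kpiVersions` are hypothetical. It also swaps in a different technique: it keeps running sums, whereas the tutorial's Spark code unions each new file with all earlier rows. Each element of the result plays the role of one version in the KPI output folder.

```scala
// Hypothetical minimal trip record: distance in miles and total amount paid
final case class Trip(miles: Double, total: Double)

final case class Kpi(avgDist: Double, avgCostPerMile: Double, avgCost: Double)

// Running sums over every trip loaded so far (assumption: replaces the union of all rows)
final case class RunningAgg(
    n: Long = 0L,
    sumMiles: Double = 0.0,
    sumTotal: Double = 0.0,
    nRatio: Long = 0L,
    sumRatio: Double = 0.0
) {
  def add(t: Trip): RunningAgg = {
    // A per-mile ratio only exists for trips with a positive distance
    val hasRatio = t.miles > 0
    RunningAgg(
      n + 1,
      sumMiles + t.miles,
      sumTotal + t.total,
      if (hasRatio) nRatio + 1 else nRatio,
      if (hasRatio) sumRatio + t.total / t.miles else sumRatio
    )
  }

  // Average cost per mile is the mean of per-trip ratios, like avg(total_amount/trip_miles)
  def kpi: Kpi = Kpi(sumMiles / n, sumRatio / nRatio, sumTotal / n)
}

// One KPI "version" per loaded batch: version 0 after the first batch, and so on.
// Assumes every batch is non-empty.
def kpiVersions(batches: Seq[Seq[Trip]]): Seq[(Int, Kpi)] =
  batches
    .scanLeft(RunningAgg())((agg, batch) => batch.foldLeft(agg)((a, t) => a.add(t)))
    .tail // drop the empty starting state
    .map(_.kpi)
    .zipWithIndex
    .map { case (kpi, version) => (version, kpi) }

val batches = Seq(
  Seq(Trip(2.0, 10.0), Trip(4.0, 30.0)),
  Seq(Trip(6.0, 20.0))
)
kpiVersions(batches).foreach(println)
```

The running-sum approach touches only the new rows on each load. Re-unioning, as the tutorial does, recomputes over everything loaded so far. Both approaches produce the same averages.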
Check the Delta Lake and Apache Spark compatibility matrix, and change the Delta Lake version to match your Apache Spark version.
%%configure -f
{ "conf": {"spark.jars.packages": ",net.andreinc:mockneat:0.4.8"
  }
}
These file URLs are from the NYC Taxi & Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls: List[String] = List(
  // add the NYC Taxi & Limousine Commission Parquet file URLs here
)
// download each file to the driver's local /tmp folder, then copy it to HDFS
val listOfDataFile = fileUrls.map(url => {
  val urlPath = url.split("/")
  val fileName = urlPath(urlPath.size-1)
  val urlSaveFilePath = s"/tmp/${fileName}"
  val hdfsSaveFilePath = s"/tmp/${fileName}"
  val file = new File(urlSaveFilePath)
  FileUtils.copyURLToFile(new URL(url), file)
  // copy local file to HDFS /tmp/${fileName}
  // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
  fs.copyFromLocalFile(true, true, new org.apache.hadoop.fs.Path(urlSaveFilePath), new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
  DataFile(urlPath(urlPath.size-1), url, hdfsSaveFilePath)
})
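The cell above derives each file name from the last segment of the URL and reuses that name under /tmp on HDFS. The sketch below isolates that step in plain Scala. It assumes two hypothetical helpers, `fileNameOf` and `hdfsPathFor`, and it makes one deliberate change: it reads only the URL's path component through `java.net.URI`, so a query string, such as a signed-URL token, does not end up in the file name. The example.com URLs are placeholders, not the NYC TLC file URLs.

```scala
import java.net.URI

// Hypothetical helper: last segment of the URL path, ignoring any query string
def fileNameOf(url: String): String =
  new URI(url).getPath.split("/").lastOption.getOrElse("")

// Hypothetical helper: reuse that file name under an HDFS folder, /tmp by default as above
def hdfsPathFor(url: String, dir: String = "/tmp"): String =
  s"${dir.stripSuffix("/")}/${fileNameOf(url)}"

// Placeholder URL for illustration only
println(hdfsPathFor("https://example.com/trip-data/sample_2022-01.parquet"))
// prints /tmp/sample_2022-01.parquet
```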
Set the locations where you want to create the Delta format output. Change the transformDeltaOutputPath and avgDeltaOutputKPIPath variables if necessary:
- avgDeltaOutputKPIPath: stores the average KPI in Delta format
- transformDeltaOutputPath: stores the transformed output in Delta format
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored in delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for Power BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if it does not exist
def createDirectory(dataSourcePath: String): Unit = {
  val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val path = new Path(dataSourcePath)
  if(!fs.exists(path)) {
    fs.mkdirs(path)
  }
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
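The createDirectory helper is meant to be idempotent: running the cell twice should not fail. As a local analogue, here is a sketch that substitutes `java.nio.file` for the Hadoop `FileSystem` API and uses a temporary folder in place of the HDFS root. The `ensureDirectory` helper is hypothetical. `Files.createDirectories` creates any missing parent folders and does nothing if the folder already exists.

```scala
import java.nio.file.{Files, Path}

// Assumption: local filesystem via java.nio stands in for the Hadoop FileSystem above.
// Creates missing parents; no error if the directory already exists.
def ensureDirectory(base: Path, dataSourcePath: String): Path =
  Files.createDirectories(base.resolve(dataSourcePath.stripPrefix("/")))

val base = Files.createTempDirectory("nyctaxideltadata-sketch")
val outputPaths = Seq(
  "/nyctaxideltadata/transform",
  "/nyctaxideltadata/avgkpi",
  "/nyctaxideltadata/avgMoMKPIChangePath"
)
outputPaths.foreach(p => ensureDirectory(base, p))
outputPaths.foreach(p => ensureDirectory(base, p)) // second pass is a no-op
```

Unlike a bare existence check, `Files.createDirectories` throws an error when the path exists as a regular file. A misconfigured path therefore fails loudly instead of being skipped.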
Input data comes from listOfDataFile, which holds the files downloaded from the NYC Taxi & Limousine Commission. To demonstrate time travel and versioning, load the data files one at a time.
Perform the transformation and compute the following business KPIs on each incremental load:
- The average distance
- The average cost per mile
- The average cost
Save the transformed data and the KPI data in Delta format.
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import spark.implicits._ // enables the $"column" syntax
// UDF to compute the total amount paid by the customer
def totalCustPaid = udf((basePassengerFare:Double, tolls:Double, bcf:Double, salesTax:Double, congSurcharge:Double, airportFee:Double, tips:Double) => {
  val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
  total
})
// read the parquet file for the given file input
// transform the data to compute the total amount
// compute the KPI for the given file/batch data
def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
  val df = spark.read.format(format).load(fileName)
  val dfNewLoad = df.withColumn("total_amount", totalCustPaid($"base_passenger_fare", $"tolls", $"bcf", $"sales_tax", $"congestion_surcharge", $"airport_fee", $"tips"))
  // union with old data to compute the KPI
  val dfFullLoad = oldData match {
    case Some(odf) => dfNewLoad.union(odf)
    case _ => dfNewLoad
  }
  dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
  val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist, round(avg(total_amount/trip_miles),2) AS avgCostPerMile, round(avg(total_amount),2) AS avgCost FROM tempFullLoadCompute")
  // save only the new transformed data
  dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
  // save the computed KPI
  dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
  // return the full dataframe for the next load
  dfFullLoad
}
// load data for each data file, using the previous dataframe to compute the KPI together with the current load
def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
  if(dataFile.isEmpty) {
    true
  } else {
    val nextDataFile = dataFile.head
    val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath, oldDF)
    loadData(dataFile.tail, Some(newFullDF))
  }
}
val startTime = System.currentTimeMillis()
loadData(listOfDataFile, None)
println(s"Time taken in Seconds: ${(System.currentTimeMillis()-startTime)/1000}")
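To check the KPI arithmetic outside Spark, the totalCustPaid UDF and the SQL query in readTransformWriteDelta can be mirrored in plain Scala. This is a sketch only. It assumes a hypothetical `Fare` record with camelCase versions of the columns the tutorial reads, plus hypothetical `round2` and `computeKpi` helpers. Two details are worth noting:

- avgCostPerMile is the average of per-trip ratios, not total amount divided by total miles.
- When Spark's ANSI mode is disabled, total_amount/trip_miles is NULL for zero-mile trips, and avg skips NULLs. The sketch imitates this by leaving those trips out of that one average.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical record holding the columns the tutorial sums into total_amount
final case class Fare(
    tripMiles: Double,
    basePassengerFare: Double,
    tolls: Double,
    bcf: Double,
    salesTax: Double,
    congestionSurcharge: Double,
    airportFee: Double,
    tips: Double
) {
  // Same sum as the totalCustPaid UDF
  def totalAmount: Double =
    basePassengerFare + tolls + bcf + salesTax + congestionSurcharge + airportFee + tips
}

final case class Kpi(avgDist: Double, avgCostPerMile: Double, avgCost: Double)

// Two-decimal rounding with HALF_UP, intended to mirror SQL round(x, 2)
def round2(x: Double): Double = BigDecimal(x).setScale(2, RoundingMode.HALF_UP).toDouble

def average(xs: Seq[Double]): Double = xs.sum / xs.size

// Assumes at least one fare, and at least one fare with non-zero miles
def computeKpi(fares: Seq[Fare]): Kpi = {
  // Zero-mile trips are excluded from the per-mile average only
  val perMile = fares.filter(_.tripMiles != 0.0).map(f => f.totalAmount / f.tripMiles)
  Kpi(
    round2(average(fares.map(_.tripMiles))),
    round2(average(perMile)),
    round2(average(fares.map(_.totalAmount)))
  )
}

val sampleFares = Seq(
  Fare(2.0, 8.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0),
  Fare(4.0, 25.0, 1.0, 0.0, 0.0, 0.0, 0.0, 4.0),
  Fare(0.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
)
println(computeKpi(sampleFares)) // prints Kpi(2.0,6.25,15.0)
```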
Read the Delta format data using DeltaTable
- read transformed data
- read KPI data
import io.delta.tables._
val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Print Schema
- Print the Delta table schema for the transformed data and the average KPI data.
// transformed data schema
dtTransformed.toDF.printSchema
// average KPI data schema
dtAvgKpi.toDF.printSchema
Display Last Computed KPI from Delta Table
This step displays the commit history recorded in the Delta table's _delta_log folder. The code reads the log of the transformed data table.
- Using time travel, you can view the KPI changes after each load.
- You can store all version changes in CSV format at avgMoMKPIChangePath, so that Power BI can read these changes.
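import org.apache.spark.sql.functions.{col, input_file_name, substring}
// read every commit file in the transaction log of the transformed data table
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
// list the data files added by each commit, with the commit version taken from the log file name
dfTxLog.select(col("add")("path").alias("file_path"))
  .withColumn("version", substring(input_file_name(), -6, 1))
  .filter("file_path is not NULL")
  .show(false)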
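Delta Lake names each commit file in _delta_log after the table version, zero-padded to 20 digits (for example 00000000000000000012.json). Because substring(input_file_name(), -6, 1) reads only one digit, it reports the wrong version from version 10 onward. The sketch below parses the whole number instead. It then shows one possible way to shape per-version KPI values into CSV rows for Power BI. The names `versionOf`, `KpiRow`, and `changeCsv`, and the CSV layout, are hypothetical. The per-version KPI values are assumed to come from elsewhere, for example by reading avgDeltaOutputKPIPath with Delta Lake's versionAsOf time-travel option.

```scala
import java.util.Locale

// Commit files are named with the zero-padded, 20-digit table version
val CommitFile = """.*/(\d{20})\.json""".r

// Whole-number version from a commit file path; None for checkpoints and other files
def versionOf(filePath: String): Option[Long] = filePath match {
  case CommitFile(digits) => Some(digits.toLong)
  case _                  => None
}

// Hypothetical KPI snapshot for one table version
final case class KpiRow(version: Long, avgDist: Double, avgCostPerMile: Double, avgCost: Double)

// Locale-independent two-decimal formatting for CSV output
def fmt(x: Double): String = "%.2f".formatLocal(Locale.ROOT, x)

// Hypothetical CSV layout: each version plus its change from the previous version
def changeCsv(rows: Seq[KpiRow]): Seq[String] = {
  val sorted = rows.sortBy(_.version)
  // previous(i) is the row before sorted(i); the first version has none
  val previous: Seq[Option[KpiRow]] = None +: sorted.map(Option(_))
  val header = "version,avgDist,avgCostPerMile,avgCost,chgAvgDist,chgAvgCostPerMile,chgAvgCost"
  val lines = sorted.zip(previous).map { case (cur, prev) =>
    val changes = prev match {
      case Some(p) =>
        Seq(cur.avgDist - p.avgDist, cur.avgCostPerMile - p.avgCostPerMile, cur.avgCost - p.avgCost).map(fmt)
      case None => Seq("", "", "")
    }
    (Seq(cur.version.toString, fmt(cur.avgDist), fmt(cur.avgCostPerMile), fmt(cur.avgCost)) ++ changes)
      .mkString(",")
  }
  header +: lines
}

println(versionOf("hdfs://nn/nyctaxideltadata/transform/_delta_log/00000000000000000012.json"))
changeCsv(Seq(KpiRow(0, 3.0, 6.25, 20.0), KpiRow(1, 4.0, 5.5, 22.0))).foreach(println)
```

In the notebook itself, Spark's regexp_extract function could apply the same 20-digit pattern to input_file_name() in place of substring.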
