在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集 (預覽)
重要
此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight 上提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群上取得更多更新。
AKS 上的 Azure HDInsight 是適用於巨量數據分析的受控雲端式服務,可協助組織處理大量數據。 本教學課程示範如何在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集。
必要條件
在 AKS 上的 Azure HDInsight 中建立 Apache Spark™ 叢集
在 Jupyter Notebook 中執行 Delta Lake 案例。 建立 Jupyter Notebook 並在建立筆記本時選取 「Spark」,因為下列範例位於 Scala 中。
案例
- 讀取 NYC 計程車 Parquet 數據格式 - 從 NYC 計程車和豪華轎車委員會提供 Parquet 檔案 URL 清單。
- 針對每個 URL(檔案)執行一些轉換,並以 Delta 格式儲存。
- 使用累加負載計算差異數據表的平均距離、每英里的平均成本,以及平均成本。
- 將步驟#3 的計算值以差異格式儲存至 KPI 輸出資料夾。
- 在差異格式輸出資料夾上建立差異資料表 (自動重新整理)。
- KPI 輸出資料夾具有多個版本的平均距離,以及車程每英里的平均成本。
提供 Delta Lake 的需求設定
Delta Lake 與 Apache Spark 相容性矩陣 - Delta Lake,根據 Apache Spark 版本變更 Delta Lake 版本。
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
列出數據檔
注意
這些檔案 URL 來自 NYC 計程車和豪華轎車委員會。
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
建立輸出目錄
您想要建立差異格式輸出的位置,視需要變更 transformDeltaOutputPath
和 avgDeltaOutputKPIPath
變數,
avgDeltaOutputKPIPath
- 以差異格式儲存平均 KPItransformDeltaOutputPath
- 以差異格式儲存轉換的輸出
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
從 Parquet 格式建立差異格式數據
輸入資料來自
listOfDataFile
,其中從 下載數據 https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page若要示範時間移動和版本,請個別載入數據
在累加式負載上執行轉換和計算下列商務 KPI:
- 平均距離
- 每英里的平均成本
- 平均成本
以差異格式儲存已轉換和 KPI 資料
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
使用 Delta 資料表讀取差異格式
- 讀取轉換的數據
- 讀取 KPI 數據
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
列印架構
- 列印已轉換和平均 KPI 資料的差異資料表架構1。
// tranform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
顯示數據表中上次計算的 KPI
dtAvgKpi.toDF.show(false)
顯示計算 KPI 歷程記錄
此步驟會顯示 KPI 事務數據表的歷程記錄 _delta_log
dtAvgKpi.history().show(false)
在每個數據載入之後顯示 KPI 數據
- 使用時間移動,您可以在每次載入之後檢視 KPI 變更
- 您可以將所有版本變更儲存在
avgMoMKPIChangePath
的 CSV 格式,讓 Power BI 可以讀取這些變更
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
參考
- Apache、Apache Spark、Spark 和相關聯的開放原始碼專案名稱為 Apache Software Foundation (ASF) 的商標。
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應