共用方式為


在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集 (預覽)

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

AKS 上的 Azure HDInsight 是適用於巨量數據分析的受控雲端式服務,可協助組織處理大量數據。 本教學課程示範如何在 AKS 上使用 Azure HDInsight 中的 Delta Lake 搭配 Apache Spark™ 叢集。

必要條件

  1. 在 AKS 上的 Azure HDInsight 中建立 Apache Spark™ 叢集

    顯示 Spark 叢集建立的螢幕快照。

  2. 在 Jupyter Notebook 中執行 Delta Lake 案例。 建立 Jupyter Notebook 並在建立筆記本時選取 「Spark」,因為下列範例位於 Scala 中。

    顯示如何執行 Delta Lake 案例的螢幕快照。

案例

  • 讀取 NYC 計程車 Parquet 數據格式 - 從 NYC 計程車和豪華轎車委員會提供 Parquet 檔案 URL 清單。
  • 針對每個 URL(檔案)執行一些轉換,並以 Delta 格式儲存。
  • 使用累加負載計算差異數據表的平均距離、每英里的平均成本,以及平均成本。
  • 將步驟#3 的計算值以差異格式儲存至 KPI 輸出資料夾。
  • 在差異格式輸出資料夾上建立差異資料表 (自動重新整理)。
  • KPI 輸出資料夾具有多個版本的平均距離,以及車程每英里的平均成本。

提供 Delta Lake 的需求設定

Delta Lake 與 Apache Spark 相容性矩陣 - Delta Lake,根據 Apache Spark 版本變更 Delta Lake 版本。

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

顯示差異湖組態的螢幕快照。

列出數據檔

注意

這些檔案 URL 來自 NYC 計程車和豪華轎車委員會

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

顯示如何啟動 Spark 應用程式的螢幕快照。

建立輸出目錄

您想要建立差異格式輸出的位置,視需要變更 transformDeltaOutputPathavgDeltaOutputKPIPath 變數,

  • avgDeltaOutputKPIPath - 以差異格式儲存平均 KPI
  • transformDeltaOutputPath - 以差異格式儲存轉換的輸出
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

顯示如何建立 output-directory 的螢幕快照。

從 Parquet 格式建立差異格式數據

  1. 輸入資料來自 listOfDataFile,其中從 下載數據 https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

  2. 若要示範時間移動和版本,請個別載入數據

  3. 在累加式負載上執行轉換和計算下列商務 KPI:

    1. 平均距離
    2. 每英里的平均成本
    3. 平均成本
  4. 以差異格式儲存已轉換和 KPI 資料

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    顯示如何以差異格式數據顯示的螢幕快照。

  5. 使用 Delta 資料表讀取差異格式

    1. 讀取轉換的數據
    2. 讀取 KPI 數據
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    顯示讀取 KPI 資料的螢幕快照。

  6. 列印架構

    1. 列印已轉換和平均 KPI 資料的差異資料表架構1。
    // tranform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    顯示列印架構輸出的螢幕快照。

  7. 顯示數據表中上次計算的 KPI

    dtAvgKpi.toDF.show(false)

    顯示數據表中上次計算 KPI 的螢幕快照。

顯示計算 KPI 歷程記錄

此步驟會顯示 KPI 事務數據表的歷程記錄 _delta_log

dtAvgKpi.history().show(false)

顯示計算 KPI 歷程記錄的螢幕快照。

在每個數據載入之後顯示 KPI 數據

  1. 使用時間移動,您可以在每次載入之後檢視 KPI 變更
  2. 您可以將所有版本變更儲存在 avgMoMKPIChangePath 的 CSV 格式,讓 Power BI 可以讀取這些變更
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

每個數據載入之後的 KPI 資料螢幕快照。

參考