Condividi tramite


Usare Delta Lake in cluster Azure HDInsight su AKS con Apache Spark™ (anteprima)

Nota

Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.

Solo il supporto di base sarà disponibile fino alla data di ritiro.

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.

Azure HDInsight su AKS è un servizio gestito basato sul cloud per l'analisi di Big Data che consente alle organizzazioni di elaborare grandi quantità di dati. Questa esercitazione illustra come usare Delta Lake in cluster Azure HDInsight su AKS con Apache Spark™.

Prerequisito

  1. Creare un cluster Apache Spark™ in Azure HDInsight su AKS

    Screenshot che mostra la creazione del cluster Spark.

  2. Eseguire uno scenario Delta Lake in Jupyter Notebook. Creare un Jupyter Notebook e selezionare "Spark" durante la creazione di un notebook, poiché l'esempio seguente si trova in Scala.

    Screenshot che mostra come eseguire uno scenario Delta Lake.

Scenario

  • Leggere il formato dati Parquet per NYC Taxi: l'elenco degli URL dei file Parquet è disponibile presso la NYC Taxi & Limousine Commission.
  • Per ogni URL (file) eseguire alcune trasformazioni e archiviare in formato Delta.
  • Calcolare la distanza media, il costo medio per miglio e il costo medio della tabella Delta usando il carico incrementale.
  • Archiviare il valore calcolato dal passaggio 3 in formato Delta nella cartella di output dell'indicatore KPI.
  • Creare una tabella Delta nella cartella di output del formato Delta (aggiornamento automatico).
  • La cartella di output KPI include più versioni della distanza media e il costo medio per miglio per una corsa.

Fornire le configurazioni necessarie per il Delta Lake

Matrice di compatibilità Delta Lake con Apache Spark: Delta Lake, modificare la versione di Delta Lake in base alla versione di Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Screenshot che mostra le configurazioni Delta Lake.

Elencare il file di dati

Nota

Questi URL di file provengono dalla NYC Taxi & Limousine Commission.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Screenshot che mostra come avviare l'applicazione Spark.

Creare la directory di output

Percorso in cui si vuole creare l'output del formato differenziale, modificare le variabili transformDeltaOutputPath e avgDeltaOutputKPIPath se necessario.

  • avgDeltaOutputKPIPath: per archiviare l'indicatore KPI medio in formato differenziale
  • transformDeltaOutputPath: archiviare l'output trasformato in formato differenziale
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Screenshot che mostra come creare la directory di output.

Creare dati in formato Delta dal formato Parquet

  1. I dati di input provengono da listOfDataFile, dove i dati sono scaricati da https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

  2. Per illustrare il tempo di viaggio e la versione, caricare i dati singolarmente

  3. Eseguire la trasformazione e calcolare l'indicatore KPI aziendale seguente al caricamento incrementale:

    1. Distanza media
    2. Costo medio per miglio
    3. Costo medio
  4. Salvare i dati trasformati e KPI in formato differenziale

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Screenshot che mostra come usare i dati in formato differenziale.

  5. Leggere il formato differenziale usando la tabella Delta

    1. leggere i dati trasformati
    2. leggere i dati KPI
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Screenshot che mostra la lettura dei dati KPI.

  6. Schema di stampa

    1. Schema tabella Delta di stampa per i dati KPI trasformati e medi1.
    // tranform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Screenshot che mostra l'output dello schema di stampa.

  7. Visualizzare l'ultimo indicatore KPI calcolato dalla tabella dati

    dtAvgKpi.toDF.show(false)

    Screenshot che mostra l'ultimo indicatore KPI calcolato dalla tabella dati.

Visualizzare la cronologia degli indicatori KPI calcolati

Questo passaggio visualizza la cronologia della tabella delle transazioni KPI da _delta_log

dtAvgKpi.history().show(false)

Screenshot che mostra la cronologia degli indicatori KPI calcolati.

Visualizzare i dati KPI dopo ogni caricamento dei dati

  1. Usando lo spostamento cronologico è possibile visualizzare le modifiche KPI dopo ogni caricamento
  2. È possibile archiviare tutte le modifiche alla versione in formato CSV in avgMoMKPIChangePath, in modo che Power BI possa leggere queste modifiche
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Screenshot dei dati KPI dopo ogni caricamento dei dati.

Riferimento