Partager via


Utiliser Delta Lake dans un cluster Azure HDInsight sur AKS avec Apache Spark™ (préversion)

Remarque

Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.

Seul le support de base sera disponible jusqu’à la date de mise hors service.

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.

Azure HDInsight sur AKS est un service informatique managé pour l’analytique du Big Data qui aide les organisations à traiter de grandes quantités de données. Ce tutoriel montre comment utiliser Delta Lake dans un cluster Azure HDInsight sur AKS avec Apache Spark™.

Configuration requise

  1. Créer un cluster Apache Spark™ dans Azure HDInsight sur AKS

    Capture d’écran montrant la création d’un cluster spark.

  2. Exécutez le scénario Delta Lake dans Jupyter Notebook. Créez un notebook Jupyter et sélectionnez « Spark » lors de la création du notebook, car l’exemple suivant est en Scala.

    Capture d’écran montrant comment exécuter le scénario du lac delta.

Scénario

  • Lire le format de données Parquet NYC Taxi – La liste des URL de fichiers Parquet provient de NYC Taxi et Limousine Commission.
  • Pour chaque URL (fichier), effectuer une transformation et la stocker au format Delta.
  • Calculer la distance moyenne, le coût moyen par mile et le coût moyen à partir de Delta Table à l’aide d’une charge incrémentielle.
  • Stocker la valeur calculée de l’étape 3 au format Delta dans le dossier de sortie d’indicateurs de performance clés.
  • Créer un dossier de sortie Delta Table au format Delta (actualisation automatique).
  • Le dossier de sortie d’indicateurs de performance clés contient plusieurs versions de la distance moyenne et du coût moyen par mile pour un trajet.

Fournir des configurations requises pour le Delta Lake

Matrice de compatibilité Delta Lake avec Apache Spark : Delta Lake, modifiez la version Delta Lake en fonction de la version Apache Spark.

%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
  }

Capture d’écran montrant les configurations des lacs delta.

Lister le fichier de données

Remarque

Ces URL de fichiers proviennent de NYC Taxi et Limousine Commission.

import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
    
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
    
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
    )
    
// Add a file to be downloaded with this Spark job on every node.
        val listOfDataFile = fileUrls.map(url=>{
        val urlPath=url.split("/") 
        val fileName = urlPath(urlPath.size-1)
        val urlSaveFilePath = s"/tmp/${fileName}"
        val hdfsSaveFilePath = s"/tmp/${fileName}"
        val file = new File(urlSaveFilePath)
        FileUtils.copyURLToFile(new URL(url), file)
        // copy local file to HDFS /tmp/${fileName}
        // use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
        fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
        DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})

Capture d’écran montrant comment lancer l’application Spark.

Créer le répertoire de sortie

Emplacement où vous souhaitez créer une sortie au format Delta. Modifiez les variables transformDeltaOutputPath et avgDeltaOutputKPIPath si nécessaire.

  • avgDeltaOutputKPIPath : pour stocker l’indicateur de performance clé moyen au format Delta
  • transformDeltaOutputPath : pour stocker la sortie transformée au format Delta
import org.apache.hadoop.fs._

// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"

// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
    val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val path =  new Path(dataSourcePath)
    if(!fs.exists(path) && !fs.isDirectory(path)) {
        fs.mkdirs(path)
    }
}

createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)

Capture d’écran montrant comment créer un répertoire de sortie.

Créer des données au format Delta à partir du format Parquet

  1. Les données d’entrée proviennent de listOfDataFile, où les données sont téléchargées à partir de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

  2. Pour illustrer le voyage dans le temps et la version, chargez les données individuellement.

  3. Effectuez une transformation et calculez les indicateurs de performance clés métier suivants sur la charge incrémentielle :

    1. Distance moyenne
    2. Coût moyen par mile
    3. Coût moyen
  4. Enregistrez les données transformées et les données d’indicateurs de performance clés au format Delta.

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.DataFrame
    
    // UDF to compute sum of value paid by customer
    def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => {
        val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips
        total
    })
    
    // read parquet file from spark conf with given file input
    // transform data to compute total amount
    // compute kpi for the given file/batch data
    def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = {
        val df = spark.read.format(format).load(fileName)
        val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips"))
        // union with old data to compute KPI
        val dfFullLoad= oldData match {
            case Some(odf)=>
                    dfNewLoad.union(odf)
            case _ =>
                    dfNewLoad
        }
        dfFullLoad.createOrReplaceTempView("tempFullLoadCompute")
        val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute")
        // save only new transformed data
        dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath)
        //save compute KPI
        dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath)
        // return incremental dataframe for next set of load
        dfFullLoad
    }
    
    // load data for each data file, use last dataframe for KPI compute with the current load
    def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = {
        if(dataFile.isEmpty) {    
            true
        } else {
            val nextDataFile = dataFile.head
            val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF)
            loadData(dataFile.tail,Some(newFullDF))
        }
    }
    val starTime=System.currentTimeMillis()
    loadData(listOfDataFile,None)
    println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
    

    Capture d’écran montrant comment utiliser des données au format delta.

  5. Lisez le format Delta à l’aide de Delta Table.

    1. Lisez les données transformées.
    2. Lisez les données d’indicateurs de performance clés.
    import io.delta.tables._
    val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath)
    val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
    

    Capture d’écran montrant les données KPI lues.

  6. Imprimez le schéma.

    1. Imprimez le schéma Delta Table pour les données d’indicateurs de performance clés moyennes et les données transformées.
    // tranform data schema
    dtTransformed.toDF.printSchema
    // Average KPI Data Schema
    dtAvgKpi.toDF.printSchema
    

    Capture d’écran montrant l’impression du schéma.

  7. Affichez le dernier indicateur de performance clé calculé à partir de Data Table.

    dtAvgKpi.toDF.show(false)

    Capture d’écran montrant le dernier ICP calculé à partir du tableau de données.

Afficher l’historique des indicateurs de performance clés calculés

Cette étape affiche l’historique de la table de transactions d’indicateurs de performance clés à partir de _delta_log.

dtAvgKpi.history().show(false)

Capture d’écran montrant l’historique des ICP calculés.

Afficher les données d’indicateurs de performance clés après chaque chargement de données

  1. À l’aide du voyage dans le temps, vous pouvez afficher les modifications des indicateurs de performance clés après chaque chargement.
  2. Vous pouvez stocker toutes les modifications de version au format CSV dans avgMoMKPIChangePath, afin que Power BI puisse lire ces modifications.
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)

Capture d’écran des données d’indicateurs de performance clés après chaque chargement de données.

Référence