Utiliser Delta Lake dans un cluster Azure HDInsight sur AKS avec Apache Spark™ (préversion)
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base sera disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.
Azure HDInsight sur AKS est un service informatique managé pour l’analytique du Big Data qui aide les organisations à traiter de grandes quantités de données. Ce tutoriel montre comment utiliser Delta Lake dans un cluster Azure HDInsight sur AKS avec Apache Spark™.
Configuration requise
Exécutez le scénario Delta Lake dans Jupyter Notebook. Créez un notebook Jupyter et sélectionnez « Spark » lors de la création du notebook, car l’exemple suivant est en Scala.
Scénario
- Lire le format de données Parquet NYC Taxi – La liste des URL de fichiers Parquet provient de NYC Taxi et Limousine Commission.
- Pour chaque URL (fichier), effectuer une transformation et la stocker au format Delta.
- Calculer la distance moyenne, le coût moyen par mile et le coût moyen à partir de Delta Table à l’aide d’une charge incrémentielle.
- Stocker la valeur calculée de l’étape 3 au format Delta dans le dossier de sortie d’indicateurs de performance clés.
- Créer un dossier de sortie Delta Table au format Delta (actualisation automatique).
- Le dossier de sortie d’indicateurs de performance clés contient plusieurs versions de la distance moyenne et du coût moyen par mile pour un trajet.
Fournir des configurations requises pour le Delta Lake
Matrice de compatibilité Delta Lake avec Apache Spark : Delta Lake, modifiez la version Delta Lake en fonction de la version Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Lister le fichier de données
Remarque
Ces URL de fichiers proviennent de NYC Taxi et Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Créer le répertoire de sortie
Emplacement où vous souhaitez créer une sortie au format Delta. Modifiez les variables transformDeltaOutputPath
et avgDeltaOutputKPIPath
si nécessaire.
avgDeltaOutputKPIPath
: pour stocker l’indicateur de performance clé moyen au format DeltatransformDeltaOutputPath
: pour stocker la sortie transformée au format Delta
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Créer des données au format Delta à partir du format Parquet
Les données d’entrée proviennent de
listOfDataFile
, où les données sont téléchargées à partir de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.Pour illustrer le voyage dans le temps et la version, chargez les données individuellement.
Effectuez une transformation et calculez les indicateurs de performance clés métier suivants sur la charge incrémentielle :
- Distance moyenne
- Coût moyen par mile
- Coût moyen
Enregistrez les données transformées et les données d’indicateurs de performance clés au format Delta.
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Lisez le format Delta à l’aide de Delta Table.
- Lisez les données transformées.
- Lisez les données d’indicateurs de performance clés.
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Imprimez le schéma.
- Imprimez le schéma Delta Table pour les données d’indicateurs de performance clés moyennes et les données transformées.
// tranform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Affichez le dernier indicateur de performance clé calculé à partir de Data Table.
dtAvgKpi.toDF.show(false)
Afficher l’historique des indicateurs de performance clés calculés
Cette étape affiche l’historique de la table de transactions d’indicateurs de performance clés à partir de _delta_log
.
dtAvgKpi.history().show(false)
Afficher les données d’indicateurs de performance clés après chaque chargement de données
- À l’aide du voyage dans le temps, vous pouvez afficher les modifications des indicateurs de performance clés après chaque chargement.
- Vous pouvez stocker toutes les modifications de version au format CSV dans
avgMoMKPIChangePath
, afin que Power BI puisse lire ces modifications.
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Référence
- Apache, Apache Spark, Spark et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).