Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Obtenga más información con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información de la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
Azure HDInsight en AKS es un servicio administrado basado en la nube para el análisis de macrodatos que ayuda a las organizaciones a procesar grandes cantidades de datos. En este tutorial se muestra cómo usar Delta Lake en Azure HDInsight en AKS con el clúster de Apache Spark™.
Prerrequisito
Crear un clúster de Apache Spark™ en Azure HDInsight en AKS
Ejecute el escenario de Delta Lake en Jupyter Notebook. Cree un cuaderno de Jupyter Notebook y seleccione "Spark" al crear un cuaderno, ya que el ejemplo siguiente está en Scala.
Escenario
- Leer formato de datos Parquet de Taxi de NYC - Lista de URLs de archivos Parquet proporcionada por Taxi de NYC & Comisión de Limusinas.
- Para cada dirección URL (archivo) realiza una transformación y almacena en formato Delta.
- Calcule la distancia media, el costo promedio por milla y el costo medio de la tabla Delta mediante la carga incremental.
- Almacene el valor calculado del paso 3 en formato Delta en la carpeta de salida de KPI.
- Cree una tabla delta en la carpeta de salida formato delta (actualización automática).
- La carpeta de salida de KPI tiene varias versiones de la distancia media y el costo medio por milla de un viaje.
Proporcionar configuraciones necesarias para el lago delta
Matriz de compatibilidad de Delta Lake con Apache Spark: Delta Lake, cambie la versión de Delta Lake basada en la versión de Apache Spark.
%%configure -f
{ "conf": {"spark.jars.packages": "io.delta:delta-core_2.12:1.0.1,net.andreinc:mockneat:0.4.8",
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
}
}
Lista el archivo de datos
Nota
Estas direcciones URL de archivo proceden de NYC Taxi & Limousine Commission.
import java.io.File
import java.net.URL
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs._
// data file object is being used for future reference in order to read parquet files from HDFS
case class DataFile(name:String, downloadURL:String, hdfsPath:String)
// get Hadoop file system
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val fileUrls= List(
"https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet"
)
// Add a file to be downloaded with this Spark job on every node.
val listOfDataFile = fileUrls.map(url=>{
val urlPath=url.split("/")
val fileName = urlPath(urlPath.size-1)
val urlSaveFilePath = s"/tmp/${fileName}"
val hdfsSaveFilePath = s"/tmp/${fileName}"
val file = new File(urlSaveFilePath)
FileUtils.copyURLToFile(new URL(url), file)
// copy local file to HDFS /tmp/${fileName}
// use FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
fs.copyFromLocalFile(true,true,new org.apache.hadoop.fs.Path(urlSaveFilePath),new org.apache.hadoop.fs.Path(hdfsSaveFilePath))
DataFile(urlPath(urlPath.size-1),url, hdfsSaveFilePath)
})
Creación de un directorio de salida
La ubicación en la que desea crear la salida en formato delta, cambie las variables transformDeltaOutputPath
y avgDeltaOutputKPIPath
si es necesario.
-
avgDeltaOutputKPIPath
: para almacenar el KPI medio en formato delta -
transformDeltaOutputPath
: almacenar la salida transformada en formato delta
import org.apache.hadoop.fs._
// this is used to store source data being transformed and stored delta format
val transformDeltaOutputPath = "/nyctaxideltadata/transform"
// this is used to store Average KPI data in delta format
val avgDeltaOutputKPIPath = "/nyctaxideltadata/avgkpi"
// this is used for POWER BI reporting to show Month on Month change in KPI (not in delta format)
val avgMoMKPIChangePath = "/nyctaxideltadata/avgMoMKPIChangePath"
// create directory/folder if not exist
def createDirectory(dataSourcePath: String) = {
val fs:FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(dataSourcePath)
if(!fs.exists(path) && !fs.isDirectory(path)) {
fs.mkdirs(path)
}
}
createDirectory(transformDeltaOutputPath)
createDirectory(avgDeltaOutputKPIPath)
createDirectory(avgMoMKPIChangePath)
Crear datos en formato Delta a partir del formato Parquet
Los datos de entrada proceden de
listOfDataFile
, donde se han descargado datos de https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page.Para mostrar el viaje en el tiempo y la versión, cargue los datos individualmente.
Realice la transformación y calcule los siguientes KPI empresariales en la carga incremental:
- Distancia media
- El costo promedio por milla
- Costo medio
Guardar datos transformados y KPI en formato delta
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.DataFrame // UDF to compute sum of value paid by customer def totalCustPaid = udf((basePassengerFare:Double, tolls:Double,bcf:Double,salesTax:Double,congSurcharge:Double,airportFee:Double, tips:Double) => { val total = basePassengerFare + tolls + bcf + salesTax + congSurcharge + airportFee + tips total }) // read parquet file from spark conf with given file input // transform data to compute total amount // compute kpi for the given file/batch data def readTransformWriteDelta(fileName:String, oldData:Option[DataFrame], format:String="parquet"):DataFrame = { val df = spark.read.format(format).load(fileName) val dfNewLoad= df.withColumn("total_amount",totalCustPaid($"base_passenger_fare",$"tolls",$"bcf",$"sales_tax",$"congestion_surcharge",$"airport_fee",$"tips")) // union with old data to compute KPI val dfFullLoad= oldData match { case Some(odf)=> dfNewLoad.union(odf) case _ => dfNewLoad } dfFullLoad.createOrReplaceTempView("tempFullLoadCompute") val dfKpiCompute = spark.sql("SELECT round(avg(trip_miles),2) AS avgDist,round(avg(total_amount/trip_miles),2) AS avgCostPerMile,round(avg(total_amount),2) avgCost FROM tempFullLoadCompute") // save only new transformed data dfNewLoad.write.mode("overwrite").format("delta").save(transformDeltaOutputPath) //save compute KPI dfKpiCompute.write.mode("overwrite").format("delta").save(avgDeltaOutputKPIPath) // return incremental dataframe for next set of load dfFullLoad } // load data for each data file, use last dataframe for KPI compute with the current load def loadData(dataFile: List[DataFile], oldDF:Option[DataFrame]):Boolean = { if(dataFile.isEmpty) { true } else { val nextDataFile = dataFile.head val newFullDF = readTransformWriteDelta(nextDataFile.hdfsPath,oldDF) loadData(dataFile.tail,Some(newFullDF)) } } val starTime=System.currentTimeMillis() loadData(listOfDataFile,None) println(s"Time taken in Seconds: ${(System.currentTimeMillis()-starTime)/1000}")
Lectura del formato delta mediante la tabla Delta
- leer datos transformados
- leer datos de indicadores clave de rendimiento (KPI)
import io.delta.tables._ val dtTransformed: io.delta.tables.DeltaTable = DeltaTable.forPath(transformDeltaOutputPath) val dtAvgKpi: io.delta.tables.DeltaTable = DeltaTable.forPath(avgDeltaOutputKPIPath)
Imprimir esquema
- Imprimir esquema de tabla delta para datos de KPI transformados y promedio1.
// transform data schema dtTransformed.toDF.printSchema // Average KPI Data Schema dtAvgKpi.toDF.printSchema
Mostrar el ÚLTIMO KPI calculado de la tabla de datos
dtAvgKpi.toDF.show(false)
Mostrar el historial de KPIs calculados
En este paso se muestra el historial de la tabla de transacciones de KPI de _delta_log
dtAvgKpi.history().show(false)
Mostrar datos de KPI después de cada carga de datos
- Con el uso de viajes en el tiempo, puede ver los cambios de KPI después de cada carga de datos.
- Puede almacenar todos los cambios de versión en formato CSV en
avgMoMKPIChangePath
, para que Power BI pueda leer estos cambios.
val dfTxLog = spark.read.json(s"${transformDeltaOutputPath}/_delta_log/*.json")
dfTxLog.select(col("add")("path").alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL").show(false)
Referencia
- Los nombres de proyecto de código abierto asociados, Apache, Apache Spark, Spark son marcas comerciales de la Apache Software Foundation (ASF).