Tutorial: Carga y transformación de datos mediante DataFrames de Apache Spark
En este tutorial se muestra cómo cargar y transformar datos mediante la API DataFrame de Apache Spark Python (PySpark), la API DataFrame de Apache Spark Scala y la API SparkDataFrame de SparkR en Azure Databricks.
Al final de este tutorial, comprenderá lo que es un DataFrame y estará familiarizado con las siguientes tareas:
Python
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Creación de un objeto DataFrame con Python
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con un DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en PySpark
Consulte también la referencia de la API de PySpark de Apache Spark.
Scala
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Crear un Dataframe con Scala
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con una DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en Apache Spark
Consulte también la referencia de la API de Scala de Apache Spark.
R
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Creación de un SparkDataFrames de SparkR
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con un DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en SparkR
Consulte también la referencia a la API de Apache SparkR.
¿Qué es un DataFrame?
Un DataFrame es una estructura de datos etiquetada bidimensional con columnas de tipos potencialmente diferentes. Puedes pensar en un DataFrame como una hoja de cálculo, una tabla SQL o un diccionario de objetos de serie. DataFrame de Apache Spark proporciona un amplio conjunto de funciones (selección de columnas, filtro, unión, incorporación) que permiten resolver problemas comunes de análisis de datos de forma eficaz.
Los DataFrames de Apache Spark son una compilación de abstracción basada en conjuntos de datos distribuidos resistentes (RDD). Spark DataFrame y Spark SQL usan un motor unificado de planificación y optimización, lo que le permite obtener un rendimiento casi idéntico en todos los lenguajes admitidos en Azure Databricks (Python, SQL, Scala y R).
Requisitos
Para completar el siguiente tutorial, debe cumplir los siguientes requisitos:
Para usar los ejemplos de este tutorial, el área de trabajo debe tener habilitado Unity Catalog.
En los ejemplos de este tutorial se usa un volumen de Unity Catalog para almacenar datos de ejemplo. Para usar estos ejemplos, cree un volumen y use los nombres de catálogo, esquema y volumen de ese volumen para establecer la ruta de acceso del volumen usada por los ejemplos.
Debe tener estos permisos en Unity Catalog:
READ VOLUME
yWRITE VOLUME
, oALL PRIVILEGES
para el volumen usado para este tutorial.USE SCHEMA
oALL PRIVILEGES
para el esquema usado para este tutorial.USE CATALOG
oALL PRIVILEGES
para el catálogo usado para este tutorial.
Para establecer estos permisos, consulte los privilegios de administrador de Databricks o Unity Catalog y objetos protegibles.
Sugerencia
Para ver un cuaderno completado para este artículo, vea cuadernos del tutorial DataFrame.
Paso 1: Definir variables y cargar el archivo CSV
En este paso se definen las variables para su uso en este tutorial y, a continuación, se carga un archivo CSV que contiene los datos de nombres de bebé de health.data.ny.gov en el volumen de Unity Catalog.
Para abrir un nuevo cuaderno, haga clic en el icono . Para obtener información sobre cómo navegar por cuadernos de Azure Databricks, consulte Interfaz y controles del cuaderno de Databricks.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Reemplace
<catalog-name>
,<schema-name>
y<volume-name>
por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. Reemplace<table_name>
por un nombre de la tabla de su elección. Cargará los datos de nombres de bebé en esta tabla más adelante en este tutorial.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Presione
Shift+Enter
para ejecutar la celda y crear una nueva celda en blanco.Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código copia el archivo
rows.csv
de health.data.ny.gov en el volumen de Unity Catalog mediante el comando dbutuils de Databricks.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Paso 2: Crear un DataFrame
En este paso se crea un DataFrame denominado df1
con datos de prueba y, a continuación, se muestra su contenido.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código crea el DataFrame con datos de prueba y, a continuación, muestra el contenido y el esquema del DataFrame.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Paso 3: Cargar datos en un DataFrame desde un archivo CSV
Este paso crea un DataFrame denominado df_csv
desde el archivo CSV que cargó anteriormente en el volumen de Unity Catalog. Consulte spark.read.csv.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código carga los datos de nombres de bebé en el DataFrame
df_csv
desde el archivo CSV y, a continuación, muestra el contenido del DataFrame.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Puede cargar datos de muchos formatos de archivo admitidos.
Paso 4: Ver e interactuar con el DataFrame
Vea e interactúe con los DataFrames de nombres de bebé mediante los métodos siguientes.
Imprimir el esquema Dataframe
Aprenda a mostrar el esquema de un DataFrame de Apache Spark. Spark usa el término esquema para hacer referencia a los nombres y los tipos de datos de las columnas del DataFrame.
Nota:
Azure Databricks también usa el esquema de términos para describir una colección de tablas registradas en un catálogo.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra el esquema de los DataFrames con el método
.printSchema()
para ver los esquemas de los dos DataFrames y preparar la unión de ambos.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Cambiar un nombre de columna en el DataFrame
Obtenga información sobre cómo cambiar el nombre de una columna en un DataFrame.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código cambia el nombre de una columna del DataFrame
df1_csv
para que coincida con la columna correspondiente en el DataFramedf1
. Este código usa el métodowithColumnRenamed()
de Apache Spark.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Combinar DataFrames
Obtenga información sobre cómo crear un DataFrame que agregue las filas de un DataFrame a otro.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark
union()
para combinar el contenido del primer DataFramedf
con el DataFramedf_csv
que contiene los datos de nombres de bebé cargados desde el archivo CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Filtrado de filas en un DataFrame
Descubra los nombres de bebé más populares en el conjunto de datos mediante el filtrado de filas mediante los métodos .filter()
o .where()
de Apache Spark. Use el filtrado para seleccionar un subconjunto de filas para devolver o modificar en un DataFrame. No hay ninguna diferencia en el rendimiento o la sintaxis, como se muestra en los siguientes ejemplos.
Uso del método .filter()
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark
.filter()
para mostrar esas filas en el DataFrame con un recuento de más de 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Uso del método .where()
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark
.where()
para mostrar esas filas en el DataFrame con un recuento de más de 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Seleccionar columnas de un DataFrame y ordenar por frecuencia
Obtenga información sobre la frecuencia de un nombre de bebé con el método select()
para especificar las columnas de DataFrame que se van a devolver. Use las funciones orderby
y desc
de Apache Spark para ordenar los resultados.
El módulo pyspark.sql para Apache Spark proporciona compatibilidad con funciones SQL. Entre estas funciones que se usan en este tutorial se encuentran las funciones orderBy()
, desc()
y expr()
de Apache Spark. Para habilitar el uso de estas funciones, debe importarlas en la sesión según sea necesario.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función
desc()
y, a continuación, usa el métodoselect()
de Apache Spark y las funcionesorderBy()
ydesc()
de Apache Spark para mostrar los nombres más comunes y sus recuentos en orden descendente.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Creación de un DataFrame de subconjunto
Obtenga información sobre cómo crear un DataFrame de subconjunto a partir de un DataFrame existente.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método
filter
de Apache Spark para crear un nuevo DataFrame que restringe los datos por año, recuento y sexo. Usa el métodoselect()
de Apache Spark para limitar las columnas. También usa las funcionesorderBy()
ydesc()
de Apache Spark para ordenar el nuevo DataFrame por recuento.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Paso 5: Guardar el DataFrame
Obtenga información sobre cómo guardar un DataFrame. Puede guardar el DataFrame en una tabla o escribir el DataFrame en un archivo o en varios archivos.
Guardar el DataFrame en una tabla
Azure Databricks usa el formato Delta Lake para todas las tablas de forma predeterminada. Para guardar el Dataframe, debe tener privilegios de tabla CREATE
en el catálogo y el esquema.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el contenido de DataFrame en una tabla mediante la variable que definió al principio de este tutorial.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
La mayoría de las aplicaciones Spark funcionan en grandes conjuntos de datos y de forma distribuida. Spark escribe un directorio de archivos en lugar de un solo archivo. Delta Lake divide las carpetas y los archivos de Parquet. Muchos sistemas de datos pueden leer estos directorios de archivos. Azure Databricks recomienda usar tablas a través de rutas de acceso de archivo para la mayoría de las aplicaciones.
Guardar el DataFrame en archivos JSON
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el DataFrame en un directorio de archivos JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Lea el DataFrame de un archivo JSON
Aprenda a usar el método spark.read.format()
de Apache Spark para leer datos JSON de un directorio en un DataFrame.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra los archivos JSON que guardó en el ejemplo anterior.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Tareas adicionales: Ejecución de consultas SQL en PySpark, Scala y R
Los DataFrames de Apache Spark proporcionan las siguientes opciones para combinar SQL con PySpark, Scala y R. Puede ejecutar el código siguiente en el mismo cuaderno que creó para este tutorial.
Especificar una columna como una consulta SQL
Aprenda a usar el método selectExpr()
de Apache Spark. Se trata de una variante del método select()
que acepta expresiones SQL y devuelve un DataFrame actualizado. Este método permite usar una expresión SQL, como upper
.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark
selectExpr()
y la expresión SQLupper
para convertir una columna de cadena en mayúsculas (y cambiar el nombre de la columna).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Uso de expr()
para usar la sintaxis SQL para una columna
Obtenga información sobre cómo importar y usar la función de Apache Spark expr()
para usar la sintaxis SQL en cualquier lugar en el que se especifique una columna.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función
expr()
y, a continuación, usa la funciónexpr()
de Apache Spark y la expresión SQLlower
para convertir una columna de cadena en minúsculas (y cambiar el nombre de la columna).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Ejecución de una consulta SQL arbitraria mediante la función spark.sql()
Aprenda a usar la función spark.sql()
de Apache Spark para ejecutar consultas SQL arbitrarias.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa la función
spark.sql()
de Apache Spark para consultar una tabla SQL mediante la sintaxis SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.
Cuaderno del tutorial de DataFrame
Los cuadernos siguientes incluyen las consultas de ejemplos de este tutorial.