Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Este tutorial mostra como carregar e transformar dados usando a API DataFrame do Apache Spark Python (PySpark), a API do Apache Spark Scala DataFrame e a API SparkR SparkDataFrame no Azure Databricks.
Ao final deste tutorial, você entenderá o que é um DataFrame e estará familiarizado com as seguintes tarefas:
Python
- Definir variáveis e copiar dados públicos para um volume do Catálogo Unity
- Criar um DataFrame com Python
- Carregar dados em um DataFrame a partir do arquivo CSV
- Exibir e interagir com um DataFrame
- Salvar o DataFrame
- Executar consultas SQL no PySpark
Consulte também Referência da API do Apache Spark PySpark.
linguagem de programação Scala
- Definir variáveis e copiar dados públicos para um volume do Catálogo Unity
- Criar um DataFrame com Scala
- Carregar dados em um DataFrame a partir do arquivo CSV
- Exibir e interagir com um DataFrame
- Salvar o DataFrame
- Executar consultas SQL no Apache Spark
Consulte também Referência da API do Apache Spark Scala.
R
- Definir variáveis e copiar dados públicos para um volume do Catálogo Unity
- Criar um SparkR SparkDataFrames
- Carregar dados em um DataFrame a partir do arquivo CSV
- Exibir e interagir com um DataFrame
- Salvar o DataFrame
- Executar consultas SQL no SparkR
Consulte também Referência da API do Apache SparkR.
O que é um DataFrame?
Um DataFrame é uma estrutura de dados rotulada bidimensional com colunas de tipos potencialmente diferentes. Você pode pensar em um DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos de série. O Apache Spark DataFrames fornece um rico conjunto de funções (selecionar colunas, filtrar, juntar, agregar) que permitem resolver problemas comuns de análise de dados de forma eficiente.
Os Apache Spark DataFrames são uma abstração construída sobre conjuntos de dados distribuídos resilientes (RDDs). O Spark DataFrames e o Spark SQL usam um mecanismo unificado de planejamento e otimização, permitindo que você obtenha um desempenho quase idêntico em todas as linguagens com suporte no Azure Databricks (Python, SQL, Scala e R).
Requisitos
Para concluir o tutorial a seguir, você deve atender aos seguintes requisitos:
Para usar os exemplos neste tutorial, seu espaço de trabalho deve ter Unity Catalog habilitado.
Os exemplos neste tutorial usam um Catálogo Unity volume para armazenar dados de exemplo. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos.
Você deve ter as seguintes permissões no Catálogo Unity:
-
READ VOLUMEeWRITE VOLUME, ouALL PRIVILEGESpara o volume usado para este tutorial. -
USE SCHEMAouALL PRIVILEGESpara o esquema usado para este tutorial. -
USE CATALOGouALL PRIVILEGESpara o catálogo usado para este tutorial.
Para definir essas permissões, consulte o administrador do Databricks ou privilégios do Catálogo Unity e objetos securizáveis.
-
Gorjeta
Para obter um bloco de anotações concluído para este artigo, consulte Blocos de anotações de tutorial do DataFrame.
Etapa 1: Definir variáveis e carregar arquivo CSV
Esta etapa define variáveis para uso neste tutorial e, em seguida, carrega um arquivo CSV contendo dados de nome de bebê de health.data.ny.gov em seu volume do Catálogo Unity.
Abra um novo bloco de notas clicando no
ícone. Para saber como navegar nos blocos de anotações do Azure Databricks, consulte Personalizar a aparência do bloco de anotações.Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Substitua
<catalog-name>,<schema-name>e<volume-name>pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity. Substitua<table_name>por um nome de tabela de sua escolha. Você carregará os dados do nome do bebê nesta tabela mais adiante neste tutorial.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete pathlinguagem de programação Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete pathR
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete pathPressione
Shift+Enterpara executar a célula e criar uma nova célula em branco.Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código copia o arquivo
rows.csvdo health.data.ny.gov para o volume do Catálogo Unity usando o comando Databricks dbutuils.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")linguagem de programação Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Etapa 2: Criar um DataFrame
Esta etapa cria um DataFrame nomeado df1 com dados de teste e, em seguida, exibe seu conteúdo.
Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código cria o DataFrame com dados de teste e, em seguida, exibe o conteúdo e o esquema do DataFrame.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] # highlight-next-line df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.linguagem de programação Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") // highlight-next-line val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) # highlight-next-line df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Etapa 3: Carregar dados em um DataFrame a partir do arquivo CSV
Esta etapa cria um DataFrame chamado df_csv a partir do arquivo CSV que você carregou anteriormente no volume do Catálogo Unity. Ver spark.read.csv.
Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código carrega dados de nome de bebê em DataFrame
df_csva partir do arquivo CSV e, em seguida, exibe o conteúdo do DataFrame.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)linguagem de programação Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Você pode carregar dados de muitos formatos de arquivo suportados.
Etapa 4: exibir e interagir com seu DataFrame
Visualize e interaja com os nomes do seu bebé DataFrames utilizando os seguintes métodos.
Imprimir o esquema DataFrame
Saiba como exibir o esquema de um Apache Spark DataFrame. O Apache Spark usa o termo esquema para se referir aos nomes e tipos de dados das colunas no DataFrame.
Nota
O Azure Databricks também usa o termo esquema para descrever uma coleção de tabelas registradas em um catálogo.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Este código mostra o esquema dos seus DataFrames utilizando o método
.printSchema()para visualizar os esquemas dos dois DataFrames, preparando-se para unir os dois DataFrames.Python
df_csv.printSchema() df1.printSchema()linguagem de programação Scala
dfCsv.printSchema() df1.printSchema()R
printSchema(df_csv) printSchema(df1)Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Renomear coluna no DataFrame
Saiba como renomear uma coluna em um DataFrame.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código renomeia uma coluna no
df1_csvDataFrame para corresponder à respetiva coluna nodf1DataFrame. Este código usa o método Apache SparkwithColumnRenamed().Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchemalinguagem de programação Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Combinar DataFrames
Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark
union()para combinar o conteúdo do seu primeiro DataFramedfcom DataFramedf_csvcontendo os dados de nomes de bebês carregados do arquivo CSV.Python
df = df1.union(df_csv) display(df)linguagem de programação Scala
val df = df1.union(dfCsvRenamed) display(df)R
display(df <- union(df1, df_csv))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Filtrar linhas em um DataFrame
Descubra os nomes de bebés mais populares no seu conjunto de dados filtrando linhas, utilizando os métodos Apache Spark .filter() ou .where(). Use a filtragem para selecionar um subconjunto de linhas para retornar ou modificar em um DataFrame. Não há diferença no desempenho ou na sintaxe, como visto nos exemplos a seguir.
Usando o método .filter()
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark
.filter()para exibir essas linhas no DataFrame com uma contagem de mais de 50.Python
display(df.filter(df["Count"] > 50))linguagem de programação Scala
display(df.filter(df("Count") > 50))R
display(filteredDF <- filter(df, df$Count > 50))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Usando o método .where()
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark
.where()para exibir essas linhas no DataFrame com uma contagem de mais de 50.Python
display(df.where(df["Count"] > 50))linguagem de programação Scala
display(df.where(df("Count") > 50))R
display(filtered_df <- where(df, df$Count > 50))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Selecionar colunas de um DataFrame e ordenar por frequência
Saiba mais sobre qual frequência de nome de bebê com o método select() para especificar as colunas do DataFrame a serem retornadas. Use o Apache Spark orderby e desc as funções para ordenar os resultados.
O módulo pyspark.sql para Apache Spark fornece suporte para funções SQL. Entre essas funções que usamos neste tutorial estão o Apache Spark orderBy(), desc()e expr() funções. Você habilita o uso dessas funções importando-as para sua sessão, conforme necessário.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Este código importa a
desc()função e, em seguida, usa o método Apache Sparkselect()e Apache SparkorderBy()edesc()funções para exibir os nomes mais comuns e suas contagens em ordem decrescente.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))linguagem de programação Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Criar um subconjunto DataFrame
Saiba como criar um subconjunto DataFrame a partir de um DataFrame existente.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark
filterpara criar um novo DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o método Apache Sparkselect()para limitar as colunas. Ele também usa o Apache SparkorderBy()edesc()funções para classificar o novo DataFrame por contagem.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)linguagem de programação Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Etapa 5: salvar o DataFrame
Saiba como salvar um DataFrame,. Você pode salvar seu DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.
Salvar o DataFrame em uma tabela
O Azure Databricks usa o formato Delta Lake para todas as tabelas por padrão. Para salvar seu DataFrame, você deve ter privilégios de tabela CREATE no catálogo e no esquema.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código salva o conteúdo do DataFrame em uma tabela usando a variável que você definiu no início deste tutorial.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")linguagem de programação Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
A maioria dos aplicativos Apache Spark funciona em grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. O Azure Databricks recomenda o uso de tabelas em caminhos de arquivo para a maioria dos aplicativos.
Salve o DataFrame em arquivos JSON
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código salva o DataFrame em um diretório de arquivos JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")linguagem de programação Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Ler o DataFrame de um arquivo JSON
Saiba como usar o método Apache Spark spark.read.format() para ler dados JSON de um diretório em um DataFrame.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código exibe os arquivos JSON salvos no exemplo anterior.
Python
display(spark.read.format("json").json("/tmp/json_data"))linguagem de programação Scala
display(spark.read.format("json").json("/tmp/json_data"))R
display(read.json("/tmp/json_data"))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Tarefas adicionais: Executar consultas SQL no PySpark, Scala e R
O Apache Spark DataFrames fornece as seguintes opções para combinar SQL com PySpark, Scala e R. Você pode executar o código a seguir no mesmo bloco de anotações que você criou para este tutorial.
Especificar uma coluna como uma consulta SQL
Saiba como usar o método Apache Spark selectExpr() . Esta é uma variante do select() método que aceita expressões SQL e retorna um DataFrame atualizado. Esse método permite que você use uma expressão SQL, como upper.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark
selectExpr()e a expressão SQLupperpara converter uma coluna de cadeia de caracteres em maiúsculas (e renomear a coluna).Python
display(df.selectExpr("Count", "upper(County) as big_name"))linguagem de programação Scala
display(df.selectExpr("Count", "upper(County) as big_name"))R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Usar expr() para usar a sintaxe SQL para uma coluna
Saiba como importar e usar a função Apache Spark expr() para usar a sintaxe SQL em qualquer lugar onde uma coluna seja especificada.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código importa a função
expr()e, em seguida, usa a função Apache Sparkexpr()e a expressão SQLlowerpara converter uma coluna de cadeia de caracteres em minúsculas (e renomear a coluna).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))linguagem de programação Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionalityPressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Executar uma consulta SQL arbitrária usando a função spark.sql()
Saiba como usar a função Apache Spark spark.sql() para executar consultas SQL arbitrárias.
Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Este código usa a função Apache Spark
spark.sql()para consultar uma tabela SQL usando sintaxe SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))linguagem de programação Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))Pressione
Shift+Enterpara executar a célula e, em seguida, vá para a próxima célula.
Blocos de anotações de tutorial DataFrame
Os blocos de anotações a seguir incluem as consultas de exemplos deste tutorial.