Trabalhar com dados em um dataframe do Spark

Concluído

Na unidade anterior, você aprendeu a conectar a uma fonte de dados, carregar dados em um dataframe e, opcionalmente, salvar o dataframe em um data lake como arquivo ou tabela. Agora, vamos explorar o dataframe em um pouco mais de detalhes.

Nativamente, o Spark usa uma estrutura de dados chamada conjunto de dados distribuído resiliente (RDD); mas, embora você possa escrever o código que trabalhe diretamente com RDDs, a estrutura de dados mais comumente usada para trabalhar com dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca SQL do Spark. Os dataframes no Spark são semelhantes aos da biblioteca Python onipresente Pandas, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Observação

Além da API de Dataframe, o Spark SQL fornece uma API de Conjunto de Dados fortemente tipada com suporte para Java e Scala. Neste módulo, vamos no concentrar na API de Dataframe.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar usando dados. Imagine que você tem os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta Arquivos/dados do lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferir um esquema

Em um notebook do Spark, é possível usar o seguinte código PySpark para carregar os dados do arquivo em um dataframe e exibir as dez primeiras linhas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Como você aprendeu anteriormente, a linha %%pyspark no início é chamada de mágica, e informa ao Spark que a linguagem usada nesta célula é o PySpark. Na maioria dos casos, PySpark é a linguagem padrão; e geralmente nos manteremos com ela nos exemplos desse módulo. No entanto, para fins de completude, aqui está o código Scala equivalente para o exemplo de dados de produtos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

O magic %%spark é usada para especificar Scala. Observe que a implementação em Scala do dataframe se comporta de maneira semelhante à versão do PySpark.

Ambos os exemplos de código produziriam uma saída como esta:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Especificar um esquema explícito

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark conseguiu inferir o tipo de dados de cada coluna com base nos dados contidos nela. Você também pode especificar um esquema explícito para os dados, o que é útil quando os nomes de coluna não são incluídos no arquivo de dados, como este exemplo de CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O seguinte exemplo do PySpark mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam mais uma vez semelhantes a:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Dica

A especificação de um esquema explícito também melhora o desempenho.

Filtrando e agrupando dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados contidos nela. Por exemplo, o exemplo de código a seguir usa o método select para recuperar as colunas ProductID e ListPrice do dataframe df que contém dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto de dataframe.

Dica

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser realizada usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um dataframe contendo as colunas ProductName e ListPrice para produtos nas categorias Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductName Categoria ListPrice
Mountain-100 Silver, 38 Mountain bikes 3399.9900
Road-750 Black, 52 Bicicletas de estrada 539,9900
... ... ...

Para agrupar e agregar dados, você pode usar o método groupBy e as funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos de cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

Categoria count
Fones de ouvido 3
Rodas 14
Mountain bikes 32
... ...