Trabalhar com dados em um dataframe do Spark

Concluído

Na unidade anterior, você aprendeu como se conectar a uma fonte de dados, carregar dados em um dataframe e, opcionalmente, salvar o dataframe em uma lakehouse como um arquivo ou tabela. Agora vamos explorar o dataframe com um pouco mais de detalhes.

Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuído resiliente), mas embora você possa escrever código que funcione diretamente com RDDs, a estrutura de dados mais usada para trabalhar com dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca SQL do Spark. Os dataframes no Spark são semelhantes aos da onipresente biblioteca Pandas Python, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Nota

Além da API Dataframe, o Spark SQL fornece uma API de conjunto de dados fortemente tipada que é suportada em Java e Scala. Vamos nos concentrar na API Dataframe neste módulo.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar com dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta Arquivos/dados em sua casa do lago:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferindo um esquema

Em um bloco de anotações do Spark, você pode usar o seguinte código PySpark para carregar os dados do arquivo em um dataframe e exibir as primeiras 10 linhas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Como você aprendeu anteriormente, a %%pyspark linha no início é chamada de magia, e diz ao Spark que a linguagem usada nesta célula é PySpark. Na maioria dos casos, PySpark é o idioma padrão; e geralmente vamos nos ater a ele nos exemplos deste módulo. No entanto, para completar, aqui está o código Scala equivalente para o exemplo de dados de produtos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

A magia %%spark é usada para especificar Scala. Observe que a implementação Scala do dataframe se comporta de forma semelhante à versão do PySpark.

Ambos os exemplos de código produziriam uma saída como esta:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Especificando um esquema explícito

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark era capaz de inferir o tipo de dados de cada coluna a partir dos dados que ela contém. Você também pode especificar um esquema explícito para os dados, que é útil quando os nomes das colunas não são incluídos no arquivo de dados, como este exemplo CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O exemplo PySpark a seguir mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam, mais uma vez, semelhantes a:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Gorjeta

Especificar um esquema explícito também melhora o desempenho!

Filtragem e agrupamento de dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados que ela contém. Por exemplo, o exemplo de código a seguir usa o método select para recuperar as colunas ProductID e ListPrice do dataframe df que contém dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductID PreçoListado
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto dataframe.

Gorjeta

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser obtida usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos juntos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um novo dataframe contendo as colunas ProductName e ListPrice para produtos com uma categoria de Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductName Categoria PreçoListado
Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
Road-750 Preta, 52 Road Bikes 539.9900
... ... ...

Para agrupar e agregar dados, você pode usar o método groupBy e agregar funções. Por exemplo, o seguinte código PySpark conta o número de produtos para cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

Categoria contagem
Auriculares 3
Pneus 14
Mountain Bikes 32
... ...