Usar o Spark para trabalhar com arquivos de dados

Concluído

Um dos benefícios de usar o Spark é que você pode escrever e executar código em várias linguagens de programação, permitindo que você use as habilidades de programação que você já tem e use a linguagem mais apropriada para uma determinada tarefa. A linguagem padrão em um novo notebook do Azure Databricks Spark é o PySpark - uma versão otimizada para o Spark do Python, que é comumente usada por cientistas de dados e analistas devido ao seu forte suporte para manipulação e visualização de dados. Além disso, você pode usar linguagens como Scala (uma linguagem derivada de Java que pode ser usada interativamente) e SQL (uma variante da linguagem SQL comumente usada incluída na biblioteca SQL do Spark para trabalhar com estruturas de dados relacionais). Os engenheiros de software também podem criar soluções compiladas que são executadas no Spark usando frameworks como Java.

Explorando dados com dataframes

Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuído resiliente), mas embora você possa escrever código que funcione diretamente com RDDs, a estrutura de dados mais usada para trabalhar com dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca SQL do Spark. Os dataframes no Spark são semelhantes aos da onipresente biblioteca Pandas Python, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Nota

Além da API Dataframe, o Spark SQL fornece uma API de conjunto de dados fortemente tipada que é suportada em Java e Scala. Vamos nos concentrar na API Dataframe neste módulo.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar com dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgulas chamado products.csv na pasta de dados no armazenamento do sistema de arquivos Databricks (DBFS):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Em um bloco de anotações do Spark, você pode usar o seguinte código PySpark para carregar os dados em um dataframe e exibir as primeiras 10 linhas:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

A %pyspark linha no início é chamada de magia, e diz a Spark que a linguagem usada nesta célula é PySpark. Aqui está o código Scala equivalente para o exemplo de dados de produtos:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

A magia %spark é usada para especificar Scala.

Gorjeta

Você também pode selecionar o idioma que deseja usar para cada célula na interface do Bloco de Anotações.

Ambos os exemplos mostrados anteriormente produziriam resultados como este:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Especificando um esquema de dataframe

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark era capaz de inferir o tipo de dados de cada coluna a partir dos dados que ela contém. Você também pode especificar um esquema explícito para os dados, que é útil quando os nomes das colunas não são incluídos no arquivo de dados, como este exemplo CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O exemplo PySpark a seguir mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam, mais uma vez, semelhantes a:

ProductID ProductName Categoria PreçoListado
771 Montanha 100 Prateado, 38 Mountain Bikes 3399.9900
772 Montanha 100 Prateado, 42 Mountain Bikes 3399.9900
773 Montanha 100 Prateado, 44 Mountain Bikes 3399.9900
... ... ... ...

Filtragem e agrupamento de dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados que ela contém. Por exemplo, o exemplo de código a seguir usa o método select para recuperar as colunas ProductName e ListPrice do dataframe df que contém dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductID PreçoListado
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto dataframe.

Gorjeta

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser obtida usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos juntos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um novo dataframe contendo as colunas ProductName e ListPrice para produtos com uma categoria de Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

ProductName PreçoListado
Montanha 100 Prateado, 38 3399.9900
Road-750 Preta, 52 539.9900
... ...

Para agrupar e agregar dados, você pode usar o método groupBy e agregar funções. Por exemplo, o seguinte código PySpark conta o número de produtos para cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados deste exemplo de código seriam mais ou menos assim:

Categoria contagem
Auriculares 3
Pneus 14
Mountain Bikes 32
... ...

Usando expressões SQL no Spark

A API Dataframe faz parte de uma biblioteca do Spark chamada Spark SQL, que permite que os analistas de dados usem expressões SQL para consultar e manipular dados.

Criando objetos de banco de dados no catálogo do Spark

O catálogo do Spark é um metastore para objetos de dados relacionais, como exibições e tabelas. O tempo de execução do Spark pode usar o catálogo para integrar perfeitamente o código escrito em qualquer linguagem suportada pelo Spark com expressões SQL que podem ser mais naturais para alguns analistas de dados ou desenvolvedores.

Uma das maneiras mais simples de disponibilizar dados em um dataframe para consulta no catálogo do Spark é criar uma exibição temporária, conforme mostrado no exemplo de código a seguir:

df.createOrReplaceTempView("products")

Uma vista é temporária, o que significa que é automaticamente eliminada no final da sessão atual. Você também pode criar tabelas que são persistentes no catálogo para definir um banco de dados que pode ser consultado usando o Spark SQL.

Nota

Não exploraremos as tabelas de catálogo do Spark em profundidade neste módulo, mas vale a pena reservar um tempo para destacar alguns pontos-chave:

  • Você pode criar uma tabela vazia usando o spark.catalog.createTable método. As tabelas são estruturas de metadados que armazenam seus dados subjacentes no local de armazenamento associado ao catálogo. A exclusão de uma tabela também exclui seus dados subjacentes.
  • Você pode salvar um dataframe como uma tabela usando seu saveAsTable método.
  • Você pode criar uma tabela externa usando o spark.catalog.createExternalTable método. As tabelas externas definem metadados no catálogo, mas obtêm seus dados subjacentes de um local de armazenamento externo; normalmente uma pasta em um data lake. A exclusão de uma tabela externa não exclui os dados subjacentes.

Usando a API SQL do Spark para consultar dados

Você pode usar a API SQL do Spark em código escrito em qualquer idioma para consultar dados no catálogo. Por exemplo, o código PySpark a seguir usa uma consulta SQL para retornar dados da exibição de produtos como um dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Os resultados do exemplo de código seriam semelhantes à tabela a seguir:

ProductName PreçoListado
Montanha 100 Prateado, 38 3399.9900
Road-750 Preta, 52 539.9900
... ...

Usando código SQL

O exemplo anterior demonstrou como usar a API SQL do Spark para incorporar expressões SQL no código do Spark. Em um bloco de anotações, você também pode usar a mágica para executar o %sql código SQL que consulta objetos no catálogo, da seguinte forma:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

O exemplo de código SQL retorna um conjunto de resultados que é exibido automaticamente no bloco de anotações como uma tabela, como a abaixo:

Categoria Contagem de produtos
Bib-Calções 3
Suportes para bicicletas 1
Suportes para Bicicletas 1
... ...