Usar o Spark para trabalhar com arquivos de dados

Concluído

Um dos benefícios do uso do Spark é escrever e executar código em várias linguagens de programação, permitindo que você use as suas habilidades de programação e a linguagem mais apropriada para uma tarefa específica. A linguagem padrão em um novo notebook do Spark do Azure Databricks é o PySpark, uma versão do Python otimizada para Spark, que é popularmente usada por analistas e cientistas de dados devido ao forte suporte à manipulação e visualização de dados. Além disso, você pode usar linguagens como Scala (uma linguagem derivada de Java que pode ser usada interativamente) e SQL (uma variante da linguagem SQL bastante usada incluída na biblioteca do Spark SQL para trabalhar com estruturas de dados relacionais). Os engenheiros de software também podem criar soluções compiladas que são executadas no Spark usando estruturas como Java.

Explorando dados com dataframes

Nativamente, o Spark usa uma estrutura de dados chamada RDD (conjunto de dados distribuídos resilientes), mas, embora você possa escrever um código que funcione de maneira direta com os RDDs, a estrutura de dados mais usada para trabalhar com os dados estruturados no Spark é o dataframe, que é fornecido como parte da biblioteca do Spark SQL. Os dataframes no Spark são semelhantes aos da biblioteca Python onipresente Pandas, mas otimizados para funcionar no ambiente de processamento distribuído do Spark.

Observação

Além da API de Dataframe, o Spark SQL fornece uma API de Conjunto de Dados fortemente tipada com suporte para Java e Scala. Neste módulo, vamos no concentrar na API de Dataframe.

Carregando dados em um dataframe

Vamos explorar um exemplo hipotético para ver como você pode usar um dataframe para trabalhar usando dados. Suponha que você tenha os seguintes dados em um arquivo de texto delimitado por vírgula chamado products.csv na pasta data no armazenamento DBFS (Sistema de Arquivos Databricks):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Em um notebook Spark, você pode usar o seguinte código PySpark para carregar os dados em um dataframe e exibir as primeiras dez linhas:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

A linha %pyspark no início é chamada magic e informa ao Spark que a linguagem usada nessa célula é PySpark. Este é o código Scala equivalente do exemplo de dados de produtos:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

O magic %spark é usada para especificar Scala.

Dica

Você também pode selecionar a linguagem que deseja usar para cada célula na interface do Notebook.

Ambos os exemplos mostrados anteriormente produziriam uma saída como esta:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Especificando um esquema de dataframe

No exemplo anterior, a primeira linha do arquivo CSV continha os nomes das colunas, e o Spark conseguiu inferir o tipo de dados de cada coluna com base nos dados contidos nela. Você também pode especificar um esquema explícito para os dados, o que é útil quando os nomes de coluna não são incluídos no arquivo de dados, como este exemplo de CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

O seguinte exemplo do PySpark mostra como especificar um esquema para o dataframe a ser carregado de um arquivo chamado product-data.csv neste formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Os resultados seriam mais uma vez semelhantes a:

ProductID ProductName Categoria ListPrice
771 Mountain-100 Silver, 38 Mountain bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain bikes 3399.9900
... ... ... ...

Filtrando e agrupando dataframes

Você pode usar os métodos da classe Dataframe para filtrar, classificar, agrupar e manipular os dados contidos nela. Por exemplo, o seguinte exemplo de código usa o método select para recuperar as colunas ProductName e ListPrice do dataframe df que contém os dados do produto no exemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Em comum com a maioria dos métodos de manipulação de dados, select retorna um novo objeto de dataframe.

Dica

Selecionar um subconjunto de colunas de um dataframe é uma operação comum, que também pode ser realizada usando a seguinte sintaxe mais curta:

pricelist_df = df["ProductID", "ListPrice"]

Você pode "encadear" métodos para executar uma série de manipulações que resultam em um dataframe transformado. Por exemplo, este código de exemplo encadeia os métodos select e where para criar um dataframe contendo as colunas ProductName e ListPrice para produtos nas categorias Mountain Bikes ou Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539,9900
... ...

Para agrupar e agregar dados, você pode usar o método groupBy e as funções de agregação. Por exemplo, o seguinte código PySpark conta o número de produtos de cada categoria:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Os resultados desse exemplo de código seriam semelhantes a estes:

Categoria count
Fones de ouvido 3
Rodas 14
Mountain bikes 32
... ...

Usando expressões SQL no Spark

A API de Dataframe faz parte de uma biblioteca do Spark chamada Spark SQL, a qual permite que analistas de dados usem expressões SQL para consultar e manipular dados.

Criando objetos de banco de dados no catálogo do Spark

O catálogo do Spark é um metastore para objetos de dados relacionais, como exibições e tabelas. O runtime do Spark pode usar o catálogo para integrar perfeitamente o código escrito em qualquer linguagem com suporte do Spark a expressões SQL que podem ser mais naturais para alguns analistas de dados ou desenvolvedores.

Uma das maneiras mais simples de disponibilizar dados em um dataframe para consulta no catálogo do Spark é criar uma exibição temporária, conforme mostrado no seguinte código de exemplo:

df.createOrReplaceTempView("products")

Uma exibição é temporária, o que significa que ela é excluída automaticamente no final da sessão atual. Você também pode criar tabelas que persistem no catálogo para definir um banco de dados que possa ser consultado usando o Spark SQL.

Observação

Não exploraremos as tabelas de catálogo do Spark a fundo neste módulo, mas aproveitaremos para realçar alguns pontos-chave:

  • Você pode criar uma tabela vazia usando o método spark.catalog.createTable. Tabelas são estruturas de metadados que armazenam dados subjacentes no local de armazenamento associado ao catálogo. Excluir uma tabela também excluirá os dados subjacentes.
  • Você pode salvar um dataframe como uma tabela usando o método saveAsTable.
  • Você pode criar uma tabela externa usando o método spark.catalog.createExternalTable. As tabelas externas definem os metadados no catálogo, mas obtêm os dados subjacentes de um local de armazenamento externo; normalmente, de uma pasta em um data lake. Excluir uma tabela externa não excluirá os dados subjacentes.

Usando a API do Spark SQL para consultar dados

Você pode usar a API do Spark SQL em código escrito em qualquer linguagem para consultar dados no catálogo. Por exemplo, o código PySpark a seguir usa uma consulta SQL para retornar dados da exibição products como um dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Os resultados do exemplo de código seriam semelhantes a estes:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539,9900
... ...

Usando SQL código

O exemplo anterior demonstrou como usar a API do Spark SQL para inserir expressões SQL no código Spark. Em um notebook, você também pode usar o magic %sql para executar um código SQL que consulta objetos no catálogo, conforme exemplificado abaixo:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

O exemplo de código SQL retorna um conjunto de resultados exibido automaticamente no notebook como uma tabela, por exemplo:

Categoria ProductCount
Bretelles 3
Racks de bicicleta 1
Suportes de bicicleta 1
... ...