Compartilhar via


Noções básicas do PySpark

Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que você entenda conceitos fundamentais do Apache Spark e esteja executando comandos em um notebook do Azure Databricks conectado à computação. Você cria DataFrames usando dados de exemplo, executa transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e, em seguida, salva-os em uma tabela ou arquivo.

Carregar dados

Alguns exemplos neste artigo usam dados de exemplo fornecidos pelo Databricks para demonstrar o uso de DataFrames para carregar, transformar e salvar dados. Se você quiser usar seus próprios dados que ainda não estão no Databricks, você pode carregá-los primeiro e criar um DataFrame a partir dele. Consulte Criar ou modificar uma tabela usando o upload de arquivos e Carregar arquivos em um volume do Catálogo do Unity.

Sobre dados de exemplo do Databricks

O Databricks fornece dados de exemplo no catálogo de samples e no diretório /databricks-datasets.

  • Para acessar os dados de exemplo no catálogo samples, use o formato samples.<schema-name>.<table-name>. Este artigo usa tabelas no esquema samples.tpch, que contém dados de uma empresa fictícia. A tabela customer contém informações sobre clientes e orders contém informações sobre pedidos feitos por esses clientes.
  • Use dbutils.fs.ls para explorar dados em /databricks-datasets. Use o SPARK SQL ou DataFrames para consultar dados neste local usando caminhos de arquivo. Para saber mais sobre os dados de exemplo fornecidos pelo Databricks, consulte Conjuntos de dados de exemplo.

Importar tipos de dados

Muitas operações do PySpark exigem que você use funções SQL ou interaja com tipos nativos do Spark. Você pode importar diretamente apenas as funções e os tipos necessários ou importar o módulo inteiro.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Como algumas funções importadas podem substituir funções internas do Python, alguns usuários optam por importar esses módulos usando um alias. Os exemplos a seguir mostram um alias comum usado em exemplos de código do Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Para obter uma lista abrangente de tipos de dados, consulte Tipos de Dados do Spark.

Para obter uma lista abrangente de funções SQL do PySpark, consulte Funções do Spark.

Criar um DataFrame

Há várias maneiras de criar um DataFrame. Normalmente, você define um DataFrame em relação a uma fonte de dados, como uma tabela ou coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display, para disparar as transformações a serem executadas. O método display gera DataFrames.

Criar um DataFrame com valores especificados

Para criar um DataFrame com valores especificados, use o método createDataFrame, em que as linhas são expressas como uma lista de tuplas:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Observe na saída que os tipos de dados de colunas de df_children são inferidos automaticamente. Como alternativa, você pode especificar os tipos adicionando um esquema. Esquemas são definidos usando o StructType que é composto por StructFields que especificam o nome, o tipo de dados e um sinalizador booliano indicando se eles contêm um valor nulo ou não. Você deve importar tipos de dados de pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Criar um DataFrame de uma tabela no Catálogo do Unity

Para criar um DataFrame de uma tabela no Catálogo do Unity, use o método table que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>. Clique no Catálogo na barra de navegação à esquerda para usar o Explorador do Catálogo para navegar até sua tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no notebook.

O exemplo a seguir carrega a tabela samples.tpch.customer, mas você pode, como alternativa, fornecer o caminho para sua própria tabela.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Criar um DataFrame de um arquivo carregado

Para criar um DataFrame de um arquivo que você carregou em volumes do Catálogo do Unity, use a propriedade read. Esse método retorna um DataFrameReader, que você pode usar para ler o formato apropriado. Clique na opção de catálogo na barra lateral pequena à esquerda e use o navegador do catálogo para localizar o arquivo. Selecione-o e clique em Copiar caminho do arquivo de volume.

O exemplo a seguir lê de um arquivo *.csv, mas DataFrameReader dá suporte ao carregamento de arquivos em muitos outros formatos. Consulte os métodos do DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Para obter mais informações sobre volumes do Catálogo do Unity, consulte O que são volumes do Catálogo do Unity?.

Criar um DataFrame com base em uma resposta JSON

Para criar um DataFrame a partir de um conteúdo de resposta JSON retornado por uma API REST, use o pacote requests do Python para consultar e analisar a resposta. Você deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de aplicativos de medicamentos da Food and Drug Administration dos Estados Unidos.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados no Databricks, consulte Modelo de dados semiestruturados.

Selecionar um campo ou objeto JSON

Para selecionar um campo ou objeto específico do JSON convertido, use a notação []. Por exemplo, para selecionar o campo products que é uma matriz de produtos:

display(df_drugs.select(df_drugs["products"]))

Você também pode encadear chamadas de método para percorrer vários campos. Por exemplo, para gerar o nome da marca do primeiro produto em um aplicativo de medicamentos:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Criar um DataFrame usando um arquivo

Para demonstrar a criação de um DataFrame de um arquivo, este exemplo carrega dados CSV no diretório /databricks-datasets.

Para navegar até os conjuntos de dados de exemplo, você pode usar os comandos do sistema de arquivos Utilitários do Databricks. O exemplo a seguir usa dbutils para listar os conjuntos de dados disponíveis no /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Como alternativa, você pode usar %fs para acessar Comandos do sistema de arquivos da CLI do Databricks, conforme mostrado no exemplo a seguir:

%fs ls '/databricks-datasets'

Para criar um DataFrame de um arquivo ou diretório de arquivos, especifique o caminho no método load:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformar dados com DataFrames

DataFrames facilitam a transformação de dados usando métodos internos para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no pacote spark.sql.functions. Consulte Funções do Databricks Spark SQL.

Column operations

O Spark fornece muitas operações básicas de coluna:

Dica

Para gerar todas as colunas em um DataFrame, use columns, por exemplo, df_customer.columns.

Selecionar colunas

Você pode selecionar colunas específicas usando select e col. A função col está no submódulo pyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Você também pode se referir a uma coluna usando expr que usa uma expressão definida como uma cadeia de caracteres:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Você também pode usar selectExpr, que aceita expressões SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Para selecionar explicitamente uma coluna de um DataFrame específico, você pode usar o operador [] ou .. (O operador . não pode ser usado para selecionar colunas começando com um inteiro ou as que contêm um espaço ou caractere especial.) Isso pode ser especialmente útil quando você estiver ingressando em DataFrames em que algumas colunas têm o mesmo nome.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Criar colunas

Para criar uma nova coluna, use o método withColumn. O exemplo a seguir cria uma nova coluna que contém um valor booliano com base em se o saldo da conta do cliente c_acctbal excede 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Renomear colunas

Para renomear uma coluna, use o método withColumnRenamed, que aceita os nomes de coluna existentes e novos:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

O método alias é especialmente útil quando você deseja renomear suas colunas como parte de agregações:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Converter tipos de coluna

Em alguns casos, talvez você queira alterar o tipo de dados para uma ou mais colunas em seu DataFrame. Para fazer isso, use o método cast para converter entre tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um inteiro em um tipo de cadeia de caracteres, usando o método col para fazer referência a uma coluna:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Remover colunas

Para remover colunas, você pode omitir colunas durante uma seleção select(*) except ou pode usar o método drop:

df_customer_flag_renamed.drop("balance_flag_renamed")

Você também pode remover várias colunas ao mesmo tempo:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Row operations

O Spark fornece muitas operações básicas de linha:

Filtrar linhas

Para filtrar linhas, use o método filter ou where em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o método col ou uma expressão que seja avaliada como uma coluna.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Para filtrar em várias condições, use operadores lógicos. Por exemplo, & e | permitem que você AND e OR as condições, respectivamente. O exemplo a seguir filtra linhas em que c_nationkey é igual a 20 e c_acctbal é maior que 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Remover linhas duplicadas

Para desativar linhas, use distinct, que retorna apenas as linhas exclusivas.

df_unique = df_customer.distinct()

Manipular valores nulos

Para manipular com valores nulos, solte linhas que contêm valores nulos usando o método na.drop. Esse método permite que você especifique se deseja remover linhas que contêm valores nulos any ou all.

Para remover valores nulos, use um dos exemplos a seguir.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Se, em vez disso, você quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:

df_customer_no_nulls = df_customer.na.drop("all")

Você pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Para preencher valores ausentes, use o método fill. Você pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo, os saldos de conta que têm um valor nulo para o saldo da conta c_acctbal são preenchidos com 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Para substituir cadeias de caracteres por outros valores, use o método replace. No exemplo abaixo, todas as cadeias de caracteres de endereço vazias são substituídas pela palavra UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Acrescentar linhas

Para acrescentar linhas, você precisa usar o método union para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer criado anteriormente e df_filtered_customer são combinados, o que retorna um DataFrame com três clientes:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Observação

Você também pode combinar DataFrames gravando-os em uma tabela e acrescentando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados para uma tabela de destino pode reduzir drasticamente os custos de latência e computação à medida que os dados aumentam de tamanho. Veja Ingerir dados em um databricks lakehouse.

Classificar linhas

Importante

A classificação pode ser cara em escala e, se você armazenar dados classificados e recarregar os dados com o Spark, a ordem não será garantida. Verifique se você usa a classificação intencionalmente.

Para classificar linhas em uma ou mais colunas, use o método sort ou orderBy. Por padrão, esses métodos classificam em ordem crescente:

df_customer.orderBy(col("c_acctbal"))

Para filtrar em ordem decrescente, use desc:

df_customer.sort(col("c_custkey").desc())

O exemplo a seguir mostra como classificar em duas colunas:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o método limit. O exemplo a seguir exibe apenas os principais resultados de 10:

display(df_sorted.limit(10))

Ingressar no DataFrames

Para unir dois ou mais DataFrames, use o método join. Você pode especificar como deseja que os DataFrames sejam unidos nos parâmetros how (o tipo de junção) e on (em quais colunas basear a junção). Os tipos comuns de junção incluem:

  • inner: esse é o padrão de tipo de junção, que retorna um DataFrame que mantém apenas as linhas em que há uma correspondência para o parâmetro on entre DataFrames.
  • left: isso mantém todas as linhas do primeiro DataFrame especificado e apenas linhas do segundo DataFrame especificado que têm uma correspondência com a primeira.
  • outer: uma junção externa mantém todas as linhas de ambos os DataFrames, independentemente da correspondência.

Para obter informações detalhadas sobre junções, consulte Trabalhar com junções no Azure Databricks. Para obter uma lista de junções com suporte no PySpark, consulte Junções no DataFrame.

O exemplo a seguir retorna um único DataFrame em que cada linha do DataFrame orders é unida à linha correspondente do DataFrame customers. Uma junção interna é usada, pois a expectativa é que cada pedido corresponda a exatamente um cliente.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Para ingressar em várias condições, use operadores boolianos, como & e |, para especificar AND e OR, respectivamente. O exemplo a seguir inclui uma condição adicional, filtrando apenas as linhas que têm o_totalprice maior que 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Agregação de dados

Para agregar dados em um DataFrame, semelhante a um GROUP BY no SQL, use o método groupBy para especificar colunas para agrupar e o método agg para especificar agregações. Importar agregações comuns, incluindo avg, sum, max e min de pyspark.sql.functions. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Algumas agregações são ações, o que significa que elas disparam cálculos. Nesse caso, você não precisa usar outras ações para gerar resultados.

Para contar linhas em um DataFrame, use o método count:

df_customer.count()

Chamadas de encadeamento

Os métodos que transformam DataFrames retornam DataFrames e o Spark não atua em transformações até que as ações sejam chamadas. Essa avaliação lenta significa que você pode encadear vários métodos para conveniência e legibilidade. O exemplo a seguir mostra como encadear filtragem, agregação e ordenação:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualizar seus DataFrame

Para visualizar um DataFrame em um notebook, clique no sinal + ao lado da tabela no canto superior esquerdo do DataFrame e selecione Visualização para adicionar um ou mais gráficos com base em seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em notebooks do Databricks.

display(df_order)

Para executar visualizações adicionais, o Databricks recomenda o uso da API do Pandas para Spark. O .pandas_api() permite que você converta para a API do Pandas correspondente para um DataFrame do Spark. Para obter mais informações, consulte API do Pandas no Spark.

Salvar seus dados

Depois de transformar seus dados, você pode salvá-los usando os métodos DataFrameWriter. Uma lista completa desses métodos pode ser encontrada em DataFrameWriter. As seções a seguir mostram como salvar seu DataFrame como uma tabela e como uma coleção de arquivos de dados.

Salvar seu DataFrame como uma tabela

Para salvar o DataFrame como uma tabela no Catálogo do Unity, use o método write.saveAsTable e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Gravar seu DataFrame como CSV

Para gravar seu DataFrame no formato *.csv, use o método write.csv, especificando o formato e as opções. Por padrão, se houver dados no caminho especificado, a operação de gravação falhará. Você pode especificar um dos seguintes modos para executar uma ação diferente:

  • overwrite substitui todos os dados existentes no caminho de destino com o conteúdo do DataFrame.
  • append acrescenta o conteúdo do DataFrame aos dados no caminho de destino.
  • ignore falha silenciosamente na gravação se houver dados no caminho de destino.

O exemplo a seguir demonstra a substituição de dados com o conteúdo do DataFrame como arquivos CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Próximas etapas

Para aproveitar mais recursos do Spark no Databricks, confira: