Partilhar via


Noções básicas do PySpark

Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que você entenda os conceitos fundamentais do Apache Spark e esteja executando comandos em um bloco de anotações do Azure Databricks conectado à computação. Você cria DataFrames usando dados de exemplo, executa transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e os salva em uma tabela ou arquivo.

Carregar dados

Alguns exemplos neste artigo usam dados de exemplo fornecidos pelo Databricks para demonstrar o uso de DataFrames para carregar, transformar e salvar dados. Se você quiser usar seus próprios dados que ainda não estão no Databricks, você pode carregá-los primeiro e criar um DataFrame a partir dele. Consulte Criar ou modificar uma tabela usando o upload de arquivos e Carregar arquivos para um volume do Catálogo Unity.

Sobre os dados de exemplo do Databricks

Databricks fornece dados de exemplo no samples catálogo e no /databricks-datasets diretório.

  • Para acessar os dados de exemplo no samples catálogo, use o formato samples.<schema-name>.<table-name>. Este artigo usa tabelas no samples.tpch esquema, que contém dados de uma empresa fictícia. A customer tabela contém informações sobre os clientes e orders contém informações sobre os pedidos feitos por esses clientes.
  • Use dbutils.fs.ls para explorar dados no /databricks-datasets. Use o Spark SQL ou DataFrames para consultar dados nesse local usando caminhos de arquivo. Para saber mais sobre os dados de exemplo fornecidos pelo Databricks, consulte Conjuntos de dados de exemplo.

Importar tipos de dados

Muitas operações do PySpark exigem que você use funções SQL ou interaja com tipos nativos do Spark. Você pode importar diretamente apenas as funções e os tipos necessários ou importar o módulo inteiro.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Como algumas funções importadas podem substituir funções internas do Python, alguns usuários optam por importar esses módulos usando um alias. Os exemplos a seguir mostram um alias comum usado em exemplos de código do Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Para obter uma lista abrangente de tipos de dados, consulte Tipos de dados do Spark.

Para obter uma lista abrangente das funções SQL do PySpark, consulte Funções do Spark.

Criar um DataFrame

Há várias maneiras de criar um DataFrame. Normalmente, você define um DataFrame em relação a uma fonte de dados, como uma tabela ou coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display, para acionar as transformações a serem executadas. O display método produz DataFrames.

Criar um DataFrame com valores especificados

Para criar um DataFrame com valores especificados, use o createDataFrame método, onde as linhas são expressas como uma lista de tuplas:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Observe na saída que os tipos de dados de colunas de df_children são automaticamente inferidos. Como alternativa, você pode especificar os tipos adicionando um esquema. Os esquemas são definidos usando o StructType que é composto por que especificam o nome, o tipo de StructFields dados e um sinalizador booleano indicando se eles contêm um valor nulo ou não. Você deve importar tipos de dados do pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Criar um DataFrame a partir de uma tabela no Unity Catalog

Para criar um DataFrame a partir de uma tabela no Unity Catalog, use o table método que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>. Clique em Catálogo na barra de navegação esquerda para usar o Catalog Explorer para navegar até a tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no bloco de anotações.

O exemplo a seguir carrega a tabela samples.tpch.customer, mas você pode, alternativamente, fornecer o caminho para sua própria tabela.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Criar um DataFrame a partir de um arquivo carregado

Para criar um DataFrame a partir de um arquivo que você carregou para volumes do Catálogo Unity, use a read propriedade. Esse método retorna um DataFrameReader, que você pode usar para ler o formato apropriado. Clique na opção de catálogo na pequena barra lateral à esquerda e use o navegador de catálogo para localizar seu arquivo. Selecione-o e clique em Copiar caminho do arquivo de volume.

O exemplo abaixo lê a partir de um *.csv arquivo, mas DataFrameReader suporta o upload de arquivos em muitos outros formatos. Consulte Métodos DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Para obter mais informações sobre os volumes do Catálogo Unity, consulte O que são volumes do Catálogo Unity?.

Criar um DataFrame a partir de uma resposta JSON

Para criar um DataFrame a partir de uma carga útil de resposta JSON retornada por uma API REST, use o pacote Python requests para consultar e analisar a resposta. Você deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de aplicativos de medicamentos da Food and Drug Administration dos Estados Unidos.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados no Databricks, consulte Modelar dados semiestruturados.

Selecionar um campo ou objeto JSON

Para selecionar um campo ou objeto específico do JSON convertido, use a [] notação. Por exemplo, para selecionar o products campo que é uma matriz de produtos:

display(df_drugs.select(df_drugs["products"]))

Você também pode encadear chamadas de método para atravessar vários campos. Por exemplo, para produzir o nome da marca do primeiro produto em uma aplicação de medicamento:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Criar um DataFrame a partir de um arquivo

Para demonstrar a criação de um DataFrame a partir de um arquivo, este exemplo carrega dados CSV no /databricks-datasets diretório.

Para navegar até os conjuntos de dados de exemplo, você pode usar os comandos do sistema de arquivos Databricks Utilties . O exemplo a seguir usa dbutils para listar os conjuntos de dados disponíveis em /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Como alternativa, você pode usar %fs para acessar os comandos do sistema de arquivos da CLI do Databricks, conforme mostrado no exemplo a seguir:

%fs ls '/databricks-datasets'

Para criar um DataFrame a partir de um arquivo ou diretório de arquivos, especifique o load caminho no método:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transforme dados com DataFrames

DataFrames facilitam a transformação de dados usando métodos internos para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no spark.sql.functions pacote. Consulte Databricks Spark SQL Functions.

Operações de coluna

O Spark fornece muitas operações básicas de coluna:

Gorjeta

Para gerar a saída de todas as colunas em um DataFrame, use columns, por exemplo df_customer.columns, .

Selecionar colunas

Você pode selecionar colunas específicas usando select e col. A col função está no pyspark.sql.functions submódulo.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Você também pode se referir a uma coluna usando expr uma expressão definida como uma cadeia de caracteres:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Você também pode usar selectExpr, que aceita expressões SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Para selecionar explicitamente uma coluna de um DataFrame específico, você pode usar o [] operador ou o . operador. (O . operador não pode ser usado para selecionar colunas que comecem com um número inteiro ou que contenham um espaço ou caractere especial.) Isso pode ser especialmente útil quando você está unindo DataFrames onde algumas colunas têm o mesmo nome.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Criar colunas

Para criar uma nova coluna, use o withColumn método. O exemplo a seguir cria uma nova coluna que contém um valor booleano com base em se o saldo c_acctbal da conta do cliente excede 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Mudar o nome das colunas

Para renomear uma coluna, use o withColumnRenamed método, que aceita os nomes de coluna novos e existentes:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

O alias método é especialmente útil quando você deseja renomear suas colunas como parte de agregações:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Tipos de coluna de transmissão

Em alguns casos, você pode querer alterar o tipo de dados para uma ou mais das colunas em seu DataFrame. Para fazer isso, use o cast método para converter entre tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um inteiro para o tipo de cadeia de caracteres, usando o col método para fazer referência a uma coluna:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Remover colunas

Para remover colunas, você pode omitir colunas durante uma seleção ou select(*) except pode usar o drop método:

df_customer_flag_renamed.drop("balance_flag_renamed")

Você também pode soltar várias colunas de uma só vez:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operações de linha

O Spark fornece muitas operações básicas de linha:

Filtrar linhas

Para filtrar linhas, use o filter método ou where em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o col método ou uma expressão que é avaliada como uma coluna.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Para filtrar em várias condições, use operadores lógicos. Por exemplo, & e | permitir que você e OR AND condições, respectivamente. O exemplo a seguir filtra linhas em que o c_nationkey é igual a 20 e c_acctbal é maior que 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Remover linhas duplicadas

Para eliminar a duplicação de linhas, use distinct, que retorna apenas as linhas exclusivas.

df_unique = df_customer.distinct()

Manipular valores nulos

Para manipular valores nulos, solte linhas que contenham valores nulos usando o na.drop método. Esse método permite especificar se você deseja soltar linhas contendo any valores nulos ou all valores nulos.

Para descartar quaisquer valores nulos, use um dos exemplos a seguir.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Se, em vez disso, você quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:

df_customer_no_nulls = df_customer.na.drop("all")

Você pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Para preencher os valores em falta, utilize o fill método. Você pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo, os saldos de conta que têm um valor nulo para o saldo c_acctbal da conta são preenchidos com 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Para substituir cadeias de caracteres por outros valores, use o replace método. No exemplo abaixo, todas as cadeias de endereços vazias são substituídas pela palavra UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Acrescentar linhas

Para acrescentar linhas, você precisa usar o union método para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer criado anteriormente e df_filtered_customer são combinados, que retorna um DataFrame com três clientes:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Nota

Você também pode combinar DataFrames gravando-os em uma tabela e, em seguida, anexando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados para uma tabela de destino pode reduzir drasticamente a latência e os custos de computação à medida que os dados crescem em tamanho. Consulte Ingerir dados em uma casa de lago Databricks.

Ordenar linhas

Importante

A classificação pode ser cara em escala, e se você armazenar dados classificados e recarregar os dados com o Spark, o pedido não é garantido. Certifique-se de que você é intencional em seu uso de classificação.

Para classificar linhas por uma ou mais colunas, use o sort método ou orderBy . Por padrão, esses métodos classificam em ordem crescente:

df_customer.orderBy(col("c_acctbal"))

Para filtrar em ordem decrescente, use desc:

df_customer.sort(col("c_custkey").desc())

O exemplo a seguir mostra como classificar em duas colunas:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o limit método. O exemplo a seguir exibe apenas os 10 principais resultados:

display(df_sorted.limit(10))

Junte-se a DataFrames

Para unir dois ou mais DataFrames, use o join método. Você pode especificar como gostaria que os DataFrames fossem unidos nos how parâmetros (o tipo de junção) e on (em quais colunas basear a junção). Os tipos comuns de junção incluem:

  • inner: Este é o padrão de tipo de junção, que retorna um DataFrame que mantém apenas as linhas onde há uma correspondência para o on parâmetro em todos os DataFrames.
  • left: Isso mantém todas as linhas do primeiro DataFrame especificado e apenas as linhas do segundo DataFrame especificado que têm uma correspondência com o primeiro.
  • outer: Uma junção externa mantém todas as linhas de ambos os DataFrames, independentemente da correspondência.

Para obter informações detalhadas sobre associações, consulte Trabalhar com associações no Azure Databricks. Para obter uma lista de junções suportadas no PySpark, consulte Associações DataFrame.

O exemplo a seguir retorna um único DataFrame onde cada linha do DataFrame é unida orders com a linha correspondente do customers DataFrame. Uma junção interna é usada, pois a expectativa é que cada pedido corresponda exatamente a um cliente.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Para unir em várias condições, use operadores booleanos como & e | para especificar AND e OR, respectivamente. O exemplo a seguir adiciona uma condição adicional, filtrando apenas para as linhas que têm o_totalprice maior que 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Dados agregados

Para agregar dados em um DataFrame, semelhante a um GROUP BY em SQL, use o groupBy método para especificar colunas para agrupar e o agg método para especificar agregações. Importar agregações comuns, incluindo avg, sum, max, e min de pyspark.sql.functions. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Algumas agregações são ações, o que significa que desencadeiam cálculos. Neste caso, você não precisa usar outras ações para produzir resultados.

Para contar linhas em um DataFrame, use o count método:

df_customer.count()

Encadeamento de chamadas

Os métodos que transformam DataFrames retornam DataFrames, e o Spark não age em transformações até que as ações sejam chamadas. Esta avaliação preguiçosa significa que você pode encadear vários métodos para conveniência e legibilidade. O exemplo a seguir mostra como encadear a filtragem, a agregação e a ordenação:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualize seu DataFrame

Para visualizar um DataFrame em um bloco de anotações, clique no + sinal ao lado da tabela no canto superior esquerdo do DataFrame e selecione Visualização para adicionar um ou mais gráficos com base no seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em blocos de anotações Databricks.

display(df_order)

Para executar visualizações adicionais, o Databricks recomenda o uso da API pandas para Spark. O .pandas_api() permite que você converta para a API pandas correspondente para um Spark DataFrame. Para obter mais informações, consulte API Pandas no Spark.

Guardar os dados

Depois de transformar seus dados, você pode salvá-los usando os DataFrameWriter métodos. Uma lista completa desses métodos pode ser encontrada em DataFrameWriter. As seções a seguir mostram como salvar seu DataFrame como uma tabela e como uma coleção de arquivos de dados.

Salve seu DataFrame como uma tabela

Para salvar seu DataFrame como uma tabela no Unity Catalog, use o write.saveAsTable método e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Escreva seu DataFrame como CSV

Para gravar seu DataFrame no *.csv formato, use o write.csv método, especificando o formato e as opções. Por padrão, se existirem dados no caminho especificado, a operação de gravação falhará. Você pode especificar um dos seguintes modos para executar uma ação diferente:

  • overwrite substitui todos os dados existentes no caminho de destino pelo conteúdo do DataFrame.
  • append acrescenta conteúdo do DataFrame aos dados no caminho de destino.
  • ignore Falha silenciosamente na gravação se existirem dados no caminho de destino.

O exemplo a seguir demonstra a substituição de dados com conteúdo DataFrame como arquivos CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Próximos passos

Para aproveitar mais recursos do Spark no Databricks, consulte: