Noções básicas do PySpark
Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que você entenda conceitos fundamentais do Apache Spark e esteja executando comandos em um notebook do Azure Databricks conectado à computação. Você cria DataFrames usando dados de exemplo, executa transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e, em seguida, salva-os em uma tabela ou arquivo.
Carregar dados
Alguns exemplos neste artigo usam dados de exemplo fornecidos pelo Databricks para demonstrar o uso de DataFrames para carregar, transformar e salvar dados. Se você quiser usar seus próprios dados que ainda não estão no Databricks, você pode carregá-los primeiro e criar um DataFrame a partir dele. Consulte Criar ou modificar uma tabela usando o upload de arquivos e Carregar arquivos em um volume do Catálogo do Unity.
Sobre dados de exemplo do Databricks
O Databricks fornece dados de exemplo no catálogo de samples
e no diretório /databricks-datasets
.
- Para acessar os dados de exemplo no catálogo
samples
, use o formatosamples.<schema-name>.<table-name>
. Este artigo usa tabelas no esquemasamples.tpch
, que contém dados de uma empresa fictícia. A tabelacustomer
contém informações sobre clientes eorders
contém informações sobre pedidos feitos por esses clientes. - Use
dbutils.fs.ls
para explorar dados em/databricks-datasets
. Use o SPARK SQL ou DataFrames para consultar dados neste local usando caminhos de arquivo. Para saber mais sobre os dados de exemplo fornecidos pelo Databricks, consulte Conjuntos de dados de exemplo.
Importar tipos de dados
Muitas operações do PySpark exigem que você use funções SQL ou interaja com tipos nativos do Spark. Você pode importar diretamente apenas as funções e os tipos necessários ou importar o módulo inteiro.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Como algumas funções importadas podem substituir funções internas do Python, alguns usuários optam por importar esses módulos usando um alias. Os exemplos a seguir mostram um alias comum usado em exemplos de código do Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Para obter uma lista abrangente de tipos de dados, consulte Tipos de Dados do Spark.
Para obter uma lista abrangente de funções SQL do PySpark, consulte Funções do Spark.
Criar um DataFrame
Há várias maneiras de criar um DataFrame. Normalmente, você define um DataFrame em relação a uma fonte de dados, como uma tabela ou coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display
, para disparar as transformações a serem executadas. O método display
gera DataFrames.
Criar um DataFrame com valores especificados
Para criar um DataFrame com valores especificados, use o método createDataFrame
, em que as linhas são expressas como uma lista de tuplas:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Observe na saída que os tipos de dados de colunas de df_children
são inferidos automaticamente. Como alternativa, você pode especificar os tipos adicionando um esquema. Esquemas são definidos usando o StructType
que é composto por StructFields
que especificam o nome, o tipo de dados e um sinalizador booliano indicando se eles contêm um valor nulo ou não. Você deve importar tipos de dados de pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Criar um DataFrame de uma tabela no Catálogo do Unity
Para criar um DataFrame de uma tabela no Catálogo do Unity, use o método table
que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>
. Clique no Catálogo na barra de navegação à esquerda para usar o Explorador do Catálogo para navegar até sua tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no notebook.
O exemplo a seguir carrega a tabela samples.tpch.customer
, mas você pode, como alternativa, fornecer o caminho para sua própria tabela.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Criar um DataFrame de um arquivo carregado
Para criar um DataFrame de um arquivo que você carregou em volumes do Catálogo do Unity, use a propriedade read
. Esse método retorna um DataFrameReader
, que você pode usar para ler o formato apropriado. Clique na opção de catálogo na barra lateral pequena à esquerda e use o navegador do catálogo para localizar o arquivo. Selecione-o e clique em Copiar caminho do arquivo de volume.
O exemplo a seguir lê de um arquivo *.csv
, mas DataFrameReader
dá suporte ao carregamento de arquivos em muitos outros formatos. Consulte os métodos do DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Para obter mais informações sobre volumes do Catálogo do Unity, consulte O que são volumes do Catálogo do Unity?.
Criar um DataFrame com base em uma resposta JSON
Para criar um DataFrame a partir de um conteúdo de resposta JSON retornado por uma API REST, use o pacote requests
do Python para consultar e analisar a resposta. Você deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de aplicativos de medicamentos da Food and Drug Administration dos Estados Unidos.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados no Databricks, consulte Modelo de dados semiestruturados.
Selecionar um campo ou objeto JSON
Para selecionar um campo ou objeto específico do JSON convertido, use a notação []
. Por exemplo, para selecionar o campo products
que é uma matriz de produtos:
display(df_drugs.select(df_drugs["products"]))
Você também pode encadear chamadas de método para percorrer vários campos. Por exemplo, para gerar o nome da marca do primeiro produto em um aplicativo de medicamentos:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Criar um DataFrame usando um arquivo
Para demonstrar a criação de um DataFrame de um arquivo, este exemplo carrega dados CSV no diretório /databricks-datasets
.
Para navegar até os conjuntos de dados de exemplo, você pode usar os comandos do sistema de arquivos Utilitários do Databricks. O exemplo a seguir usa dbutils
para listar os conjuntos de dados disponíveis no /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Como alternativa, você pode usar %fs
para acessar Comandos do sistema de arquivos da CLI do Databricks, conforme mostrado no exemplo a seguir:
%fs ls '/databricks-datasets'
Para criar um DataFrame de um arquivo ou diretório de arquivos, especifique o caminho no método load
:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformar dados com DataFrames
DataFrames facilitam a transformação de dados usando métodos internos para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no pacote spark.sql.functions
. Consulte Funções do Databricks Spark SQL.
- Operações de coluna
- Operações de linha
- Ingressar no DataFrames
- Agregação de dados
- Chamadas de encadeamento
Column operations
O Spark fornece muitas operações básicas de coluna:
Dica
Para gerar todas as colunas em um DataFrame, use columns
, por exemplo, df_customer.columns
.
Selecionar colunas
Você pode selecionar colunas específicas usando select
e col
. A função col
está no submódulo pyspark.sql.functions
.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Você também pode se referir a uma coluna usando expr
que usa uma expressão definida como uma cadeia de caracteres:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Você também pode usar selectExpr
, que aceita expressões SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Para selecionar explicitamente uma coluna de um DataFrame específico, você pode usar o operador []
ou .
. (O operador .
não pode ser usado para selecionar colunas começando com um inteiro ou as que contêm um espaço ou caractere especial.) Isso pode ser especialmente útil quando você estiver ingressando em DataFrames em que algumas colunas têm o mesmo nome.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Criar colunas
Para criar uma nova coluna, use o método withColumn
. O exemplo a seguir cria uma nova coluna que contém um valor booliano com base em se o saldo da conta do cliente c_acctbal
excede 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Renomear colunas
Para renomear uma coluna, use o método withColumnRenamed
, que aceita os nomes de coluna existentes e novos:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
O método alias
é especialmente útil quando você deseja renomear suas colunas como parte de agregações:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Converter tipos de coluna
Em alguns casos, talvez você queira alterar o tipo de dados para uma ou mais colunas em seu DataFrame. Para fazer isso, use o método cast
para converter entre tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um inteiro em um tipo de cadeia de caracteres, usando o método col
para fazer referência a uma coluna:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Remover colunas
Para remover colunas, você pode omitir colunas durante uma seleção select(*) except
ou pode usar o método drop
:
df_customer_flag_renamed.drop("balance_flag_renamed")
Você também pode remover várias colunas ao mesmo tempo:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Row operations
O Spark fornece muitas operações básicas de linha:
- Filtrar linhas
- Remover linhas duplicadas
- Manipular valores nulos
- Acrescentar linhas
- Classificar linhas
- Filtrar linhas
Filtrar linhas
Para filtrar linhas, use o método filter
ou where
em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o método col
ou uma expressão que seja avaliada como uma coluna.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Para filtrar em várias condições, use operadores lógicos. Por exemplo, &
e |
permitem que você AND
e OR
as condições, respectivamente. O exemplo a seguir filtra linhas em que c_nationkey
é igual a 20
e c_acctbal
é maior que 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Remover linhas duplicadas
Para desativar linhas, use distinct
, que retorna apenas as linhas exclusivas.
df_unique = df_customer.distinct()
Manipular valores nulos
Para manipular com valores nulos, solte linhas que contêm valores nulos usando o método na.drop
. Esse método permite que você especifique se deseja remover linhas que contêm valores nulos any
ou all
.
Para remover valores nulos, use um dos exemplos a seguir.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Se, em vez disso, você quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:
df_customer_no_nulls = df_customer.na.drop("all")
Você pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Para preencher valores ausentes, use o método fill
. Você pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo, os saldos de conta que têm um valor nulo para o saldo da conta c_acctbal
são preenchidos com 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Para substituir cadeias de caracteres por outros valores, use o método replace
. No exemplo abaixo, todas as cadeias de caracteres de endereço vazias são substituídas pela palavra UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Acrescentar linhas
Para acrescentar linhas, você precisa usar o método union
para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer
criado anteriormente e df_filtered_customer
são combinados, o que retorna um DataFrame com três clientes:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Observação
Você também pode combinar DataFrames gravando-os em uma tabela e acrescentando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados para uma tabela de destino pode reduzir drasticamente os custos de latência e computação à medida que os dados aumentam de tamanho. Veja Ingerir dados em um databricks lakehouse.
Classificar linhas
Importante
A classificação pode ser cara em escala e, se você armazenar dados classificados e recarregar os dados com o Spark, a ordem não será garantida. Verifique se você usa a classificação intencionalmente.
Para classificar linhas em uma ou mais colunas, use o método sort
ou orderBy
. Por padrão, esses métodos classificam em ordem crescente:
df_customer.orderBy(col("c_acctbal"))
Para filtrar em ordem decrescente, use desc
:
df_customer.sort(col("c_custkey").desc())
O exemplo a seguir mostra como classificar em duas colunas:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o método limit
. O exemplo a seguir exibe apenas os principais resultados de 10
:
display(df_sorted.limit(10))
Ingressar no DataFrames
Para unir dois ou mais DataFrames, use o método join
. Você pode especificar como deseja que os DataFrames sejam unidos nos parâmetros how
(o tipo de junção) e on
(em quais colunas basear a junção). Os tipos comuns de junção incluem:
inner
: esse é o padrão de tipo de junção, que retorna um DataFrame que mantém apenas as linhas em que há uma correspondência para o parâmetroon
entre DataFrames.left
: isso mantém todas as linhas do primeiro DataFrame especificado e apenas linhas do segundo DataFrame especificado que têm uma correspondência com a primeira.outer
: uma junção externa mantém todas as linhas de ambos os DataFrames, independentemente da correspondência.
Para obter informações detalhadas sobre junções, consulte Trabalhar com junções no Azure Databricks. Para obter uma lista de junções com suporte no PySpark, consulte Junções no DataFrame.
O exemplo a seguir retorna um único DataFrame em que cada linha do DataFrame orders
é unida à linha correspondente do DataFrame customers
. Uma junção interna é usada, pois a expectativa é que cada pedido corresponda a exatamente um cliente.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Para ingressar em várias condições, use operadores boolianos, como &
e |
, para especificar AND
e OR
, respectivamente. O exemplo a seguir inclui uma condição adicional, filtrando apenas as linhas que têm o_totalprice
maior que 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Agregação de dados
Para agregar dados em um DataFrame, semelhante a um GROUP BY
no SQL, use o método groupBy
para especificar colunas para agrupar e o método agg
para especificar agregações. Importar agregações comuns, incluindo avg
, sum
, max
e min
de pyspark.sql.functions
. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Algumas agregações são ações, o que significa que elas disparam cálculos. Nesse caso, você não precisa usar outras ações para gerar resultados.
Para contar linhas em um DataFrame, use o método count
:
df_customer.count()
Chamadas de encadeamento
Os métodos que transformam DataFrames retornam DataFrames e o Spark não atua em transformações até que as ações sejam chamadas. Essa avaliação lenta significa que você pode encadear vários métodos para conveniência e legibilidade. O exemplo a seguir mostra como encadear filtragem, agregação e ordenação:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualizar seus DataFrame
Para visualizar um DataFrame em um notebook, clique no sinal + ao lado da tabela no canto superior esquerdo do DataFrame e selecione Visualização para adicionar um ou mais gráficos com base em seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em notebooks do Databricks.
display(df_order)
Para executar visualizações adicionais, o Databricks recomenda o uso da API do Pandas para Spark. O .pandas_api()
permite que você converta para a API do Pandas correspondente para um DataFrame do Spark. Para obter mais informações, consulte API do Pandas no Spark.
Salvar seus dados
Depois de transformar seus dados, você pode salvá-los usando os métodos DataFrameWriter
. Uma lista completa desses métodos pode ser encontrada em DataFrameWriter. As seções a seguir mostram como salvar seu DataFrame como uma tabela e como uma coleção de arquivos de dados.
Salvar seu DataFrame como uma tabela
Para salvar o DataFrame como uma tabela no Catálogo do Unity, use o método write.saveAsTable
e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Gravar seu DataFrame como CSV
Para gravar seu DataFrame no formato *.csv
, use o método write.csv
, especificando o formato e as opções. Por padrão, se houver dados no caminho especificado, a operação de gravação falhará. Você pode especificar um dos seguintes modos para executar uma ação diferente:
overwrite
substitui todos os dados existentes no caminho de destino com o conteúdo do DataFrame.append
acrescenta o conteúdo do DataFrame aos dados no caminho de destino.ignore
falha silenciosamente na gravação se houver dados no caminho de destino.
O exemplo a seguir demonstra a substituição de dados com o conteúdo do DataFrame como arquivos CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Próximas etapas
Para aproveitar mais recursos do Spark no Databricks, confira: