Compartilhar via


Funções para definir conjuntos de dados

O módulo pyspark.pipelines (aqui apelidado como dp) implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta por streaming ou em modo batch e retorna um DataFrame do Apache Spark. A sintaxe a seguir mostra um exemplo simples para definir um conjunto de dados de pipeline:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Esta página fornece uma visão geral das funções e consultas que definem conjuntos de dados em pipelines. Para obter uma lista completa de decoradores disponíveis, consulte a referência do desenvolvedor do Pipeline.

As funções usadas para definir conjuntos de dados não devem incluir lógica arbitrária do Python não relacionada ao conjunto de dados, incluindo chamadas para APIs de terceiros. Os pipelines executam essas funções várias vezes durante o planejamento, a validação e as atualizações. Incluir lógica arbitrária pode levar a resultados inesperados.

Ler dados para iniciar uma definição de conjunto de dados

As funções usadas para definir conjuntos de dados de pipeline normalmente começam com uma operação spark.read ou spark.readStream. Essas operações de leitura retornam um objeto DataFrame estático ou de streaming que você usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações do Spark que retornam um DataFrame incluem spark.table, ou spark.range.

As funções nunca devem fazer referência a DataFrames definidos fora da função. Tentar fazer referência a DataFrames definidos em um escopo diferente pode resultar em um comportamento inesperado. Para obter um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um for loop.

Os exemplos a seguir mostram a sintaxe básica para ler dados usando a lógica de lote ou de streaming.

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Se você precisar ler dados de uma API REST externa, implemente essa conexão usando uma fonte de dados personalizada do Python. Consulte fontes de dados personalizadas do PySpark.

Observação

É possível criar DataFrames arbitrários do Apache Spark a partir de coleções de dados do Python, incluindo DataFrames pandas, ditados e listas. Esses padrões podem ser úteis durante o desenvolvimento e teste, mas a maioria das definições do conjunto de dados de pipeline de produção deve começar carregando dados de arquivos, um sistema externo ou uma tabela ou exibição existente.

Encadeamento de Transformações

Os pipelines dão suporte a quase todas as transformações do DataFrame do Apache Spark. Você pode incluir qualquer número de transformações em sua função de definição de conjunto de dados, mas deve garantir que os métodos usados sempre retornem um objeto DataFrame.

Se você tiver uma transformação intermediária que suporta várias cargas de trabalho subsequentes, mas não precisar materializá-la como uma tabela, use @dp.temporary_view() para adicionar uma visualização temporária ao pipeline. Em seguida, você pode referenciar essa exibição usando spark.read.table("temp_view_name") em várias definições de conjunto de dados a jusante. A sintaxe a seguir demonstra esse padrão:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Isso garante que o pipeline tenha total consciência das transformações na sua visualização durante o planejamento do pipeline e impede possíveis problemas relacionados à execução de código Python arbitrário fora das configurações do conjunto de dados.

Em sua função, você pode encadear DataFrames para criar novos DataFrames sem gravar resultados incrementais como visões, visões materializadas ou tabelas de streaming, como no exemplo a seguir:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Se todos os DataFrames executarem suas leituras iniciais usando a lógica de lote, o resultado de retorno será um DataFrame estático. Se você tiver alguma consulta em streaming, seu resultado será um DataFrame de streaming.

Retornar um DataFrame

Use @dp.table para criar uma tabela de streaming com base nos resultados de uma leitura de streaming. Use @dp.materialized_view para criar uma exibição materializada com base nos resultados de uma leitura em lote. A maioria dos outros decoradores trabalha em dataFrames estáticos e streaming, enquanto alguns exigem um DataFrame de streaming.

A função usada para definir um conjunto de dados deve retornar um DataFrame do Spark. Nunca use métodos que salvem ou escrevam em arquivos ou tabelas como parte do código do conjunto de dados do pipeline.

Exemplos de operações do Apache Spark que nunca devem ser usadas no código do pipeline:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Observação

Os pipelines também dão suporte ao uso do Pandas no Spark para funções de definição de conjuntos de dados. Consulte a API do Pandas no Spark.

Usar o SQL em um pipeline do Python

O PySpark dá suporte ao operador spark.sql para escrever código DataFrame usando SQL. Quando você usa esse padrão no código-fonte do pipeline, ele é convertido para visões materializadas ou tabelas de streaming.

O exemplo de código a seguir é equivalente a usar spark.read.table("catalog_name.schema_name.table_name") para a lógica de consulta do conjunto de dados:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read e dlt.read_stream (herdado)

O módulo mais antigo dlt inclui funções dlt.read() e dlt.read_stream() que foram introduzidas para dar suporte à funcionalidade no modo legado de publicação de pipeline. Esses métodos são suportados, mas o Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() pelas seguintes razões:

  • As dlt funções têm suporte limitado para ler conjuntos de dados definidos fora do pipeline atual.
  • As spark funções dão suporte à especificação de opções, como skipChangeCommitsoperações de leitura. Não há suporte para a especificação de opções pelas dlt funções.
  • O dlt módulo foi substituído pelo pyspark.pipelines módulo. O Databricks recomenda usar from pyspark import pipelines as dp para importar pyspark.pipelines ao escrever código de pipelines em Python.