Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O pyspark.pipelines módulo (aqui conhecido como dp) implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta, sendo esta de streaming ou em lote, e retorna um DataFrame do Apache Spark. A sintaxe a seguir mostra um exemplo simples para definir um conjunto de dados de pipeline:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Esta página fornece uma visão geral das funções e consultas que definem conjuntos de dados em pipelines. Para obter uma lista completa dos decoradores disponíveis, consulte Referência do desenvolvedor Pipeline.
As funções que você usa para definir conjuntos de dados não devem incluir lógica Python arbitrária não relacionada ao conjunto de dados, incluindo chamadas para APIs de terceiros. Os pipelines executam essas funções várias vezes durante o planejamento, a validação e as atualizações. Incluir lógica arbitrária pode levar a resultados inesperados.
Ler dados para iniciar uma definição de conjunto de dados
As funções utilizadas para definir datasets de pipeline normalmente começam com uma operação spark.read ou spark.readStream. Essas operações de leitura retornam um objeto DataFrame estático ou de streaming que você usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações de faísca que retornam um DataFrame incluem spark.table, ou spark.range.
As funções nunca devem fazer referência a DataFrames definidos fora da função. A tentativa de fazer referência a DataFrames definidos em um escopo diferente pode resultar em um comportamento inesperado. Para obter um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um for loop.
Os exemplos a seguir mostram a sintaxe básica para ler dados usando lógica de lote ou streaming:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Se você precisar ler dados de uma API REST externa, implemente essa conexão usando uma fonte de dados personalizada do Python. Consulte Fontes de dados personalizadas do PySpark.
Observação
É possível criar DataFrames Apache Spark arbitrários a partir de coleções Python de dados, incluindo pandas DataFrames, dicts e listas. Esses padrões podem ser úteis durante o desenvolvimento e o teste, mas a maioria das definições de conjunto de dados do pipeline de produção deve começar carregando dados de arquivos, de um sistema externo ou de uma tabela ou exibição existente.
Encadeamento de transformações
Os pipelines suportam quase todas as transformações do Apache Spark DataFrame. Você pode incluir qualquer número de transformações em sua função de definição de conjunto de dados, mas deve garantir que os métodos usados sempre retornem um objeto DataFrame.
Se tiver uma transformação intermédia que gere várias cargas de trabalho downstream, mas não precisar de a materializar como uma tabela, use @dp.temporary_view() para adicionar uma vista temporária ao seu pipeline. Em seguida, pode referenciar esta exibição usando spark.read.table("temp_view_name") em várias definições de conjunto de dados downstream. A sintaxe a seguir demonstra esse padrão:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Isso garante que o pipeline tenha total consciência das transformações em sua exibição durante o planejamento do pipeline e evita possíveis problemas relacionados a código Python arbitrário em execução fora das definições de conjunto de dados.
Dentro da sua função, pode encadear DataFrames para criar novos DataFrames sem escrever resultados incrementais como visões, visões materializadas ou tabelas de fluxo contínuo, como no exemplo a seguir:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Se todos os seus DataFrames executarem suas leituras iniciais usando lógica de lote, seu resultado de retorno será um DataFrame estático. Se tiveres consultas que estejam em fluxo, o teu resultado de retorno é um DataFrame de streaming.
Retornar um DataFrame
Use @dp.table para criar uma tabela de streaming a partir dos resultados de uma leitura de streaming. Use @dp.materialized_view para criar uma exibição materializada a partir dos resultados de uma leitura em lote. A maioria dos outros decoradores trabalha em DataFrames estáticos e de streaming, enquanto alguns exigem um DataFrame de streaming.
A função usada para definir um conjunto de dados deve retornar um Spark DataFrame. Nunca use métodos que salvem ou gravem em arquivos ou tabelas como parte do código do conjunto de dados do pipeline.
Exemplos de operações do Apache Spark que nunca devem ser usadas no código de pipeline:
collect()count()toPandas()save()saveAsTable()start()toTable()
Observação
Os pipelines também suportam o uso do Pandas no Spark para funções de definição de conjunto de dados. Consulte a API do Pandas no Spark.
Usar SQL em um pipeline Python
O PySpark suporta o spark.sql operador para escrever código DataFrame usando SQL. Quando você usa esse padrão no código-fonte do pipeline, ele é compilado para exibições materializadas ou tabelas de streaming.
O exemplo de código a seguir é equivalente ao uso spark.read.table("catalog_name.schema_name.table_name") para a lógica de consulta do conjunto de dados:
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read e dlt.read_stream (legado)
O módulo dlt mais antigo inclui as funções dlt.read() e dlt.read_stream() que foram introduzidas para suportar a funcionalidade no modo de publicação de pipeline herdado. Esses métodos são suportados, mas o Databricks recomenda sempre usar as spark.read.table() funções e spark.readStream.table() devido ao seguinte:
- As
dltfunções têm suporte limitado para leitura de conjuntos de dados definidos fora do pipeline atual. - As
sparkfunções suportam a especificação de opções, comoskipChangeCommits, para ler operações. Especificar opções não é suportado pelas funçõesdlt. - O
dltmódulo foi substituído pelopyspark.pipelinesmódulo. O Databricks recomenda o usofrom pyspark import pipelines as dppara importarpyspark.pipelinespara uso ao escrever código de pipelines em Python.