Partilhar via


Desenvolva código de pipeline com Python

Lakeflow Spark Declarative Pipelines (SDP) introduz várias novas construções de código Python para definir visualizações materializadas e tabelas de streaming em pipelines. O suporte Python para o desenvolvimento de pipelines baseia-se nos conceitos básicos do PySpark DataFrame e APIs de Streaming Estruturado.

Para usuários não familiarizados com Python e DataFrames, o Databricks recomenda o uso da interface SQL. Veja Desenvolver código Lakeflow Spark Declarative Pipelines com SQL.

Para obter uma referência completa da sintaxe Python do Lakeflow SDP, consulte Referência da linguagem Python Lakeflow Spark Declarative Pipelines.

Noções básicas de Python para desenvolvimento de pipeline

O código Python que cria conjuntos de dados pipline deve retornar DataFrames.

Todas as APIs Python do Lakeflow Spark Declarative Pipelines são implementadas no pyspark.pipelines módulo. O código de pipeline implementado em Python deve importar explicitamente o módulo pipelines à parte superior do código-fonte Python. Em nossos exemplos, usamos o seguinte comando import e usamos dp em exemplos para nos referirmos a pipelines.

from pyspark import pipelines as dp

Observação

O Apache Spark™ inclui pipelines declarativos a partir do Spark 4.1, disponíveis através do pyspark.pipelines módulo. O Databricks Runtime estende esses recursos de código aberto com APIs e integrações adicionais para uso gerenciado de produção.

O código escrito com o módulo de código pipelines aberto é executado sem modificação no Azure Databricks. Os seguintes recursos não fazem parte do Apache Spark:

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)
  • @dp.temporary_view

O pipeline, por padrão, lê e grava no catálogo e no esquema especificados durante a configuração do pipeline. Consulte Defina o catálogo de destino e o esquema.

O código Python específico do pipeline difere de outros tipos de código Python de uma maneira crítica: o código de pipeline Python não chama diretamente as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados. Em vez disso, o SDP interpreta as funções do decorador do dp módulo em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, consulte a referência de Python .

Crie uma visualização materializada ou uma tabela de streaming com Python

Use @dp.table para criar uma tabela de streaming a partir dos resultados de uma leitura de streaming. Use @dp.materialized_view para criar uma exibição materializada a partir dos resultados de uma leitura em lote.

Por padrão, os nomes de tabela de exibição materializada e streaming são inferidos a partir de nomes de funções. O exemplo de código a seguir mostra a sintaxe básica para criar uma exibição materializada e uma tabela de streaming:

Observação

Ambas as funções fazem referência à mesma tabela no catálogo de samples e usam a mesma função decoradora. Esses exemplos destacam que a única diferença na sintaxe básica para exibições materializadas e tabelas de streaming é usar spark.read versus spark.readStream.

Nem todas as fontes de dados suportam leituras de streaming. Algumas fontes de dados devem ser sempre processadas com semântica de streaming.

from pyspark import pipelines as dp

@dp.materialized_view()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, você pode especificar o nome da tabela usando o argumento name no decorador de @dp.table. O exemplo a seguir demonstra esse padrão para uma exibição materializada e uma tabela de streaming:

from pyspark import pipelines as dp

@dp.materialized_view(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carregar dados do armazenamento de objetos

Os pipelines suportam o carregamento de dados de todos os formatos suportados pelo Azure Databricks. Veja Opções de formato de dados.

Observação

Estes exemplos usam dados disponíveis no diretório /databricks-datasets, montados automaticamente no seu espaço de trabalho. O Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência a dados armazenados no armazenamento de objetos em nuvem. Consulte Quais são os volumes do Catálogo Unity?.

O Databricks recomenda o uso do Auto Loader e de tabelas de streaming ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Veja O que é o Auto Loader?.

O exemplo a seguir cria uma tabela de streaming a partir de arquivos JSON usando o Auto Loader:

from pyspark import pipelines as dp

@dp.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

O exemplo a seguir usa semântica em lote para ler um diretório JSON e criar uma exibição materializada:

from pyspark import pipelines as dp

@dp.materialized_view()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Valide dados com expectativas

Você pode usar as expectativas para definir e impor restrições de qualidade de dados. Consulte Gerir a qualidade dos dados com as expectativas do fluxo de dados.

O código a seguir usa @dp.expect_or_drop para definir uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

from pyspark import pipelines as dp

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consultar visões materializadas e tabelas de streaming definidas no seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de streaming chamada orders que carrega dados JSON.
  • Uma exibição materializada chamada customers que carrega dados CSV.
  • Uma vista materializada chamada customer_orders que une registos dos conjuntos de dados orders e customers, converte o carimbo de data/hora da encomenda numa data e seleciona os campos customer_id, order_number, statee order_date.
  • Uma vista materializada denominada daily_orders_by_state que agrega o número diário de pedidos em cada estado.

Observação

Ao consultar exibições ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar os padrões configurados em seu pipeline. Neste exemplo, as tabelas orders, customerse customer_orders são escritas e lidas do catálogo e esquema padrão configurados para seu pipeline.

O modo de publicação herdado usa o esquema LIVE para consultar outras exibições materializadas e tabelas de streaming definidas em seu pipeline. Em novos pipelines, a sintaxe do esquema LIVE é silenciosamente ignorada. Ver LIVE schema (legacy).

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dp.materialized_view()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dp.materialized_view()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dp.materialized_view()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Criar tabelas em um for loop

Você pode usar loops Python for para criar várias tabelas programaticamente. Isso pode ser útil quando você tem muitas fontes de dados ou conjuntos de dados de destino que variam em apenas alguns parâmetros, resultando em menos código total para manter e menos redundância de código.

O ciclo for avalia a lógica em ordem sequencial, mas, uma vez concluído o planeamento dos conjuntos de dados, o pipeline executa a lógica em paralelo.

Importante

Ao usar esse padrão para definir conjuntos de dados, certifique-se de que a lista de valores passados para o loop for seja sempre aditiva. Se um conjunto de dados definido anteriormente em um pipeline for omitido de uma execução de pipeline futura, esse conjunto de dados será descartado automaticamente do esquema de destino.

O exemplo a seguir cria cinco tabelas que filtram pedidos de clientes por região. Aqui, o nome da região é usado para definir o nome das exibições materializadas de destino e filtrar os dados de origem. As visualizações temporárias são usadas para definir junções das tabelas de origem usadas na construção das exibições materializadas finais.

from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col

@dp.temporary_view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dp.temporary_view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Segue-se um exemplo do gráfico de fluxo de dados para este pipeline:

Um gráfico de fluxo de dados de duas visualizações que levam a cinco tabelas regionais.

Solução de problemas: for loop cria muitas tabelas com os mesmos valores

O modelo de execução lento que os pipelines usam para avaliar o código Python requer que sua lógica faça referência direta a valores individuais quando a função decorada por @dp.materialized_view() é invocada.

O exemplo a seguir demonstra duas abordagens corretas para definir tabelas com um loop for. Em ambos os exemplos, cada nome de tabela da lista de tables é explicitamente referenciado dentro da função decorada por @dp.materialized_view().

from pyspark import pipelines as dp

# Create a parent function to set local variables

def create_table(table_name):
  @dp.materialized_view(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dp.materialized_view()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized_view(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

O exemplo a seguir não faz referência aos valores corretamente. Este exemplo cria tabelas com nomes distintos, mas todas as tabelas carregam dados do último valor no loop for:

from pyspark import pipelines as dp

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Apagar permanentemente registos de uma visualização materializada ou de uma tabela de streaming

Para excluir permanentemente registros de uma exibição materializada ou tabela de streaming com vetores de exclusão habilitados, como para conformidade com o GDPR, operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma exibição materializada, consulte Excluir permanentemente registros de uma exibição materializada com vetores de exclusão habilitados. Para garantir a exclusão de registros de uma tabela de streaming, consulte Excluir permanentemente registros de uma tabela de streaming.