Executar sua primeira carga de trabalho de Fluxo Estruturado
Este artigo fornece exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Fluxo Estruturado no Azure Databricks. Você pode usar o Fluxo Estruturado para cargas de trabalho de processamento incrementais e em tempo quase real.
O Fluxo Estruturado é uma das diversas tecnologias que alimentam fluxos de tabelas no Delta Live Tables. O Databricks recomenda o uso do Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Fluxo Estruturado. ConsulteO que é o Delta Live Tables?.
Observação
Embora o Delta Live Tables forneça uma sintaxe ligeiramente modificada para declarar fluxos de tabelas, a sintaxe geral para configurar fluxos de leituras e transformações se aplica a todos os casos de uso de fluxos no Azure Databricks. O Delta Live Tables também simplifica o fluxo gerenciando informações de estado, metadados e numerosas configurações.
Usar o Auto Loader para ler dados do fluxo do armazenamento de objetos
O exemplo a seguir demonstra como carregar dados JSON com o Auto Loader, que usa cloudFiles
para denotar o formato e as opções. A opção schemaLocation
habilita a inferência e a evolução do esquema. Cole o seguinte código em uma célula de notebook do Databricks e execute a célula para criar um fluxo de DataFrame chamado raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Como ocorre com outras operações de leitura no Azure Databricks, configurar um fluxo de leitura na verdade não carrega dados. Você precisa disparar uma ação nos dados antes do início do fluxo.
Observação
Chamar display()
em um fluxo do DataFrame inicia um fluxo de trabalho. Para a maioria dos casos de uso de Fluxo Estruturado, a ação que dispara um fluxo deve estar gravando dados em um coletor. Confira Considerações de produção para o Streaming estruturado.
Executar um fluxo de transformação
O Fluxo Estruturado dá suporte à maioria das transformações que estão disponíveis no Azure Databricks e no SQL do Spark. Você pode até carregar modelos do MLflow como UDFs e fazer previsões do fluxo como uma transformação.
O exemplo de código a seguir executa uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando funções SQL do Spark:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
O resultado transformed_df
contém instruções de consulta para carregar e transformar cada registro à medida que chega à fonte de dados.
Observação
O Fluxo Estruturado trata as fontes de dados como conjuntos de dados não vinculados ou infinitos. Dessa forma, não há suporte para algumas transformações nas cargas de trabalho de Fluxo Estruturado porque exigiriam a classificação de um número infinito de itens.
A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Confira Aplicar marcas d'água para controlar os limites do processamento de dados
Executar uma gravação em lote incremental no Delta Lake
O exemplo a seguir grava no Delta Lake usando um caminho de arquivo e um ponto de verificação especificados.
Importante
Sempre se certifique de especificar um local de ponto de verificação exclusivo para cada fluxo de gravador que você configurar. O ponto de verificação fornece a identidade exclusiva para o seu fluxo, acompanhando todos os registros processados e informações de estado associadas ao fluxo da consulta.
A configuração availableNow
do gatilho instrui o Fluxo Estruturado a processar todos os registros não processados anteriormente do conjunto de dados de origem e a seguir desligar, para que você possa executar com segurança o código seguinte sem se preocupar em deixar um fluxo em execução:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Nesse exemplo, nenhum registro novo chega à nossa fonte de dados, então repetir a execução desse código não ingere novos registros.
Aviso
A execução do Fluxo Estruturado pode impedir que a terminação automática desligue os recursos de computação. Para evitar custos inesperados, certifique-se de encerrar os fluxos de consultas.
Ler dados do Delta Lake, transformar e gravar no Delta Lake
O Delta Lake tem um amplo suporte para o trabalho com o Fluxo Estruturado, tanto como uma fonte quanto como um coletor. Consulte Streaming de tabela Delta lê e grava.
O exemplo a seguir mostra um exemplo de sintaxe para carregar incrementalmente todos os novos registros de uma tabela Delta, fazer sua junção com um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Você precisa ter as permissões adequadas configuradas para ler as tabelas de origem e gravar em tabelas de destino e no local de ponto de verificação especificado. Preencha todos os parâmetros indicados com sinais de maior que e menor que (<>
), usando os valores relevantes para seus coletores e fontes de dados.
Observação
O Delta Live Tables fornece uma sintaxe totalmente declarativa para criar pipelines do Delta Lake e gerencia automaticamente propriedades como gatilhos e pontos de verificação. ConsulteO que é o Delta Live Tables?.
Ler dados do Kafka, transformar e gravar no Kafka
O Apache Kafka e outros barramentos de mensagens proporcionam algumas das menores latências disponíveis para grandes conjuntos de dados. Você pode usar o Azure Databricks para aplicar transformações aos dados ingeridos do Kafka e, a seguir, gravar os dados no Kafka novamente.
Observação
Gravar dados no armazenamento de objetos na nuvem adiciona uma sobrecarga de latência extra. Se você quiser armazenar dados de um barramento de mensagens no Delta Lake, mas precisar da menor latência possível para fluxos de cargas de trabalho, o Databricks recomenda configurar fluxos de trabalho separados para ingerir dados no lakehouse e aplicar transformações em tempo quase real para coletores do barramento de mensagens downstream.
O exemplo de código a seguir demonstra um padrão simples para enriquecer dados do Kafka ao fazer sua junção com os dados de uma tabela Delta e, a seguir, os gravando novamente no Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Você precisa ter permissões adequadas configuradas para o acesso ao seu serviço do Kafka. Preencha todos os parâmetros indicados com sinais de maior que e menor que (<>
), usando os valores relevantes para seus coletores e fontes de dados. Confira Processamento de fluxos com o Apache Kafka e o Azure Databricks.