Execute sua primeira carga de trabalho de Streaming Estruturado
Este artigo fornece exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Streaming Estruturado no Azure Databricks. Você pode usar o Streaming Estruturado para cargas de trabalho de processamento incrementais e quase em tempo real.
O Streaming Estruturado é uma das várias tecnologias que alimentam as mesas de streaming no Delta Live Tables. O Databricks recomenda o uso do Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Structured Streaming. Consulte O que é Delta Live Tables?.
Nota
Enquanto o Delta Live Tables fornece uma sintaxe ligeiramente modificada para declarar tabelas de streaming, a sintaxe geral para configurar leituras e transformações de streaming se aplica a todos os casos de uso de streaming no Azure Databricks. O Delta Live Tables também simplifica o streaming gerenciando informações de estado, metadados e várias configurações.
Use o Auto Loader para ler dados de streaming do armazenamento de objetos
O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que usa cloudFiles
para indicar formato e opções. A schemaLocation
opção permite a inferência e evolução do esquema. Cole o seguinte código em uma célula de bloco de anotações Databricks e execute a célula para criar um DataFrame de streaming chamado raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Como outras operações de leitura no Azure Databricks, a configuração de uma leitura de streaming não carrega dados. Você deve acionar uma ação nos dados antes que o fluxo comece.
Nota
Chamar display()
um DataFrame de streaming inicia um trabalho de streaming. Para a maioria dos casos de uso de Streaming Estruturado, a ação que aciona um fluxo deve ser gravar dados em um coletor. Consulte Considerações sobre produção para Streaming estruturado.
Executar uma transformação de streaming
O Streaming Estruturado dá suporte à maioria das transformações disponíveis no Azure Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de streaming como uma transformação.
O exemplo de código a seguir conclui uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando funções Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
O resultado transformed_df
contém instruções de consulta para carregar e transformar cada registro à medida que ele chega na fonte de dados.
Nota
O Streaming Estruturado trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Como tal, algumas transformações não são suportadas em cargas de trabalho de Streaming Estruturado porque exigiriam a classificação de um número infinito de itens.
A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar limites de processamento de dados.
Executar uma gravação em lote incremental no Delta Lake
O exemplo a seguir grava no Delta Lake usando um caminho de arquivo especificado e um ponto de verificação.
Importante
Certifique-se sempre de especificar um local de ponto de verificação exclusivo para cada gravador de streaming que você configurar. O ponto de verificação fornece a identidade exclusiva para seu fluxo, rastreando todos os registros processados e informações de estado associadas à sua consulta de streaming.
A availableNow
configuração para o gatilho instrui o Streaming Estruturado a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para que você possa executar com segurança o seguinte código sem se preocupar em deixar um fluxo em execução:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto, a execução repetida desse código não ingere novos registros.
Aviso
A execução do Streaming estruturado pode impedir que o encerramento automático desligue os recursos de computação. Para evitar custos inesperados, certifique-se de encerrar as consultas de streaming.
Leia dados do Delta Lake, transforme e grave no Delta Lake
A Delta Lake tem amplo suporte para trabalhar com Streaming Estruturado como fonte e coletor. Consulte Leituras e gravações de streaming de tabela Delta.
O exemplo a seguir mostra sintaxe de exemplo para carregar incrementalmente todos os novos registros de uma tabela Delta, juntá-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local de ponto de verificação especificado. Preencha todos os parâmetros indicados com colchetes angulares (<>
) usando os valores relevantes para suas fontes de dados e coletores.
Nota
Delta Live Tables fornece uma sintaxe totalmente declarativa para criar pipelines Delta Lake e gerencia propriedades como gatilhos e pontos de verificação automaticamente. Consulte O que é Delta Live Tables?.
Leia dados de Kafka, transforme e escreva em Kafka
O Apache Kafka e outros barramentos de mensagens fornecem algumas das menores latências disponíveis para grandes conjuntos de dados. Você pode usar o Azure Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar dados de volta no Kafka.
Nota
A gravação de dados no armazenamento de objetos na nuvem adiciona uma sobrecarga de latência adicional. Se você deseja armazenar dados de um barramento de mensagens no Delta Lake, mas requer a menor latência possível para cargas de trabalho de streaming, o Databricks recomenda configurar trabalhos de streaming separados para ingerir dados para o lakehouse e aplicar transformações quase em tempo real para coletores de barramento de mensagens downstream.
O exemplo de código a seguir demonstra um padrão simples para enriquecer dados de Kafka juntando-os com dados em uma tabela Delta e, em seguida, gravando de volta em Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Você deve ter as permissões adequadas configuradas para acessar seu serviço Kafka. Preencha todos os parâmetros indicados com colchetes angulares (<>
) usando os valores relevantes para suas fontes de dados e coletores. Consulte Processamento de fluxo com Apache Kafka e Azure Databricks.