Partilhar via


Execute sua primeira carga de trabalho de Streaming Estruturado

Este artigo fornece exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Streaming Estruturado no Azure Databricks. Você pode usar o Streaming Estruturado para cargas de trabalho de processamento incrementais e quase em tempo real.

O Streaming Estruturado é uma das várias tecnologias que alimentam as mesas de streaming no Delta Live Tables. O Databricks recomenda o uso do Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Structured Streaming. Consulte O que é Delta Live Tables?.

Nota

Enquanto o Delta Live Tables fornece uma sintaxe ligeiramente modificada para declarar tabelas de streaming, a sintaxe geral para configurar leituras e transformações de streaming se aplica a todos os casos de uso de streaming no Azure Databricks. O Delta Live Tables também simplifica o streaming gerenciando informações de estado, metadados e várias configurações.

Use o Auto Loader para ler dados de streaming do armazenamento de objetos

O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que usa cloudFiles para indicar formato e opções. A schemaLocation opção permite a inferência e evolução do esquema. Cole o seguinte código em uma célula de bloco de anotações Databricks e execute a célula para criar um DataFrame de streaming chamado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Como outras operações de leitura no Azure Databricks, a configuração de uma leitura de streaming não carrega dados. Você deve acionar uma ação nos dados antes que o fluxo comece.

Nota

Chamar display() um DataFrame de streaming inicia um trabalho de streaming. Para a maioria dos casos de uso de Streaming Estruturado, a ação que aciona um fluxo deve ser gravar dados em um coletor. Consulte Considerações sobre produção para Streaming estruturado.

Executar uma transformação de streaming

O Streaming Estruturado dá suporte à maioria das transformações disponíveis no Azure Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de streaming como uma transformação.

O exemplo de código a seguir conclui uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando funções Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

O resultado transformed_df contém instruções de consulta para carregar e transformar cada registro à medida que ele chega na fonte de dados.

Nota

O Streaming Estruturado trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Como tal, algumas transformações não são suportadas em cargas de trabalho de Streaming Estruturado porque exigiriam a classificação de um número infinito de itens.

A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar limites de processamento de dados.

Executar uma gravação em lote incremental no Delta Lake

O exemplo a seguir grava no Delta Lake usando um caminho de arquivo especificado e um ponto de verificação.

Importante

Certifique-se sempre de especificar um local de ponto de verificação exclusivo para cada gravador de streaming que você configurar. O ponto de verificação fornece a identidade exclusiva para seu fluxo, rastreando todos os registros processados e informações de estado associadas à sua consulta de streaming.

A availableNow configuração para o gatilho instrui o Streaming Estruturado a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para que você possa executar com segurança o seguinte código sem se preocupar em deixar um fluxo em execução:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto, a execução repetida desse código não ingere novos registros.

Aviso

A execução do Streaming estruturado pode impedir que o encerramento automático desligue os recursos de computação. Para evitar custos inesperados, certifique-se de encerrar as consultas de streaming.

Leia dados do Delta Lake, transforme e grave no Delta Lake

A Delta Lake tem amplo suporte para trabalhar com Streaming Estruturado como fonte e coletor. Consulte Leituras e gravações de streaming de tabela Delta.

O exemplo a seguir mostra sintaxe de exemplo para carregar incrementalmente todos os novos registros de uma tabela Delta, juntá-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local de ponto de verificação especificado. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores.

Nota

Delta Live Tables fornece uma sintaxe totalmente declarativa para criar pipelines Delta Lake e gerencia propriedades como gatilhos e pontos de verificação automaticamente. Consulte O que é Delta Live Tables?.

Leia dados de Kafka, transforme e escreva em Kafka

O Apache Kafka e outros barramentos de mensagens fornecem algumas das menores latências disponíveis para grandes conjuntos de dados. Você pode usar o Azure Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar dados de volta no Kafka.

Nota

A gravação de dados no armazenamento de objetos na nuvem adiciona uma sobrecarga de latência adicional. Se você deseja armazenar dados de um barramento de mensagens no Delta Lake, mas requer a menor latência possível para cargas de trabalho de streaming, o Databricks recomenda configurar trabalhos de streaming separados para ingerir dados para o lakehouse e aplicar transformações quase em tempo real para coletores de barramento de mensagens downstream.

O exemplo de código a seguir demonstra um padrão simples para enriquecer dados de Kafka juntando-os com dados em uma tabela Delta e, em seguida, gravando de volta em Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Você deve ter as permissões adequadas configuradas para acessar seu serviço Kafka. Preencha todos os parâmetros indicados com colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores. Consulte Processamento de fluxo com Apache Kafka e Azure Databricks.