Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página descreve os pré-requisitos e configurações necessários para executar consultas em modo em tempo real no Structured Streaming. Para um tutorial passo a passo, veja Tutorial: Executar uma carga de trabalho de streaming em tempo real. Para informações conceptuais sobre o modo em tempo real, veja Modo em tempo real em Streaming Estruturado.
Pré-requisitos
Para usar o modo em tempo real, deve configurar o seu cálculo para cumprir os seguintes requisitos:
- Use o modo de acesso dedicado no Classic Compute. O modo de acesso padrão, os Lakeflow Spark Declarative Pipelines e clusters serverless não são suportados.
- Use Databricks Runtime 16.4 LTS e superiores.
- Desligue o autoscaling.
- Desliga o Photon.
- Defina
spark.databricks.streaming.realTimeMode.enabledcomotrue. - Desligue as instâncias pontuais para evitar interrupções.
Para instruções sobre como criar e configurar computação clássica, veja Referência de configuração de Computação.
Configuração da consulta
Para executar uma consulta em modo em tempo real, deve ativar o gatilho em tempo real. Os gatilhos em tempo real são suportados apenas em modo de atualização.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionamento computacional
Podes executar um trabalho em tempo real por recurso de computação se o cálculo tiver espaços suficientes para tarefas.
Para ser executado no modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.
Exemplos de cálculo de faixas horárias
| Tipo de pipeline | Configuração | Slots obrigatórios |
|---|---|---|
| Sem estado de estágio único (fonte + sumidouro Kafka) |
maxPartitions = 8 |
8 encaixes |
| Com estado em dois estágios (fonte Kafka + embaralhamento) |
maxPartitions = 8, partições de shuffle = 20 |
28 ranhuras (8 + 20) |
| Três estágios (origem Kafka + shuffle + repartição) |
maxPartitions = 8, dois estágios de embaralhamento de 20 cada |
48 slots (8 + 20 + 20) |
Se não definires maxPartitions, usa o número de partições no tópico Kafka.