Partilhar via


Configurar o modo em tempo real

Esta página descreve os pré-requisitos e configurações necessários para executar consultas em modo em tempo real no Structured Streaming. Para um tutorial passo a passo, veja Tutorial: Executar uma carga de trabalho de streaming em tempo real. Para informações conceptuais sobre o modo em tempo real, veja Modo em tempo real em Streaming Estruturado.

Pré-requisitos

Para usar o modo em tempo real, deve configurar o seu cálculo para cumprir os seguintes requisitos:

  • Use o modo de acesso dedicado no Classic Compute. O modo de acesso padrão, os Lakeflow Spark Declarative Pipelines e clusters serverless não são suportados.
  • Use Databricks Runtime 16.4 LTS e superiores.
  • Desligue o autoscaling.
  • Desliga o Photon.
  • Defina spark.databricks.streaming.realTimeMode.enabled como true.
  • Desligue as instâncias pontuais para evitar interrupções.

Para instruções sobre como criar e configurar computação clássica, veja Referência de configuração de Computação.

Configuração da consulta

Para executar uma consulta em modo em tempo real, deve ativar o gatilho em tempo real. Os gatilhos em tempo real são suportados apenas em modo de atualização.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento computacional

Podes executar um trabalho em tempo real por recurso de computação se o cálculo tiver espaços suficientes para tarefas.

Para ser executado no modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de faixas horárias

Tipo de pipeline Configuração Slots obrigatórios
Sem estado de estágio único (fonte + sumidouro Kafka) maxPartitions = 8 8 encaixes
Com estado em dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de shuffle = 20 28 ranhuras (8 + 20)
Três estágios (origem Kafka + shuffle + repartição) maxPartitions = 8, dois estágios de embaralhamento de 20 cada 48 slots (8 + 20 + 20)

Se não definires maxPartitions, usa o número de partições no tópico Kafka.

Recursos adicionais