Condividi tramite


Configurare la modalità in tempo reale

Questa pagina descrive i prerequisiti e la configurazione necessari per eseguire query in modalità in tempo reale in Structured Streaming. Per un'esercitazione dettagliata, vedere Esercitazione: Eseguire un carico di lavoro di streaming in tempo reale. Per informazioni concettuali sulla modalità in tempo reale, vedere Modalità in tempo reale in Structured Streaming.

Prerequisiti

Per usare la modalità in tempo reale, è necessario configurare il calcolo per soddisfare i requisiti seguenti:

  • Usare la modalità di accesso dedicato nel calcolo classico. La modalità di accesso standard, le pipeline dichiarative di Lakeflow Spark e i cluster serverless non sono supportati.
  • Usare Databricks Runtime 16.4 LTS e versioni successive.
  • Disattiva la scalabilità automatica.
  • Disattiva Photon.
  • Impostare spark.databricks.streaming.realTimeMode.enabled su true.
  • Disattivare le istanze spot per evitare interruzioni.

Per istruzioni sulla creazione e la configurazione del calcolo classico, vedere Informazioni di riferimento sulla configurazione di calcolo.

Configurazione delle query

Per eseguire una query in modalità in tempo reale, è necessario abilitare il trigger in tempo reale. I trigger in tempo reale sono supportati solo in modalità di aggiornamento.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento delle risorse di calcolo

È possibile eseguire un processo in tempo reale per ogni risorsa di calcolo se il calcolo dispone di slot di attività sufficienti.

Per l'esecuzione in modalità a bassa latenza, il numero totale di slot di attività disponibili deve essere maggiore o uguale al numero di attività in tutte le fasi della query.

Esempi di calcolo degli slot

Tipo di pipeline Configurazione Slot obbligatori
Senza stato a fase singola (origine Kafka + sink) maxPartitions = 8 8 slot
Con stato a due fasi (origine Kafka + shuffle) maxPartitions = 8, partizioni shuffle = 20 28 slot (8 + 20)
Tre fasi (di origine di Kafka + rimescolamento + ripartizione) maxPartitions = 8, due fasi di mescolamento di 20 ciascuno 48 slot (8 + 20 + 20)

Se non si imposta maxPartitions, usare il numero di partizioni nel topic Kafka.

Risorse aggiuntive