Delen via


Realtimemodus instellen

Op deze pagina worden de vereisten en configuratie beschreven die nodig zijn voor het uitvoeren van realtimemodusquery's in Structured Streaming. Zie Zelfstudie: Een realtime streamingworkload uitvoeren voor een stapsgewijze zelfstudie. Zie de realtime-modus in Structured Streaming voor conceptuele informatie over de realtimemodus.

Voorwaarden

Als u de realtimemodus wilt gebruiken, moet u uw rekenproces configureren om te voldoen aan de volgende vereisten:

  • Gebruik de toegewezen toegangsmodus op klassieke berekeningen. Standaardtoegangsmodus, Lakeflow Spark-declaratieve pijplijnen en serverloze clusters worden niet ondersteund.
  • Gebruik Databricks Runtime 16.4 LTS en hoger.
  • Schakel automatisch schalen uit.
  • Schakel Photon uit.
  • Stel spark.databricks.streaming.realTimeMode.enabled in op true.
  • Schakel spot-exemplaren uit om onderbrekingen te voorkomen.

Zie de referentie voor compute-configuratie voor instructies over het maken en configureren van klassieke berekeningen.

Queryconfiguratie

Als u een query in realtime wilt uitvoeren, moet u de realtime-trigger inschakelen. Realtime-triggers worden alleen ondersteund in de updatemodus.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Berekening van de computergrootte

U kunt één realtime taak per computing resource uitvoeren als er voldoende taaksloten beschikbaar zijn.

Als u in lage-latentiemodus wilt werken, moet het totale aantal beschikbare taakslots groter zijn dan of gelijk zijn aan het aantal taken over alle querystadia heen.

Voorbeelden van sleufberekeningen

Pijplijntype Configuratie Vereiste aansluitingen
Staatloos met één fase (Kafka-bron + sink) maxPartitions = 8 8 sleuven
Twee-fasen toestandsafhankelijk systeem (Kafka-bron + shuffle) maxPartitions = 8, partities verdelen = 20 28 sleuven (8 + 20)
Drie fasen (Kafka-bron + shuffle + repartition) maxPartitions = 8, twee willekeurige fasen van elk 20 48 sleuven (8 + 20 + 20)

Als u maxPartitions niet instelt, gebruikt u het aantal partities van het Kafka-topic.

Aanvullende bronnen