Partager via


Configurer le mode en temps réel

Cette page décrit les prérequis et la configuration nécessaires pour exécuter des requêtes en mode temps réel dans Structured Streaming. Pour obtenir un didacticiel pas à pas, consultez Tutoriel : Exécuter une charge de travail de streaming en temps réel. Pour plus d’informations conceptuelles sur le mode en temps réel, consultez le mode temps réel dans Structured Streaming.

Conditions préalables

Pour utiliser le mode en temps réel, vous devez configurer votre calcul pour répondre aux exigences suivantes :

  • Utilisez le mode d’accès dédié sur le calcul classique. Le mode d’accès standard, les pipelines déclaratifs Lakeflow Spark et les clusters serverless ne sont pas pris en charge.
  • Utilisez Databricks Runtime 16.4 LTS et versions ultérieures.
  • Désactivez la mise à l’échelle automatique.
  • Désactivez Photon.
  • Affectez la valeur spark.databricks.streaming.realTimeMode.enabled à true.
  • Désactivez les instances spot pour éviter les interruptions.

Pour obtenir des instructions sur la création et la configuration du calcul classique, consultez la référence de configuration de calcul.

Configuration des requêtes

Pour exécuter une requête en mode temps réel, vous devez activer le déclencheur en temps réel. Les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionnement des ressources de calcul

Vous pouvez exécuter un travail en temps réel par ressource de calcul si le calcul a suffisamment d’emplacements de tâches.

Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.

Exemples de calcul d'emplacement

Type de pipeline Paramétrage Emplacements requis
Étape unique sans état (source Kafka + puits) maxPartitions = 8 8 emplacements
Avec état à deux phases (source Kafka + shuffle) maxPartitions = 8, partitions aléatoires = 20 28 emplacements (8 + 20)
Trois étapes (source Kafka + mélange + répartition) maxPartitions = 8, deux phases aléatoires de 20 chacune 48 emplacements (8 + 20 + 20)

Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.

Ressources additionnelles