Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page décrit les prérequis et la configuration nécessaires pour exécuter des requêtes en mode temps réel dans Structured Streaming. Pour obtenir un didacticiel pas à pas, consultez Tutoriel : Exécuter une charge de travail de streaming en temps réel. Pour plus d’informations conceptuelles sur le mode en temps réel, consultez le mode temps réel dans Structured Streaming.
Conditions préalables
Pour utiliser le mode en temps réel, vous devez configurer votre calcul pour répondre aux exigences suivantes :
- Utilisez le mode d’accès dédié sur le calcul classique. Le mode d’accès standard, les pipelines déclaratifs Lakeflow Spark et les clusters serverless ne sont pas pris en charge.
- Utilisez Databricks Runtime 16.4 LTS et versions ultérieures.
- Désactivez la mise à l’échelle automatique.
- Désactivez Photon.
- Affectez la valeur
spark.databricks.streaming.realTimeMode.enabledàtrue. - Désactivez les instances spot pour éviter les interruptions.
Pour obtenir des instructions sur la création et la configuration du calcul classique, consultez la référence de configuration de calcul.
Configuration des requêtes
Pour exécuter une requête en mode temps réel, vous devez activer le déclencheur en temps réel. Les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionnement des ressources de calcul
Vous pouvez exécuter un travail en temps réel par ressource de calcul si le calcul a suffisamment d’emplacements de tâches.
Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.
Exemples de calcul d'emplacement
| Type de pipeline | Paramétrage | Emplacements requis |
|---|---|---|
| Étape unique sans état (source Kafka + puits) |
maxPartitions = 8 |
8 emplacements |
| Avec état à deux phases (source Kafka + shuffle) |
maxPartitions = 8, partitions aléatoires = 20 |
28 emplacements (8 + 20) |
| Trois étapes (source Kafka + mélange + répartition) |
maxPartitions = 8, deux phases aléatoires de 20 chacune |
48 emplacements (8 + 20 + 20) |
Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.