Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Le mode en temps réel dans Lakeflow Declarative Pipelines avec Spark est en préversion publique sur Databricks Runtime 18.1.2 dans le canal Preview.
Le mode en temps réel permet le traitement des données ultra-à-faible latence, avec une latence de bout en bout aussi faible que cinq millisecondes. Utilisez le mode en temps réel pour les charges de travail opérationnelles qui nécessitent une réponse immédiate aux données de diffusion en continu, telles que la détection des fraudes et la personnalisation en temps réel.
Le mode en temps réel est également disponible directement dans Structured Streaming en dehors des pipelines. Voir le mode temps réel dans Structured Streaming.
Comment le mode en temps réel atteint une faible latence
Le mode en temps réel diffère du traitement continu standard de trois façons clés :
- Lots de longue durée : le système traite les données au fur et à mesure qu’il devient disponible dans la source dans les lots de longue durée (la valeur par défaut est de cinq minutes).
- Planification simultanée des étapes : toutes les phases de requête sont planifiées en même temps. La ressource de calcul doit avoir suffisamment d’emplacements de tâches disponibles pour couvrir toutes les étapes simultanément. Voir Dimensionnement du calcul.
- Shuffle continu : les données sont transmises entre les étapes dès qu’elles sont produites, au lieu d’attendre qu’une étape en amont soit terminée avant de démarrer l’étape en aval.
L’intervalle de point de contrôle (configuré via pipelines.trigger.interval) contrôle la fréquence à laquelle les décalages d’état et de source sont conservés dans un stockage durable. Les intervalles plus longs réduisent la surcharge de point de contrôle, mais augmentent le temps de récupération après un échec et retardent les rapports de métriques. Les intervalles plus courts améliorent la durabilité, mais ajoutent une surcharge.
Mode temps réel et pipelines continus
Le mode en temps réel est un type spécialisé de déclencheur continu. Le mode continu est toujours nécessaire : le mode en temps réel ajoute des optimisations de latence au niveau du flux en plus. Pour utiliser le mode en temps réel, le pipeline doit d’abord s’exécuter en mode continu. Le mode en temps réel applique ensuite des optimisations supplémentaires au niveau du flux pour atteindre une latence inférieure à seconde au-delà de ce que fournit le traitement continu standard.
L’activation du mode en temps réel nécessite trois étapes de configuration :
- Configurez le pipeline en mode continu.
- Activez le mode en temps réel au niveau du pipeline.
- Définissez un flux de mise à jour en temps réel.
Requirements
| Requirement | Value |
|---|---|
| Environnement d'exécution de Databricks | 18.1.2 sur le canal Preview de SDP |
| Type de calcul | Calcul classique ou serverless |
Configurer le mode en temps réel
Étape 1 : Définir le pipeline en mode continu
Dans vos paramètres de pipeline, définissez le mode pipeline sur Continu ou définissez-le dans le json du pipeline :
{
"continuous": true
}
Étape 2 : Activer le mode en temps réel au niveau du pipeline
Dans vos paramètres de pipeline, ajoutez la clé suivante à la configuration Spark sous configuration Spark avancée >:
spark.databricks.streaming.realTimeMode.enabled = true
Vous pouvez également définir ceci dans le json du pipeline :
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Étape 3 : Définir un flux de mise à jour en temps réel
Le mode en temps réel nécessite un flux de mise à jour. Utilisez dp.create_sink() pour définir la cible de sortie, puis utilisez le décorateur @dp.update_flow avec pipelines.trigger défini sur "RealTime" et target pointant vers la destination.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Paramètres de configuration au niveau du flux :
| Paramètre | Obligatoire | Par défaut | Description |
|---|---|---|---|
pipelines.trigger |
Oui | — | Définissez cette "RealTime" option pour activer le mode en temps réel pour ce flux. |
pipelines.trigger.interval |
Non | "5 minutes" |
Intervalle de point de contrôle. Contrôle la fréquence à laquelle l’état et les offsets sont enregistrés. Les valeurs plus courtes améliorent la récupération ; les valeurs plus longues réduisent la surcharge. |
Exemples de code
Kafka vers Kafka
Lisez à partir d’un topic Kafka et écrivez vers une cible de sortie Kafka :
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Enrichir avec une jointure de diffusion
Joignez un flux Kafka à une table de recherche statique. Seules les jointures de diffusion (stream à statique) sont prises en charge. Les jointures de flux à flux ne sont pas prises en charge en mode temps réel.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Agrégation
Compter les événements par clé à l’aide d’un état groupBy. Définissez spark.sql.shuffle.partitions de manière à correspondre au nombre de partitions d’entrée pour les opérations avec état :
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Sources et puits pris en charge
| Connecteur | En tant que source | En tant que récepteur | Remarques |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Utilise l’interface compatible Kafka. |
| Azure Event Hubs (connecteur Kafka) | ✓ | ✓ | Utilise l’interface compatible Kafka. |
| Amazon Kinesis | ✓ | Non pris en charge | À utiliser uniquement pour le mode EFO (Fan-Out amélioré). |
| Delta | Non pris en charge | Non pris en charge | — |
Dimensionnement du calcul
Vous pouvez exécuter un pipeline en temps réel par ressource de calcul si la ressource de calcul dispose de suffisamment de slots de tâche. Les emplacements de tâches disponibles doivent couvrir toutes les tâches de toutes les phases de requête.
| Type de pipeline | Configuration | Emplacements de tâches requis |
|---|---|---|
| Étape unique sans état (source Kafka + puits) |
maxPartitions = 8 |
8 |
| Avec état à deux phases (source Kafka + shuffle) |
maxPartitions = 8, partitions aléatoires = 20 |
28 (8 + 20) |
| À trois étapes (source Kafka + deux shuffles) |
maxPartitions = 8, deux phases aléatoires de 20 chacune |
48 (8 + 20 + 20) |
Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.
Prise en charge des opérateurs
| Category | Operator | Supported |
|---|---|---|
| Sans état | Sélection, Projection | ✓ |
| UDFs | Scala UDF | ✓ (avec limitations) |
| UDFs | Python UDF | ✓ (avec limitations) |
| Agrégation | somme, nombre, maximum, minimum, moyenne | ✓ |
| Windowing | Rotation, Glissement | ✓ |
| Windowing | Session | Non pris en charge |
| Deduplication | dropDuplicates |
✓ (état non lié) |
| Deduplication | dropDuplicatesWithinWatermark |
Non pris en charge |
| Joins | Jointure de table de diffusion | ✓ |
| Joins | Jointure de flux à flux | Non pris en charge |
| Personnalisée | transformWithState |
✓ (avec des différences comportementales) |
| Personnalisée | union |
✓ (avec limitations) |
| Personnalisée | forEach |
Non pris en charge |
| Personnalisée | flatMapGroupsWithState |
Non pris en charge |
| Personnalisée | mapPartitions |
Non pris en charge |
| Personnalisée | forEachBatch |
Non pris en charge |
transformWithState en mode temps réel
transformWithState est pris en charge en mode en temps réel avec les différences suivantes par rapport au traitement par micro-lots :
-
handleInputRowsest appelé une fois par ligne plutôt qu’une fois par clé par lot. L’itérateurinputRowsgénère une valeur unique par appel. - Les minuteurs en temps d’événement ne sont pas pris en charge. Les minuteurs en temps de traitement se déclenchent lorsqu’un traitement par lot de longue durée se termine, si aucune donnée n’est arrivée.
- La fonction
transformWithStateInPandasn'est pas prise en charge.
UDF Pandas en mode temps réel
Pour minimiser la latence avec les UDF pandas, définissez spark.sql.execution.arrow.maxRecordsPerBatch sur 1. Cela optimise la latence au détriment du débit. Si le débit est également important, définissez cette valeur sur 100 ou une valeur supérieure.
Surveiller les performances du mode en temps réel
Le mode en temps réel affiche les métriques de latence dans le champ latencies de StreamingQueryProgress. Accédez à ces métriques via une StreamingQueryListener ou en inspectant la lastProgress propriété sur la requête de diffusion en continu.
| Métrique | Description |
|---|---|
processingLatencyMs |
Temps entre le moment où un enregistrement est lu par le flux et lorsqu’il est entièrement traité par le flux |
sourceQueuingLatencyMs |
Délai entre le moment où un enregistrement est correctement écrit dans le bus de messages (par exemple, l’heure d’ajout du journal dans Kafka) et le moment où il est lu pour la première fois par le flux |
e2eLatencyMs |
Latence totale de bout en bout du moment où l’enregistrement est produit à la source jusqu’au moment où il est entièrement traité par le flux |
Chaque métrique est signalée comme p50, p90, p95 et p99 centiles.
Limitations
Un flux en temps réel par pipeline est recommandé. Plusieurs flux sont autorisés, mais la contention entre les flux pour les créneaux d’exécution des tâches augmente la latence.
Pour obtenir la liste complète des limitations de l’opérateur et de la source, consultez les limitations du mode en temps réel.