Utiliser le mode en temps réel dans Lakeflow Spark Declarative Pipelines

Important

Le mode en temps réel dans Lakeflow Declarative Pipelines avec Spark est en préversion publique sur Databricks Runtime 18.1.2 dans le canal Preview.

Le mode en temps réel permet le traitement des données ultra-à-faible latence, avec une latence de bout en bout aussi faible que cinq millisecondes. Utilisez le mode en temps réel pour les charges de travail opérationnelles qui nécessitent une réponse immédiate aux données de diffusion en continu, telles que la détection des fraudes et la personnalisation en temps réel.

Le mode en temps réel est également disponible directement dans Structured Streaming en dehors des pipelines. Voir le mode temps réel dans Structured Streaming.

Comment le mode en temps réel atteint une faible latence

Le mode en temps réel diffère du traitement continu standard de trois façons clés :

  • Lots de longue durée : le système traite les données au fur et à mesure qu’il devient disponible dans la source dans les lots de longue durée (la valeur par défaut est de cinq minutes).
  • Planification simultanée des étapes : toutes les phases de requête sont planifiées en même temps. La ressource de calcul doit avoir suffisamment d’emplacements de tâches disponibles pour couvrir toutes les étapes simultanément. Voir Dimensionnement du calcul.
  • Shuffle continu : les données sont transmises entre les étapes dès qu’elles sont produites, au lieu d’attendre qu’une étape en amont soit terminée avant de démarrer l’étape en aval.

L’intervalle de point de contrôle (configuré via pipelines.trigger.interval) contrôle la fréquence à laquelle les décalages d’état et de source sont conservés dans un stockage durable. Les intervalles plus longs réduisent la surcharge de point de contrôle, mais augmentent le temps de récupération après un échec et retardent les rapports de métriques. Les intervalles plus courts améliorent la durabilité, mais ajoutent une surcharge.

Mode temps réel et pipelines continus

Le mode en temps réel est un type spécialisé de déclencheur continu. Le mode continu est toujours nécessaire : le mode en temps réel ajoute des optimisations de latence au niveau du flux en plus. Pour utiliser le mode en temps réel, le pipeline doit d’abord s’exécuter en mode continu. Le mode en temps réel applique ensuite des optimisations supplémentaires au niveau du flux pour atteindre une latence inférieure à seconde au-delà de ce que fournit le traitement continu standard.

L’activation du mode en temps réel nécessite trois étapes de configuration :

  1. Configurez le pipeline en mode continu.
  2. Activez le mode en temps réel au niveau du pipeline.
  3. Définissez un flux de mise à jour en temps réel.

Requirements

Requirement Value
Environnement d'exécution de Databricks 18.1.2 sur le canal Preview de SDP
Type de calcul Calcul classique ou serverless

Configurer le mode en temps réel

Étape 1 : Définir le pipeline en mode continu

Dans vos paramètres de pipeline, définissez le mode pipeline sur Continu ou définissez-le dans le json du pipeline :

{
  "continuous": true
}

Étape 2 : Activer le mode en temps réel au niveau du pipeline

Dans vos paramètres de pipeline, ajoutez la clé suivante à la configuration Spark sous configuration Spark avancée >:

spark.databricks.streaming.realTimeMode.enabled = true

Vous pouvez également définir ceci dans le json du pipeline :

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Étape 3 : Définir un flux de mise à jour en temps réel

Le mode en temps réel nécessite un flux de mise à jour. Utilisez dp.create_sink() pour définir la cible de sortie, puis utilisez le décorateur @dp.update_flow avec pipelines.trigger défini sur "RealTime" et target pointant vers la destination.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Paramètres de configuration au niveau du flux :

Paramètre Obligatoire Par défaut Description
pipelines.trigger Oui Définissez cette "RealTime" option pour activer le mode en temps réel pour ce flux.
pipelines.trigger.interval Non "5 minutes" Intervalle de point de contrôle. Contrôle la fréquence à laquelle l’état et les offsets sont enregistrés. Les valeurs plus courtes améliorent la récupération ; les valeurs plus longues réduisent la surcharge.

Exemples de code

Kafka vers Kafka

Lisez à partir d’un topic Kafka et écrivez vers une cible de sortie Kafka :

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enrichir avec une jointure de diffusion

Joignez un flux Kafka à une table de recherche statique. Seules les jointures de diffusion (stream à statique) sont prises en charge. Les jointures de flux à flux ne sont pas prises en charge en mode temps réel.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agrégation

Compter les événements par clé à l’aide d’un état groupBy. Définissez spark.sql.shuffle.partitions de manière à correspondre au nombre de partitions d’entrée pour les opérations avec état :

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Sources et puits pris en charge

Connecteur En tant que source En tant que récepteur Remarques
Apache Kafka
AWS MSK Utilise l’interface compatible Kafka.
Azure Event Hubs (connecteur Kafka) Utilise l’interface compatible Kafka.
Amazon Kinesis Non pris en charge À utiliser uniquement pour le mode EFO (Fan-Out amélioré).
Delta Non pris en charge Non pris en charge

Dimensionnement du calcul

Vous pouvez exécuter un pipeline en temps réel par ressource de calcul si la ressource de calcul dispose de suffisamment de slots de tâche. Les emplacements de tâches disponibles doivent couvrir toutes les tâches de toutes les phases de requête.

Type de pipeline Configuration Emplacements de tâches requis
Étape unique sans état (source Kafka + puits) maxPartitions = 8 8
Avec état à deux phases (source Kafka + shuffle) maxPartitions = 8, partitions aléatoires = 20 28 (8 + 20)
À trois étapes (source Kafka + deux shuffles) maxPartitions = 8, deux phases aléatoires de 20 chacune 48 (8 + 20 + 20)

Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.

Prise en charge des opérateurs

Category Operator Supported
Sans état Sélection, Projection
UDFs Scala UDF ✓ (avec limitations)
UDFs Python UDF ✓ (avec limitations)
Agrégation somme, nombre, maximum, minimum, moyenne
Windowing Rotation, Glissement
Windowing Session Non pris en charge
Deduplication dropDuplicates ✓ (état non lié)
Deduplication dropDuplicatesWithinWatermark Non pris en charge
Joins Jointure de table de diffusion
Joins Jointure de flux à flux Non pris en charge
Personnalisée transformWithState ✓ (avec des différences comportementales)
Personnalisée union ✓ (avec limitations)
Personnalisée forEach Non pris en charge
Personnalisée flatMapGroupsWithState Non pris en charge
Personnalisée mapPartitions Non pris en charge
Personnalisée forEachBatch Non pris en charge

transformWithState en mode temps réel

transformWithState est pris en charge en mode en temps réel avec les différences suivantes par rapport au traitement par micro-lots :

  • handleInputRows est appelé une fois par ligne plutôt qu’une fois par clé par lot. L’itérateur inputRows génère une valeur unique par appel.
  • Les minuteurs en temps d’événement ne sont pas pris en charge. Les minuteurs en temps de traitement se déclenchent lorsqu’un traitement par lot de longue durée se termine, si aucune donnée n’est arrivée.
  • La fonction transformWithStateInPandas n'est pas prise en charge.

UDF Pandas en mode temps réel

Pour minimiser la latence avec les UDF pandas, définissez spark.sql.execution.arrow.maxRecordsPerBatch sur 1. Cela optimise la latence au détriment du débit. Si le débit est également important, définissez cette valeur sur 100 ou une valeur supérieure.

Surveiller les performances du mode en temps réel

Le mode en temps réel affiche les métriques de latence dans le champ latencies de StreamingQueryProgress. Accédez à ces métriques via une StreamingQueryListener ou en inspectant la lastProgress propriété sur la requête de diffusion en continu.

Métrique Description
processingLatencyMs Temps entre le moment où un enregistrement est lu par le flux et lorsqu’il est entièrement traité par le flux
sourceQueuingLatencyMs Délai entre le moment où un enregistrement est correctement écrit dans le bus de messages (par exemple, l’heure d’ajout du journal dans Kafka) et le moment où il est lu pour la première fois par le flux
e2eLatencyMs Latence totale de bout en bout du moment où l’enregistrement est produit à la source jusqu’au moment où il est entièrement traité par le flux

Chaque métrique est signalée comme p50, p90, p95 et p99 centiles.

Limitations

Un flux en temps réel par pipeline est recommandé. Plusieurs flux sont autorisés, mais la contention entre les flux pour les créneaux d’exécution des tâches augmente la latence.

Pour obtenir la liste complète des limitations de l’opérateur et de la source, consultez les limitations du mode en temps réel.

Ressources additionnelles