Partager via


Mode temps réel dans Structured Streaming

Important

Cette fonctionnalité est disponible en préversion publique.

Cette page décrit le mode en temps réel, un type de déclencheur pour Structured Streaming qui permet un traitement de données ultra-faible latence avec une latence de bout en bout aussi faible que 5 ms. Ce mode est conçu pour les charges de travail opérationnelles qui nécessitent une réponse immédiate aux données de streaming.

Le mode en temps réel est disponible dans Databricks Runtime 16.4 LTS et versions ultérieures.

Charges de travail opérationnelles

Les charges de travail de streaming peuvent être largement divisées en charges de travail analytiques et charges de travail opérationnelles :

  • Les charges de travail analytiques utilisent l'ingestion et la transformation des données, en suivant généralement l'architecture en médaillon (par exemple, l'ingestion des données dans les tables bronze, argent et or).
  • Les charges de travail opérationnelles consomment des données en temps réel, appliquent la logique métier et déclenchent des actions ou des décisions en aval.

Voici quelques exemples de charges de travail opérationnelles :

  • Blocage ou indicateur d’une transaction de carte de crédit en temps réel si un score de fraude dépasse un seuil, en fonction de facteurs tels que l’emplacement inhabituel, la grande taille des transactions ou les modèles de dépenses rapides.
  • Envoyer un message promotionnel lorsque les données de parcours indiquent qu’un utilisateur regarde des jeans depuis cinq minutes, en offrant une remise de 25% s'il achète dans les 15 prochaines minutes.

En général, les charges de travail opérationnelles sont caractérisées par la nécessité d’une latence de bout en bout sous-seconde. Cela peut être réalisé avec le mode en temps réel dans Apache Spark Structured Streaming.

Comment le mode en temps réel atteint une faible latence

Le mode en temps réel améliore l’architecture d’exécution en :

  • Exécution de lots de longue durée (la valeur par défaut est de 5 minutes), dans laquelle les données sont traitées au fur et à mesure qu’elles sont disponibles dans la source.
  • Toutes les étapes de la requête sont planifiées simultanément. Cela nécessite que le nombre d’emplacements de tâches disponibles soit égal ou supérieur au nombre de tâches de toutes les étapes d’un lot.
  • Les données passent entre les phases dès qu'elles sont produites à l'aide d'une lecture aléatoire de flux.

À la fin du traitement d’un lot et avant le démarrage du lot suivant, les points de contrôle Structured Streaming progressent et rendent les métriques pour le dernier lot disponible. Si les lots sont plus longs, ces activités peuvent être moins fréquentes, ce qui entraîne des relectures plus longues en cas de défaillance et de retard dans la disponibilité des métriques. En revanche, si les lots sont plus petits, ces activités deviennent plus fréquentes, ce qui peut affecter la latence. Databricks vous recommande d’évaluer le mode en temps réel par rapport à votre charge de travail cible et aux exigences pour trouver l’intervalle de déclencheur approprié.

Configuration du cluster

Pour utiliser le mode en temps réel dans Structured Streaming, vous devez configurer un travail Lakeflow classique :

  1. Dans votre espace de travail Azure Databricks, cliquez sur Nouveau dans le coin supérieur gauche. Choisissez Plus , puis cliquez sur Cluster.

  2. Effacer l'accélération de photons.

  3. Désactivez Mise à l’échelle automatique.

  4. Sous Performances avancées, désactivez utiliser des instances spot.

  5. Sous Mode Avancé et Accès, cliquez sur Manuel et sélectionnez Dédié (anciennement : Utilisateur unique).

  6. Sous Spark, entrez les éléments suivants sous configuration Spark :

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Cliquez sur Créer.

Configuration requise pour la taille du cluster

Vous pouvez exécuter un travail en temps réel par cluster si le cluster a suffisamment d’emplacements de tâches.

Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.

Exemples de calcul d'emplacement

Pipeline sans état à étape unique (source Kafka + récepteur) :

Si maxPartitions = 8, vous avez besoin d’au moins 8 emplacements. Si maxPartitions n’est pas défini, utilisez le nombre de partitions de rubrique Kafka.

Pipeline avec état à deux étapes (source Kafka + shuffle) :

Si maxPartitions = 8 et partitions aléatoires = 20, vous avez besoin de 8 + 20 = 28 emplacements.

Pipeline à trois étapes (source Kafka + shuffle + repartition) :

Avec maxPartitions = 8 et deux phases de répartition de 20 chacune, vous avez besoin de 8 + 20 + 20 = 48 slots.

Considérations clés

Lorsque vous configurez votre cluster, prenez en compte ce qui suit :

  • Contrairement au mode micro-batch, les tâches en temps réel peuvent rester inactives lors de l’attente des données, de sorte que le dimensionnement approprié est essentiel pour éviter les ressources perdues.
  • Visez un niveau d’utilisation cible (par exemple, 50%) en réglant :
    • maxPartitions (pour Kafka)
    • spark.sql.shuffle.partitions (pour les phases de lecture aléatoire)
  • Databricks recommande de définir maxPartitions afin que chaque tâche gère plusieurs partitions Kafka pour réduire la surcharge.
  • Ajustez les emplacements de tâche par travailleur pour qu'ils correspondent à la charge de travail des tâches simples à une étape.
  • Pour les travaux à forte charge de lecture aléatoire, essayez de trouver le nombre minimal de partitions de lecture aléatoire qui évitent les retards, et ajustez-les en fonction. Le travail ne sera pas planifié si le cluster n’a pas suffisamment de créneaux.

Note

À partir de Databricks Runtime 16.4 LTS et versions ultérieures, tous les pipelines en temps réel utilisent le checkpoint v2, permettant un basculement fluide entre les modes en temps réel et micro-batch.

Configuration des requêtes

Vous devez activer le déclencheur en temps réel pour spécifier qu’une requête doit s’exécuter à l’aide du mode à faible latence. En outre, les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour. Par exemple:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Auparavant, la latence des requêtes de bout en bout était étroitement liée à la durée du traitement par lots, ce qui rendait la durée du lot un bon indicateur de la latence des requêtes. Toutefois, cette méthode ne s’applique plus en mode temps réel, nécessitant d’autres approches pour mesurer la latence. La latence de bout en bout est spécifique à la charge de travail et peut parfois être mesurée avec précision avec la logique métier. Par exemple, si l’horodatage source est généré dans Kafka, la latence peut être calculée comme différence entre l’horodatage de sortie de Kafka et l’horodatage source.

Vous pouvez estimer la latence de bout en bout de plusieurs façons en fonction des informations partielles collectées pendant le processus de diffusion en continu.

Utiliser StreamingQueryProgress

Les métriques suivantes sont incluses dans l’événement StreamingQueryProgress , qui est automatiquement journalisée dans les journaux du pilote. Vous pouvez également y accéder par le biais de la fonction de rappel de StreamingQueryListeneronQueryProgress(). QueryProgressEvent.json() ou toString() inclure des métriques en mode temps réel supplémentaires.

  1. Latence de traitement (processingLatencyMs). Temps écoulé entre le moment où la requête en mode temps réel lit un enregistrement et avant qu’il soit écrit à l’étape suivante ou en aval. Pour les requêtes à phase unique, cela mesure la même durée que la latence E2E. Cette métrique est signalée par tâche.
  2. Latence de mise en file d’attente source (sourceQueuingLatencyMs). Le temps écoulé entre l’écriture réussie d’un enregistrement dans un bus de messages, par exemple, le temps d'ajout au journal dans Kafka, et la première lecture de l’enregistrement par une requête en mode temps réel. Cette métrique est signalée par tâche.
  3. Latence E2E (e2eLatencyMs). Temps entre le moment où l’enregistrement est correctement écrit dans un bus de messages et lorsque l’enregistrement est écrit en aval par la requête en mode temps réel. Cette métrique est agrégée par lot sur tous les enregistrements traités par toutes les tâches.

Par exemple:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Utiliser l’API Observer dans les travaux

L’API Observer permet de mesurer la latence sans lancer un autre travail. Si vous disposez d’un horodatage source qui correspond approximativement à l’heure d’arrivée des données sources et qu’il est passé avant d’atteindre le récepteur, ou si vous pouvez trouver un moyen de passer l’horodatage, vous pouvez estimer la latence de chaque lot à l’aide de l’API Observer :

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Dans cet exemple, un horodatage actuel est enregistré avant de sortir l’entrée, et la latence est estimée en calculant la différence entre cet horodatage et l’horodatage source de l’enregistrement. Les résultats sont inclus dans les rapports d’avancement et mis à la disposition des écouteurs. Voici un exemple de sortie :

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Qu’est-ce qui est pris en charge ?

Environments

Type de cluster Supported
Dédié (anciennement : utilisateur unique) Yes
Standard (anciennement : partagé) No
Pipelines déclaratifs Spark Lakeflow Classic No
Pipelines déclaratifs Spark Lakeflow sans serveur No
Serverless No

Languages

Language Supported
Scala Yes
Java Yes
Python Yes

Modes d’exécution

Mode d’exécution Supported
Mode de mise à jour Yes
Append mode No
Mode complet No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (à l’aide du connecteur Kafka) Yes
Kinesis Oui (mode EFO uniquement)
Google Pub/Sub No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (à l’aide du connecteur Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Récepteurs arbitraires (à l’aide de forEachWriter) Yes

Operators

Operators Supported
Opérations sans état
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Oui (avec certaines limitations)
  • Python UDF
Oui (avec certaines limitations)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Fonctions d’agrégation Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Oui (l’état n’est pas lié)
  • dropDuplicatesWithinWatermark
No
Jointure flux-table
  • Table de diffusion (doit être petite)
Yes
Jointure flux-flux No
(plat)MapGroupsWithState No
transformWithState Oui (avec certaines différences)
union Oui (avec certaines limitations)
forEach Yes
forEachBatch No
mapPartitions Non (voir limitation)

Utiliser transformWithState en mode temps réel

Pour la création d’applications personnalisées avec état, Databricks prend en charge transformWithState, une API dans Apache Spark Structured Streaming. Consultez Créer une application avec état personnalisé pour plus d’informations sur l’API et les extraits de code.

Toutefois, il existe des différences entre le comportement de l’API en mode réel et les requêtes de diffusion en continu traditionnelles qui tirent parti de l’architecture de micro-lots.

  • La méthode en mode temps réel handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) est appelée pour chaque ligne.
    • L’itérateur inputRows retourne une valeur unique. En mode micro batch, il est appelé une fois pour chaque clé, et l’itérateur inputRows retourne toutes les valeurs d’une clé dans le micro batch.
    • Vous devez être conscient de cette différence lors de l’écriture de leur code.
  • Les temporisateurs d’événements ne sont pas pris en charge en mode temps réel.
  • En mode temps réel, les minuteurs sont retardés en fonction de l’arrivée des données. Sinon, s’il n’y a pas de données, elle est déclenchée à la fin d’un lot de longue durée. Par exemple, si un minuteur est censé se déclencher à 10:00:00 et qu’il n’y a pas d’arrivée de données simultanément, il n’est pas déclenché. Au lieu de cela, si les données arrivent à 10:00:10, le minuteur est déclenché avec un délai de 10 secondes. Ou, si aucune donnée n’arrive et que le lot de longue durée est arrêté, le retardateur s'exécute avant de terminer le lot de longue durée.

Fonctions définies par l’utilisateur (UDF) Python

Databricks prend en charge la majorité des fonctions utilisateur définies en Python en mode temps réel.

Type UDF Supported
UDF sans état
  • UDF scalaire Python (lien)
Yes
  • Fonctions UDF scalaires de flèche
Yes
  • UDF scalaire Pandas (lien)
Yes
  • Fonction fléchée (mapInArrow)
Yes
  • Fonction de pandas (lien)
Yes
Regroupement à états UDF (UDAF)
  • transformWithState (REMARQUE : interface uniquement Row )
Yes
  • applyInPandasWithState
No
UDF sans état (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Table, fonction
No
UC UDF No

Il existe plusieurs points à prendre en compte lors de l'utilisation des UDFs Python en mode temps réel :

  • Pour réduire la latence, configurez la taille du lot Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) sur 1.
    • Compromis : cette configuration optimise la latence au détriment du débit. Pour la plupart des charges de travail, ce paramètre est recommandé.
    • Augmentez la taille du lot uniquement si un débit plus élevé est nécessaire pour prendre en charge le volume d’entrée, en acceptant l’augmentation potentielle de la latence.
  • Les fonctions pandas et UDF ne fonctionnent pas correctement avec une taille de lot Arrow de 1.
    • Si vous utilisez des fonctions UDFs ou pandas, définissez la taille du lot Arrow sur une valeur supérieure (par exemple, 100 ou plus).
    • Notez que cela implique une latence plus élevée. Databricks recommande d’utiliser les UDF Arrow ou une autre fonction si possible.
  • En raison du problème de performances avec pandas, transformWithState est uniquement pris en charge avec l’interface Row .

Techniques d’optimisation

Technique Activée par défaut
Suivi de progression asynchrone : transfère l'écriture dans le journal de décalage et le journal de validation dans un thread asynchrone, ce qui réduit le temps de traitement entre deux micro-lots. Cela peut aider à réduire la latence des requêtes stateless de streaming. No
Point de contrôle d’état asynchrone : permet de réduire la latence des requêtes de streaming avec état en commençant à traiter le micro-lot suivant dès que le calcul du micro-lot précédent se termine, sans attendre le point de contrôle d’état. No

Limitations

Limitation de la source

Pour Kinesis, le mode d’interrogation n’est pas pris en charge. De plus, les repartitions fréquentes peuvent avoir un impact négatif sur la latence.

Limitation de l’union

Pour Union, il existe certaines limitations :

  • L’auto-union n’est pas prise en charge :
    • Kafka : Vous ne pouvez pas utiliser le même objet de trame de données source et unir les trames de données qui en sont dérivées. Solution de contournement : utilisez différents dataframes qui accèdent à la même source.
    • Kinesis : vous ne pouvez pas fusionner les ensembles de données dérivés de la même source Kinesis avec la même configuration. Solution de contournement : Outre l’utilisation de différents dataframes, vous pouvez attribuer une autre option « consumerName » à chaque DataFrame.
  • Les opérateurs avec état (par exemple, aggregate, deduplicate, transformWithState) définis avant l’Union ne sont pas pris en charge.
  • L’union avec les sources de lots n’est pas prise en charge.

Limitation de MapPartitions

mapPartitions dans Scala et les API Python similaires (mapInPandas, mapInArrow) prennent un itérateur de la partition d’entrée entière et produisent un itérateur de la sortie entière avec un mappage arbitraire entre l’entrée et la sortie. Ces API peuvent entraîner des problèmes de performances en streaming Real-Time Mode en bloquant l’intégralité de la sortie, ce qui augmente la latence. La sémantique de ces API ne prend pas bien en charge la propagation des filigranes.

Utilisez des fonctions scalaires définies par l'utilisateur combinées avec la transformation des types de données complexes ou filter pour atteindre des fonctionnalités similaires.

Examples

Les exemples ci-dessous montrent les requêtes prises en charge.

Requêtes sans état

Les requêtes sans état, qu'elles soient simples ou à plusieurs étapes, sont toutes prises en charge.

Source Kafka vers le puits Kafka

Dans cet exemple, vous lisez à partir d'une source Kafka et écrivez dans un récepteur Kafka.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Répartition

Dans cet exemple, vous lisez à partir d’une source Kafka, repartitionnez les données dans 20 partitions et écrivez dans un récepteur Kafka.

Définissez la configuration de Spark spark.sql.execution.sortBeforeRepartition sur false avant d'utiliser le repartitionnement.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Jointure flux-instantané (diffusion uniquement)

Dans cet exemple, vous lisez à partir de Kafka, joignez les données à une table statique et écrivez dans un récepteur Kafka. Notez que seules les jointures statiques de flux qui diffusent la table statique sont prises en charge, ce qui signifie que la table statique doit tenir en mémoire.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Source Kinesis vers récepteur Kafka

Dans cet exemple, vous lisez à partir d'une source Kinesis et écrivez dans un récepteur Kafka.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

Dans cet exemple, vous unissez deux dataframes Kafka à partir de deux rubriques différentes et écrivez dans un récepteur Kafka.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Requêtes avec état

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Union avec agrégation

Dans cet exemple, vous unionez d’abord deux DataFrames Kafka à partir de deux rubriques différentes, puis effectuez une agrégation. À la fin, vous écrivez dans le récepteur Kafka.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Il existe une différence entre la façon dont le mode en temps réel et les autres modes d’exécution dans Structured Streaming exécutent le StatefulProcessor en transformWithState. Voir Utiliser transformWithState en mode temps réel

TransformWithState (PySpark, interface de ligne)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Il existe une différence entre la façon dont le mode en temps réel et d'autres modes d'exécution dans Structured Streaming exécutent StatefulProcessor dans transformWithState. Voir Utiliser transformWithState en mode temps réel

Sinks

Écriture sur Postgres via foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Cette fonctionnalité est disponible dans Databricks Runtime 17.1 et versions ultérieures.

Source de taux d’affichage

Dans cet exemple, vous lisez à partir d’une source de taux et affichez le dataframe de flux dans un notebook.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())