Lectures et écritures en diffusion en continu sur des tables Delta

Cette page explique comment utiliser des tables Delta comme sources et récepteurs pour Spark Structured Streaming avec readStream et writeStream. Delta Lake résout les problèmes courants de performances et de fiabilité pour les systèmes et fichiers de streaming. Les avantages sont les suivants :

  • Fusionner les petits fichiers produits par l’ingestion de données à faible latence et optimiser les performances.
  • Conservez le traitement « exactement une fois » avec plusieurs flux (ou travaux de traitement par lots simultanés).
  • Découvrez efficacement les nouveaux fichiers lors de l’utilisation de fichiers en tant que source de flux.

Pour savoir comment charger des données à l’aide de tables de diffusion en continu dans Databricks SQL, consultez Utiliser des tables de diffusion en continu dans Databricks SQL.

Pour les jointures entre flux et données statiques avec Delta Lake, consultez Jointures entre flux et statiques.

Utiliser des tables Delta comme récepteur

Vous pouvez écrire des données dans une table Delta à l’aide de Structured Streaming. Le journal des transactions Delta Lake garantit un traitement unique et précis, même s'il existe d'autres flux ou requêtes par lots exécutés simultanément sur la table.

Lorsque vous écrivez dans une table Delta à l’aide d’un récepteur Structured Streaming, vous pouvez voir des validations vides avec epochId = -1. Celles-ci sont attendues et se produisent généralement :

  • Sur le premier lot de chaque exécution de la requête de streaming (cela se produit chaque lot pour Trigger.AvailableNow).
  • Lorsqu’un schéma est modifié (par exemple, l’ajout d’une colonne).

Ces validations vides sont intentionnelles et n’indiquent pas d’erreur. Ils n’affectent pas la justesse ou les performances de la requête de manière significative.

Note

La fonction VACUUM Delta Lake supprime tous les fichiers non gérés par Delta Lake, mais ignore les répertoires qui commencent par _. Vous pouvez stocker sans risque des points de contrôle en même temps que d’autres données et métadonnées pour une table Delta à l’aide d’une structure de répertoires telle que <table-name>/_checkpoints.

Surveiller le backlog avec des indicateurs

Utilisez les métriques suivantes pour surveiller le backlog d’un processus de requête de streaming :

  • numBytesOutstanding: nombre d'octets restant à traiter dans le backlog.
  • numFilesOutstanding: nombre de fichiers qui doivent encore être traités dans le backlog.
  • numNewListedFiles: nombre de fichiers Delta Lake listés pour calculer l'arriéré de ce lot.
  • backlogEndOffset: version de table Delta utilisée pour calculer le backlog.

Dans un bloc-notes, affichez ces métriques sous l’onglet Données brutes dans le tableau de bord de progression de la requête de diffusion en continu :

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

Mode Append

Par défaut, les flux s’exécutent en mode Ajout et ajoutent uniquement de nouveaux enregistrements à la table.

Utilisez la méthode toTable pour la diffusion en continu vers des tables :

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Mode complet

Utilisez Structured Streaming en mode complet pour remplacer l’intégralité de la table après chaque lot. Par exemple, vous pouvez mettre à jour en continu et en temps réel un tableau de synthèse agrégé des événements par client :

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

Pour les applications sans exigences strictes en matière de latence, vous pouvez économiser des ressources informatiques et des coûts avec des déclencheurs ponctuels tels que AvailableNow. Par exemple, utilisez ce déclencheur pour mettre à jour les tables d’agrégation récapitulatives selon une planification donnée, en traitant uniquement les nouvelles données arrivées depuis la dernière mise à jour. Voir AvailableNow: Traitement par lots incrémentiel.

Gérer les modifications des tables Delta source

Structured Streaming lit de manière incrémentielle les tables Delta. Lorsqu’une requête de diffusion en continu lit à partir d’une table Delta, de nouveaux enregistrements sont traités de manière idempotente à mesure que de nouvelles versions de la table sont engagées dans la table source. Structured Streaming accepte uniquement les entrées d’ajout et lève une exception si des modifications se produisent sur la table Delta source. Par exemple, si une opération UPDATE, DELETE, MERGE INTO ou OVERWRITE modifie une table Delta source lue par une requête de streaming, le flux échoue avec une erreur.

Il existe quatre approches classiques pour gérer les modifications en amont apportées aux tables Delta sources, en fonction de votre cas d’usage. Un tableau de référence et des détails sur chacun d’eux sont fournis ci-dessous :

Approche Avantages Inconvénients
skipChangeCommits Simple, ne vous oblige pas à écrire une logique complexe. Utile pour le traitement en ajout uniquement où les modifications en amont sont gérées séparément, ou pour gérer temporairement un enregistrement incorrect. Ne propage pas les modifications et ne traite que les ajouts.
Actualisation complète En plus, c’est simple et il n’est pas nécessaire d’écrire une logique complexe. Utile pour les petits jeux de données avec de rares modifications en amont. Coûteux pour les jeux de données volumineux. Nécessite le retraitement de toutes les tables en aval.
Modifier le flux de données Traitez tous les types de modifications (insertions, mises à jour et suppressions). Databricks recommande la diffusion en continu à partir du flux CDC d’une table Delta plutôt que directement à partir de la table dans la mesure du possible. Vous devez écrire une logique plus complexe pour gérer chaque type de modification.
Vues matérialisées Alternative simple à Structured Streaming qui a une propagation automatique des modifications. Latence plus élevée. Disponible uniquement dans les pipelines déclaratifs Spark Lakeflow et Databricks SQL.

Ignorer les validations de modification en amont avec skipChangeCommits

Définissez skipChangeCommits pour ignorer les transactions qui suppriment ou modifient des enregistrements existants, et pour traiter uniquement les ajouts. Cela est utile lorsque les modifications apportées aux données existantes n’ont pas besoin d’être propagées via le flux, ou lorsque vous préférez une logique distincte pour gérer ces modifications. Vous pouvez activer et désactiver skipChangeCommits si vous devez ignorer temporairement les modifications ponctuelles.

Databricks recommande d'utiliser skipChangeCommits pour la plupart des charges de travail qui n'utilisent pas de flux de données modifiés.

Python

(spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")
)

Scala

spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")

Important

Si le schéma d’une table Delta change après le début d’une lecture en continu sur la table, la requête échoue. Pour la plupart des modifications de schéma, vous pouvez redémarrer le flux pour résoudre l’incompatibilité de schéma et poursuivre le traitement.

Dans Databricks Runtime 12.2 LTS et les versions antérieures, vous ne pouvez pas diffuser en continu à partir d’une table Delta avec le mappage de colonnes activé qui a subi une évolution de schéma non additive, telle que le changement de nom ou la suppression de colonnes. Pour plus d’informations, consultez Mappage de colonnes et diffusion en continu.

Note

Dans Databricks Runtime 12.2 LTS et versions ultérieures, skipChangeCommits remplace ignoreChanges. Dans Databricks Runtime 11.3 LTS et versions antérieures, ignoreChanges est la seule option prise en charge. Consultez l’option Héritée : ignoreChanges pour plus d’informations.

Option héritée : ignoreDeletes

ignoreDeletes est une option héritée qui gère uniquement les transactions qui suppriment des données aux limites de partition (autrement dit, les suppressions de partition complètes). Si vous devez gérer les suppressions, mises à jour ou autres modifications non partitionnelles, utilisez skipChangeCommits à la place.

Python
(spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")
)
Scala
spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Option héritée : ignoreChanges

ignoreChanges est disponible dans Databricks Runtime 11.3 LTS et inférieur. Dans Databricks Runtime 12.2 LTS et versions ultérieures, il est remplacé par skipChangeCommits.

Avec ignoreChanges activé, les fichiers de données réécrits dans la table source sont réémis après une opération de modification de données telle que UPDATE, MERGE INTO, (DELETE dans des partitions) ou OVERWRITE. Les lignes inchangées sont souvent émises à côté de nouvelles lignes, de sorte que les consommateurs en aval doivent être en mesure de gérer les doublons. Les suppressions ne sont pas propagées en aval. ignoreChanges est prioritaire par rapport à ignoreDeletes.

En revanche, skipChangeCommits ignore entièrement les opérations de modification de fichier. Les fichiers de données réécrits dans la table source en raison d’opérations de modification des données telles que UPDATE, MERGE INTO, DELETEet OVERWRITE sont ignorés entièrement. Pour refléter les modifications apportées aux tables sources de flux, vous devez implémenter une logique distincte pour propager ces modifications.

Databricks recommande d’utiliser skipChangeCommits pour toutes les nouvelles charges de travail. Pour migrer une charge de travail vers ignoreChangesskipChangeCommits, refactorisez votre logique de diffusion en continu.

Actualisation complète des tables en aval

Si les modifications en amont sont rares et que les données sont suffisamment petites pour le retraitement, vous pouvez supprimer le point de contrôle de diffusion en continu et la table de sortie, puis redémarrer le flux à partir du début. Cela entraîne le retraitement de toutes les données de la table source. N’oubliez pas que cette approche nécessite également de retraiter toutes les tables en aval qui dépendent de la sortie de ce flux.

Cette approche convient le mieux aux jeux de données ou charges de travail plus petits où les modifications en amont sont peu fréquentes et le coût d’une actualisation complète est acceptable.

Utiliser le flux de données modifiées

Pour les charges de travail qui traitent tous les types de modifications (insertions, mises à jour et suppressions), utilisez le flux de données de modification Delta Lake. Le flux de données modifiées enregistre les modifications au niveau des lignes d’une table Delta, ce qui vous permet de diffuser en continu ces modifications et d’écrire une logique pour gérer chaque type de modification dans les tables en aval. Il s’agit de l’approche la plus robuste, car votre code gère explicitement chaque type d’événement de modification. Utilisez le flux de données de modification Delta Lake sur Azure Databricks.

Si vous utilisez des pipelines déclaratifs Spark Lakeflow, consultez les API AUTO CDC : simplifier la capture de données modifiées avec des pipelines.

Important

Dans Databricks Runtime 12.2 LTS et ci-dessous, vous ne pouvez pas diffuser en continu à partir du flux de données de modification d’une table Delta avec mappage de colonnes activé qui a subi une évolution de schéma non additif, telle que le changement de nom ou la suppression de colonnes. Consultez le mappage des colonnes et la diffusion en continu.

Utiliser les vues matérialisées

Les vues matérialisées gèrent automatiquement les modifications en amont en recomputant les résultats lorsque les données sources changent. Si vous n’avez pas besoin de la latence la plus faible possible et que vous souhaitez éviter la gestion de la complexité du streaming, une vue matérialisée peut simplifier votre architecture. Les vues matérialisées sont disponibles dans les pipelines déclaratifs Spark de Lakeflow et dans Databricks SQL. Consultez Vues matérialisées.

Example

Par exemple, supposons que vous ayez une table user_events avec des colonnes date, user_email et action, partitionnée par date. Vous sortez de la table user_events et vous devez en supprimer les données conformément au RGPD.

skipChangeCommits vous permet de supprimer des données dans plusieurs partitions (dans cet exemple, le filtrage sur user_email). Utilisez la syntaxe suivante :

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Si vous mettez à jour un user_email avec l’instruction UPDATE, le fichier contenant le user_email en question est réécrit. Utilisez skipChangeCommits pour ignorer les fichiers de données modifiés.

Databricks recommande d’utiliser skipChangeCommits plutôt que ignoreDeletes sauf si vous êtes certain que les suppressions sont toujours des suppressions de partition complètes.

Utiliser foreachBatch pour les écritures de table idempotentes

Note

Databricks recommande de configurer une écriture de streaming distincte pour chaque récepteur que vous souhaitez mettre à jour au lieu d’utiliser foreachBatch. Les écritures dans plusieurs récepteurs réduisent foreachBatch la parallélisation et augmentent la latence globale, car les écritures dans plusieurs tables sont sérialisées dans foreachBatch.

Les tables Delta prennent en charge les options suivantes DataFrameWriter pour rendre les écritures vers plusieurs tables foreachBatch idempotentes.

  • txnAppId : chaîne unique que vous pouvez transmettre à chaque écriture DataFrame. Par exemple, vous pouvez utiliser l’ID StreamingQuery comme txnAppId. txnAppId peut être n’importe quelle chaîne unique générée par l’utilisateur et ne doit pas être liée à l’ID de flux.
  • txnVersion : Nombre à croissance monotone qui fait office de version de transaction.

Delta Lake utilise txnAppId et txnVersion, identifie et ignore les écritures en double. Par exemple, après qu'une défaillance a interrompu l'écriture d'un lot, vous pouvez réexécuter le lot avec le même txnAppId et txnVersion pour identifier et ignorer correctement les doublons. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.

Warning

Si vous supprimez le point de contrôle de streaming et redémarrez la requête avec un nouveau point de contrôle, vous devez fournir un autre txnAppId. Les nouveaux points de contrôle commencent par un ID de lot de 0. Delta Lake utilise l’ID de lot et txnAppId comme clé unique, et ignore les lots avec des valeurs déjà vues.

L’exemple de code suivant illustre ce modèle :

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}

Upsert à partir de requêtes de diffusion en continu à l’aide de foreachBatch

Vous pouvez utiliser merge et foreachBatch pour écrire des upserts complexes à partir d'une requête de streaming dans une table Delta. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.

Cette approche a de nombreuses applications :

Note

  • Vérifiez que votre merge instruction à l'intérieur de foreachBatch est idempotente. Sinon, les redémarrages de la requête de diffusion en continu peuvent appliquer l’opération sur le même lot de données plusieurs fois. Consultez Utiliser foreachBatch pour les écritures de tables idempotentes.

  • Lorsque merge est utilisé dans foreachBatch, la métrique du débit de données d'entrée peut retourner un multiple du taux réel auquel les données sont générées à la source des données. merge lit les données d’entrée plusieurs fois, ce qui multiplie les métriques. Pour empêcher la multiplication des métriques, mettez en cache le DataFrame par lots avant merge , puis décompressez-le après merge.

    Le débit de données d’entrée est disponible via StreamingQueryProgress et dans le graphique de débit de streaming du notebook. Consultez la surveillance des requêtes Structured Streaming sur Azure Databricks.

Par exemple, vous pouvez utiliser MERGE des instructions SQL dans foreachBatch:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Vous pouvez également utiliser les API Delta Lake pour la diffusion en continu d’upserts :

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Définir la version initiale de la table pour traiter les modifications

Par défaut, les flux commencent par la dernière version de la table Delta disponible. Cela inclut un instantané complet de la table à ce moment et toutes les modifications futures. Databricks recommande d’utiliser la version de table initiale par défaut pour la plupart des charges de travail.

Si vous le souhaitez, vous pouvez utiliser les options suivantes pour spécifier le point de départ de la source de streaming Delta Lake sans traiter la table entière.

  • startingVersion: version de la table Delta à partir de laquelle commencer la lecture. Toutes les modifications de table validées à ou après la version spécifiée sont lues par le flux. Si la version spécifiée n’est pas disponible, le flux ne démarre pas.

    Pour rechercher les versions de validation disponibles, exécutez DESCRIBE HISTORY et vérifiez version. Pour retourner uniquement les dernières modifications, spécifiez latest. Pour plus d’informations sur les versions des tables Delta, consultez Travailler avec l'historique des tables.

  • startingTimestamp: horodatage à partir duquel commencer la lecture. Toutes les modifications de table validées au moment ou après l’horodatage spécifié sont lues par le flux. Si le timestamp fourni précède toutes les validations de table, la lecture en continu commence par le timestamp disponible le plus ancien. Définissez l’une ou l’autre des options

    • Une chaîne de timestamp. Par exemple : "2019-01-01T00:00:00.000Z".
    • Une chaîne de date. Par exemple : "2019-01-01".

Vous ne pouvez pas définir les deux startingVersion et startingTimestamp en même temps. Ces paramètres s’appliquent uniquement aux nouvelles requêtes de diffusion en continu. Si une requête de diffusion en continu a démarré et que la progression a été enregistrée dans son point de contrôle, ces paramètres sont ignorés.

Important

Bien que vous puissiez démarrer la source de streaming à partir d’une version ou d’un timestamp spécifié, le schéma de la source de streaming est toujours le schéma le plus récent de la table Delta. Vous devez vous assurer qu’aucune modification de schéma incompatible n’a été apportée à la table Delta après la version ou le timestamp spécifié. Sinon, la source de diffusion en continu peut retourner des résultats incorrects lors de la lecture des données avec un schéma incorrect.

Example

Par exemple, supposons que vous ayez une table user_events. Si vous souhaitez lire les modifications apportées depuis la version 5, utilisez :

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Si vous souhaitez lire les modifications apportées depuis la version 2018-10-18, utilisez :

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Traiter l’instantané initial sans supprimer de données

Cette fonctionnalité est disponible sur Databricks Runtime 11.3 LTS et versions ultérieures.

Dans une requête de streaming avec état et un filigrane défini, le traitement des fichiers en fonction de l'heure de modification peut entraîner un traitement des enregistrements dans un ordre incorrect. Cela peut amener le filigrane à marquer incorrectement les enregistrements comme des événements tardifs et à les exclure. Cela peut se produire uniquement lorsque l’instantané Delta initial est traité dans l’ordre par défaut.

Pour les flux avec une table source Delta, la requête traite d’abord toutes les données présentes dans la table et crée une version appelée capture instantanée initiale. Par défaut, les fichiers de données de la table Delta sont traités en fonction du dernier fichier modifié. Toutefois, l’heure de la dernière modification ne représente pas nécessairement l’ordre chronologique des événements d’enregistrement.

Pour éviter les chutes de données pendant le traitement initial des captures instantanées, activez l’option withEventTimeOrder . withEventTimeOrder divise l’intervalle de temps d’événement des données d’instantané initiales en compartiments de temps. Chaque micro-lot traite un seau en filtrant les données selon l’intervalle de temps. Les options maxFilesPerTrigger et maxBytesPerTrigger sont toujours applicables pour contrôler la taille du micro-lot, mais seulement de manière approximative en raison de l’approche de traitement.

Le diagramme suivant illustre ce processus :

Instantané initial

Contraintes

  • Vous ne pouvez pas modifier withEventTimeOrder si la requête de flux a démarré et que l’instantané initial est en cours de traitement actif. Pour redémarrer avec withEventTimeOrder modifié, vous devez supprimer le point de contrôle.
  • Si withEventTimeOrder elle est activée, vous ne pouvez pas rétrograder un flux vers une version databricks Runtime qui ne prend pas en charge cette fonctionnalité tant que le traitement initial des instantanés n’est pas terminé. Pour effectuer une rétrogradation, attendez que l’instantané initial se termine, ou supprimez le point de contrôle et redémarrez la requête.
  • Cette fonctionnalité n’est pas prise en charge dans les scénarios suivants :
    • La colonne d’heure d’événement est une colonne générée et il existe des transformations de non-projection entre la source Delta et le filigrane.
    • Il existe un filigrane qui a plusieurs sources Delta dans la requête de flux.

Efficacité

Si withEventTimeOrder elle est activée, les performances initiales du traitement des instantanés peuvent être plus lentes. Chaque micro-batch analyse l’instantané initial pour filtrer les données dans la plage horaire d’événement correspondante. Pour améliorer les performances de filtrage :

  • Utilisez une colonne source Delta comme heure d'événement pour permettre l'application du saut de données. Consultez le saut de données.
  • Partitionnez la table selon la colonne temps de l’événement.

Utilisez l’interface utilisateur Spark pour voir le nombre de fichiers Delta analysés pour un micro-lot spécifique.

Example

Supposons que vous ayez une table user_events avec une colonne event_time. Votre requête de diffusion en continu est une requête d’agrégation. Si vous souhaitez vous assurer qu’aucune suppression de données n’aura lieu lors du traitement de la capture instantanée initiale, vous pouvez utiliser :

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Vous pouvez définir withEventTimeOrder avec une configuration Spark sur le cluster pour l’appliquer à toutes les requêtes de diffusion en continu : spark.databricks.delta.withEventTimeOrder.enabled true.

Limiter le taux d’entrée pour améliorer les performances de traitement

Par défaut, Structured Streaming traite autant de fichiers que possible dans chaque micro-lot. Pour limiter la quantité de données traitées par lot et gérer l’utilisation de la mémoire, stabiliser la latence ou réduire les coûts de stockage cloud, utilisez les options suivantes :

  • maxFilesPerTrigger : nombre de nouveaux fichiers à prendre en compte dans chaque micro-lot. La valeur par défaut est 1000.
  • maxBytesPerTrigger : quantité de données traitées dans chaque micro-lot. Cette option définit un « max soft », ce qui signifie qu’un lot traite approximativement cette quantité de données et peut traiter plus que la limite afin de faire avancer la requête de diffusion en continu dans les cas où la plus petite unité d’entrée est supérieure à cette limite. Elle n’est pas définie par défaut.

Si vous utilisez à la fois maxBytesPerTrigger et maxFilesPerTrigger, le micro-lot traite les données jusqu’à ce que la limite maxFilesPerTrigger ou maxBytesPerTrigger soit atteinte.

Note

Par défaut, si logRetentionDuration nettoie les transactions dans la table source et que la requête de diffusion en continu tente de traiter ces versions, la requête échoue à empêcher la perte de données. Vous pouvez choisir l’option failOnDataLoss à false pour ignorer les données perdues et poursuivre le traitement. Voir Configurer la conservation des données pour des requêtes de voyage dans le temps.

Contrôler le coût du stockage cloud

Les requêtes de diffusion en continu ont plusieurs modes de déclencheur disponibles qui vous permettent d’équilibrer les coûts et la latence, notamment processingTime, availableNowet realTime. Consultez Contrôler les coûts de stockage cloud.