Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires

Cet article traite de l’utilisation de foreachBatch avec le flux structuré pour écrire la sortie d'une requête de streaming vers des sources de données qui ne disposent pas d'un récepteur de streaming existant.

Le modèle de code streamingDF.writeStream.foreachBatch(...) vous permet d’appliquer des fonctions de lot aux données de sortie de chaque micro-batch de la requête de streaming. Fonctions utilisées avec foreachBatch qui admettent deux paramètres :

  • Un DataFrame contenant les données de sortie d’un micro-lot.
  • L’identifiant unique du micro-lot.

Vous devez utiliser foreachBatch pour les opérations de fusion de Delta Lake dans le flux structuré. Consultez Upsert à partir de requêtes de diffusion en continu à l’aide de foreachBatch.

Appliquer d’autres opérations DataFrame

De nombreuses opérations DataFrame et DataSet ne sont pas prises en charge dans les DataFrames de streaming, Spark ne prend pas en charge la génération de plans incrémentiels dans ces cas. En utilisant foreachBatch(), vous pouvez appliquer certaines de ces opérations sur chaque sortie de micro-batch. Par exemple, vous pouvez utiliser foreachBath() et l'opération SQL MERGE INTO pour écrire la sortie des agrégations de streaming dans une table Delta en mode de mise à jour. Pour plus de détails, voir MERGE INTO.

Important

  • foreachBatch() ne fournit que des garanties d'écriture au moins une fois. Mais vous pouvez utiliser le batchId fourni à la fonction pour dédupliquer la sortie et obtenir une garantie une seule fois. Dans les deux cas, vous devrez définir vous-même la sémantique de bout en bout.
  • foreachBatch() ne fonctionne pas avec le mode de traitement continu car il repose fondamentalement sur l'exécution par micro-batch d'une requête de streaming. Si vous écrivez des données en mode continu, utilisez foreach() à la place.

Un dataframe vide peut être appelé avec foreachBatch() et le code utilisateur doit être résilient pour permettre un fonctionnement approprié. En voici un exemple :

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Changements de comportement pour foreachBatch dans Databricks Runtime 14.0

Dans Databricks Runtime 14.0 et versions ultérieures sur le calcul configuré avec le mode d’accès partagé, forEachBatch s’exécute dans un processus Python isolé distinct sur Apache Spark, plutôt que dans l’environnement REPL. Il est sérialisé et envoyé (push) à Spark et n’a pas accès aux objets spark globaux pendant la durée de la session.

Dans toutes les autres configurations de calcul, foreachBatch s’exécute dans le même REPL Python qui exécute le reste de votre code. Par conséquent, la fonction n’est pas sérialisée.

Lorsque vous utilisez Databricks Runtime 14.0 et versions ultérieures sur le calcul configuré avec le mode d’accès partagé, vous devez utiliser la variable sparkSession délimitée au DataFrame local lors de l’utilisation de foreachBatch dans Python, comme dans l’exemple de code suivant :

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

Les modifications de comportement suivantes s’appliquent :

  • Vous ne pouvez pas accéder à des variables Python globales à partir de votre fonction.
  • Les commandes print() écrivent la sortie dans les journaux du pilote.
  • Tous les fichiers, modules ou objets référencés dans la fonction doivent être sérialisables et disponibles sur Spark.

Réutiliser les sources de données de lot existantes

En utilisant foreachBatch(), vous pouvez utiliser les rédacteurs de données par lots existants pour les récepteurs de données qui ne prennent pas en charge la diffusion en flux structuré. Voici quelques exemples :

De nombreuses autres sources de données par lots peuvent être utilisées à partir de foreachBatch(). Consultez Se connecter aux sources de données.

Écrire dans plusieurs emplacements

Si vous devez écrire la sortie d'une requête en continu à plusieurs emplacements, Databricks recommande d'utiliser plusieurs rédacteurs en flux structuré pour une meilleure parallélisation et un meilleur débit.

L’utilisation de foreachBatch pour écrire dans plusieurs récepteurs sérialise l’exécution des écritures en streaming, ce qui peut augmenter la latence pour chaque micro-lot.

Si vous utilisez foreachBatch pour écrire dans plusieurs tables Delta, consultez Écritures de table idempotentes dans foreachBatch.