Partager via


Utiliser ForEachBatch pour écrire dans des récepteurs de données arbitraires dans des pipelines

Important

L’API foreach_batch_sink est en préversion publique.

Le récepteur ForEachBatch vous permet de traiter un flux en tant que série de micro-lots. Chaque lot peut être traité en Python avec une logique personnalisée similaire à Apache Spark Structured Streaming.foreachBatch Avec le sink ForEachBatch (SDP) de Lakeflow Spark, vous pouvez transformer, fusionner ou écrire des données de streaming sur une ou plusieurs cibles qui ne prennent pas nativement en charge les écritures de streaming. Cette page vous guide tout au long de la configuration d’un récepteur ForEachBatch, fournit des exemples et traite des considérations clés.

Le récepteur ForEachBatch fournit les fonctionnalités suivantes :

  • Logique personnalisée pour chaque micro-lot : ForEachBatch est un récepteur de streaming flexible. Vous pouvez appliquer des actions arbitraires (telles que la fusion dans une table externe, l’écriture dans plusieurs destinations ou l’exécution d’upserts) avec du code Python.
  • Prise en charge complète de l’actualisation : les pipelines gèrent les points de contrôle par flux. Par conséquent, les points de contrôle sont réinitialisés automatiquement lorsque vous effectuez une actualisation complète de votre pipeline. Avec le récepteur ForEachBatch, vous êtes responsable de la gestion de la remise à zéro des données en aval lorsque cela se produit.
  • Prise en charge du Unity Catalog : le puits de données ForEachBatch prend en charge toutes les fonctionnalités du Unity Catalog, telles que la lecture ou l’écriture dans des volumes ou des tables du Unity Catalog.
  • Nettoyage limité : le pipeline ne suit pas les données écrites à partir d’un récepteur ForEachBatch. Il ne peut donc pas nettoyer ces données. Vous êtes responsable de toute gestion des données en aval.
  • Entrées du journal des événements : le journal des événements du pipeline enregistre la création et l’utilisation de chaque puits ForEachBatch. Si votre fonction Python n’est pas sérialisable, vous voyez une entrée d’avertissement dans le journal des événements avec des suggestions supplémentaires.

Note

  • Le récepteur ForEachBatch est conçu pour les requêtes de streaming, telles que append_flow. Elle n'est pas destinée uniquement aux pipelines par lots ni à AutoCDC la sémantique.
  • Le récepteur ForEachBatch décrit sur cette page concerne les pipelines. Apache Spark Structured Streaming prend également en charge foreachBatch. Pour plus d’informations sur Structured Streaming foreachBatch, consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.

Quand utiliser un collecteur ForEachBatch

Utilisez un récepteur ForEachBatch chaque fois que votre pipeline nécessite des fonctionnalités qui ne sont pas disponibles via un format récepteur intégré tel que delta, ou kafka. Les cas d’usage classiques sont les suivants :

  • Fusion ou upserting dans une table Delta Lake : exécutez une logique de fusion personnalisée pour chaque micro-lot (par exemple, la gestion des enregistrements mis à jour).
  • Écriture sur plusieurs destinations ou non prises en charge : écrivez la sortie de chaque lot dans plusieurs tables ou systèmes de stockage externes qui ne prennent pas en charge les écritures de streaming (comme certains récepteurs JDBC).
  • Application d’une logique ou d’une transformation personnalisée : manipuler des données en Python directement (par exemple, à l’aide de bibliothèques spécialisées ou de transformations avancées).

Pour plus d’informations sur les récepteurs intégrés ou sur la création de récepteurs personnalisés avec Python, consultez Récepteurs dans les pipelines déclaratifs Spark Lakeflow.

Syntaxe

Utilisez l'ornement @dp.foreach_batch_sink() pour générer un évier ForEachBatch. Vous pouvez ensuite référencer cela comme une target dans votre définition de flux, par exemple dans @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Paramètre Descriptif
name Optional. Nom unique permettant d’identifier le récepteur dans le pipeline. La valeur par défaut est le nom de la fonction UDF, lorsqu’elle n’est pas incluse.
batch_handler Il s’agit de la fonction définie par l’utilisateur (UDF) qui sera appelée pour chaque micro-lot.
Df DataFrame Spark contenant des données pour le micro-batch actuel.
batch_id Identifiant entier du micro-lot. Spark incrémente cet ID pour chaque intervalle de déclencheur.
Un batch_id de 0 représente le début d’un flux ou le commencement d’une actualisation complète. Le foreach_batch_sink code doit gérer correctement une actualisation complète pour les sources de données en aval. Pour plus d’informations, consultez la section suivante.

Actualisation complète

Étant donné que ForEachBatch utilise une requête de diffusion en continu, le pipeline effectue le suivi du répertoire de point de contrôle pour chaque flux. Lors de l’actualisation complète :

  • Le répertoire de point de contrôle est réinitialisé.
  • Votre fonction récepteur (foreach_batch_sink UDF) voit un nouveau batch_id cycle démarrant de 0.
  • Les données de votre système cible ne sont pas automatiquement nettoyées par le pipeline (car le pipeline ne sait pas où vos données sont écrites). Si vous avez besoin d’un scénario de table rase, vous devez supprimer ou tronquer manuellement les tables externes ou les emplacements que votre destination ForEachBatch remplit.

Utilisation des fonctionnalités du catalogue Unity

Toutes les fonctionnalités existantes du catalogue Unity dans Spark Structured Streaming foreach_batch_sink restent disponibles.

Cela inclut l’écriture dans des tables du catalogue Unity, qu’elles soient gérées ou externes. Vous pouvez écrire des micro-lots dans des tables managées ou externes du Unity Catalog exactement comme vous le feriez dans n'importe quelle application Apache Spark Structured Streaming.

Entrées du journal des événements

Lorsque vous créez un récepteur ForEachBatch, un événement SinkDefinition est ajouté "format": "foreachBatch" au journal des événements du pipeline.

Cela vous permet de suivre l’utilisation des récepteurs ForEachBatch et de voir les avertissements concernant votre récepteur.

Utilisation avec Databricks Connect

Si la fonction que vous fournissez n’est pas sérialisable (une exigence importante pour Databricks Connect), le journal des événements inclut une WARN entrée qui vous recommande de simplifier ou de refactoriser votre code si la prise en charge de Databricks Connect est requise.

Par exemple, si vous utilisez dbutils pour obtenir des paramètres dans une fonction UDF ForEachBatch, vous pouvez obtenir l’argument avant de l’utiliser dans la fonction UDF :

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Meilleures pratiques

  1. Conservez votre fonction ForEachBatch concise : évitez le threading, les dépendances de bibliothèque lourdes ou les manipulations de données en mémoire volumineuses. Une logique complexe ou avec état peut entraîner des erreurs de sérialisation ou des goulots d’étranglement des performances.
  2. Surveillez votre dossier de point de contrôle : pour les requêtes en streaming, SDP gère les points de contrôle par flux, et non par destination. Si vous avez plusieurs flux dans votre pipeline, chaque flux a son propre répertoire de point de contrôle.
  3. Valider les dépendances externes : si vous vous appuyez sur des systèmes ou bibliothèques externes, vérifiez qu’elles sont installées sur tous les nœuds de cluster ou dans votre conteneur.
  4. N'oubliez pas Databricks Connect : si votre environnement est susceptible de passer à Databricks Connect à l'avenir, vérifiez que votre code est sérialisable et ne dépend pas de dbutils dans la foreach_batch_sink fonction UDF.

Limites

  • Aucune opération de nettoyage pour ForEachBatch : étant donné que votre code Python personnalisé peut écrire des données n’importe où, le pipeline ne peut pas nettoyer ou suivre ces données. Vous devez gérer vos propres stratégies de gestion ou de rétention des données pour les destinations dans lesquelles vous écrivez.
  • Métriques en micro-lots : les pipelines collectent des métriques de streaming, mais certains scénarios peuvent entraîner des métriques incomplètes ou inhabituelles lors de l’utilisation de ForEachBatch. Cela est dû à la flexibilité sous-jacente de ForEachBatch qui rend la traçabilité du flux des données et des lignes difficile pour le système.
  • Prise en charge de l’écriture dans plusieurs destinations sans plusieurs lectures : certains clients peuvent utiliser ForEachBatch pour lire à partir d’une source une seule fois, puis écrire dans plusieurs destinations. Pour ce faire, vous devez inclure df.persist ou df.cache à l’intérieur de votre fonction ForEachBatch. À l’aide de ces options, Azure Databricks tente de préparer les données une seule fois. Sans ces options, votre requête entraîne plusieurs lectures. Cela n’est pas inclus dans les exemples de code suivants.
  • Utilisation avec Databricks Connect : si votre pipeline s’exécute sur Databricks Connect, foreachBatch les fonctions définies par l’utilisateur (UDF) doivent être sérialisables et ne peuvent pas être utilisées dbutils. Le pipeline soulève des avertissements s’il détecte une fonction UDF non sérialisable, mais il ne fait pas échouer le pipeline.
  • Logique non sérialisable : le code référençant des objets locaux, des classes ou des ressources non sélectionnables peut s’interrompre dans les contextes Databricks Connect. Utilisez des modules Python purs et vérifiez que les références (par exemple) dbutilsne sont pas utilisées si Databricks Connect est obligatoire.

Examples

Exemple de syntaxe de base

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Utilisation d’exemples de données pour un pipeline simple

Cet exemple utilise l’exemple NYC Taxi. Il part du principe que l’administrateur de votre espace de travail a activé le catalogue Databricks Public Datasets. Pour le récepteur, changez my_catalog.my_schema en un catalogue et un schéma auxquels vous avez accès.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Écriture vers plusieurs destinations

Cet exemple écrit vers plusieurs destinations. Il démontre l'utilisation de txnVersion et txnAppId pour rendre l'écriture dans les tables Delta Lake idempotentes. Pour plus d’informations, consultez les écritures de table idempotentes dans foreachBatch.

Supposons que nous écrivons dans deux tables, table_a et table_b, et imaginons que, dans un lot, l’écriture dans table_a réussit tandis que l’écriture dans table_b échoue. Lorsque le lot est relancé, la paire (txnVersion, txnAppId) permet à Delta de passer outre l'écriture en double à table_a, et d'écrire uniquement le lot dans table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Utilisation de spark.sql()

Vous pouvez utiliser spark.sql() dans votre récepteur ForEachBatch, comme dans l’exemple suivant.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Questions fréquemment posées (FAQ)

Puis-je utiliser dbutils dans mon récepteur ForEachBatch ?

Si vous envisagez d’exécuter votre pipeline dans un environnement non Databricks Connect, dbutils cela peut fonctionner. Toutefois, si vous utilisez Databricks Connect, dbutils n’est pas accessible dans votre foreachBatch fonction. Le pipeline peut déclencher des avertissements s’il détecte l’utilisation de dbutils pour vous aider à éviter les ruptures.

Puis-je utiliser plusieurs flux avec un seul récepteur ForEachBatch ?

Yes. Vous pouvez définir plusieurs flux (avec @dp.append_flow) qui ont tous pour cible le même nom de destination, mais chacun conserve ses propres points de contrôle.

Le pipeline gère-t-il la rétention ou le nettoyage des données pour mon destinataire ?

Non. Étant donné que le puits de données ForEachBatch peut écrire dans n’importe quel emplacement ou système spécifié, le pipeline ne peut pas gérer ou supprimer automatiquement les données dans cette cible. Vous devez gérer ces opérations dans le cadre de votre code personnalisé ou de vos processus externes.

Comment résoudre les erreurs de sérialisation ou les échecs dans ma fonction ForEachBatch ?

Examinez les journaux d’activité de votre pilote de cluster ou les journaux d’événements de pipeline. Pour les problèmes de sérialisation liés à Spark Connect, vérifiez que votre fonction dépend uniquement des objets Python sérialisables et ne fait pas référence à des objets non autorisés (comme les handles de fichiers ouverts ou dbutils).