Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Von Bedeutung
Die foreach_batch_sink API befindet sich in der öffentlichen Vorschau.
Mit der ForEachBatch-Senke können Sie einen Datenstrom als eine Reihe von Mikrobatches verarbeiten. Jeder Batch kann in Python mit benutzerdefinierter Logik wie Apache Spark Structured Streaming foreachBatchverarbeitet werden. Mit der Lakeflow Spark Declarative Pipelines (SDP) ForEachBatch-Spüle können Sie Streamingdaten in ein oder mehrere Ziele umwandeln, zusammenführen oder schreiben, die Streaming-Schreibvorgänge nicht nativ unterstützen. Diese Seite führt Sie durch das Einrichten einer ForEachBatch-Spüle, enthält Beispiele und erläutert wichtige Überlegungen.
ForEachBatch-Spüle bietet die folgenden Funktionen:
- Benutzerdefinierte Logik für jeden Mikro-Batch: ForEachBatch ist eine flexible Streaming-Senke. Sie können beliebige Aktionen (z. B. zusammenführen in eine externe Tabelle, Schreiben auf mehrere Ziele oder Ausführen von Upserts) mit Python-Code anwenden.
- Vollständige Aktualisierungsunterstützung: Pipelines verwalten Prüfpunkte pro Fluss, sodass Prüfpunkte automatisch zurückgesetzt werden, wenn Sie eine vollständige Aktualisierung Ihrer Pipeline durchführen. Mit dem ForEachBatch-Sink sind Sie dafür verantwortlich, die Zurücksetzung der nachgelagerten Daten zu verwalten, wenn dies geschieht.
- Unterstützung für Unity-Katalog: Die ForEachBatch-Senke unterstützt alle Funktionen des Unity-Katalogs, wie z. B. das Lesen von oder Schreiben in Volumes oder Tabellen des Unity-Katalogs.
- Begrenzte Aufräumarbeiten: Die Pipeline verfolgt nicht, welche Daten über eine ForEachBatch-Senke geschrieben werden, sodass diese Daten nicht bereinigt werden können. Sie sind für jede nachgelagerte Datenverwaltung verantwortlich.
- Ereignisprotokolleinträge: Das Pipelineereignisprotokoll zeichnet die Erstellung und Verwendung der einzelnen ForEachBatch-Senken auf. Wenn Ihre Python-Funktion nicht serialisierbar ist, wird im Ereignisprotokoll ein Warneintrag mit zusätzlichen Vorschlägen angezeigt.
Hinweis
- Die ForEachBatch-Senke ist für Streamingabfragen konzipiert, wie z. B.
append_flow. Sie ist nicht für nur-Batch-Pipelines oder fürAutoCDC-Semantik vorgesehen. - Die auf dieser Seite beschriebene ForEachBatch-Spüle richtet sich an Pipelines. Apache Spark Structured Streaming unterstützt auch
foreachBatch. Informationen zum strukturierten StreamingforeachBatchfinden Sie unter Verwendung von foreachBatch zum Schreiben in beliebige Datensenken.
Wann man ein ForEachBatch-Sink verwendet
Verwenden Sie eine ForEachBatch-Senke, wenn Ihre Pipeline Funktionen benötigt, die nicht über ein integriertes Senkenformat wie delta oder kafka verfügbar sind. Typische Anwendungsfälle sind:
- Zusammenführen oder Upserting in eine Delta Lake-Tabelle: Führen Sie benutzerdefinierte Zusammenführungslogik für jeden Microbatch aus (z. B. die Behandlung aktualisierter Datensätze).
- Schreiben in mehrere oder nicht unterstützte Ziele: Schreiben Sie die Ausgabe jedes Batches in mehrere Tabellen oder externe Speichersysteme, die Streaming-Schreibvorgänge nicht unterstützen (z. B. bestimmte JDBC-Schnittstellen).
- Anwenden benutzerdefinierter Logik oder Transformationen: Bearbeiten von Daten in Python direkt (z. B. mithilfe spezieller Bibliotheken oder erweiterter Transformationen).
Informationen zu den integrierten Senken oder zum Erstellen von benutzerdefinierten Senken mit Python finden Sie unter Sinks in Lakeflow Spark Declarative Pipelines.
Syntax
Verwenden Sie die @dp.foreach_batch_sink() Dekoration, um eine ForEachBatch-Spüle zu erzeugen. Sie können dies dann als target in Ihrer Flussdefinition referenzieren, z. B. in @dp.append_flow.
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
| Parameter | Description |
|---|---|
| Name | Wahlfrei. Ein eindeutiger Name zum Identifizieren der Spüle innerhalb der Pipeline. Der Standardwert ist der Name der UDF, wenn dieser nicht angegeben ist. |
| batch_handler | Dies ist die benutzerdefinierte Funktion (UDF), die für jeden Mikrobatch aufgerufen wird. |
| Df | Spark DataFrame mit Daten für den aktuellen Mikrobatch. |
| batch_id | Die ganzzahlige ID des Mikrobatches. Spark erhöht diese ID für jedes Triggerintervall. Ein batch_id von 0 stellt den Beginn eines Streams oder den Anfang eines vollständigen Aktualisierungsprozesses dar. Der foreach_batch_sink Code sollte eine vollständige Aktualisierung für nachgeschaltete Datenquellen ordnungsgemäß verarbeiten. Weitere Informationen finden Sie im nächsten Abschnitt. |
Vollständige Aktualisierung
Da ForEachBatch eine Streamingabfrage verwendet, verfolgt die Pipeline das Prüfpunktverzeichnis für jeden Fluss. Bei vollständiger Aktualisierung:
- Das Prüfpunktverzeichnis wird zurückgesetzt.
- Ihre Sink-Funktion (
foreach_batch_sinkUDF) sieht einen neuenbatch_idZyklus beginnend bei 0. - Daten in Ihrem Zielsystem werden von der Pipeline nicht automatisch bereinigt (da die Pipeline nicht weiß, wo Ihre Daten geschrieben werden). Wenn Sie ein Neuanfang-Szenario benötigen, müssen Sie die externen Tabellen oder Speicherorte, die Ihr ForEachBatch füllt, manuell löschen oder kürzen.
Verwendung der Funktionen des Unity-Katalogs
Alle vorhandenen Unity-Katalogfunktionen in Spark Structured Streaming foreach_batch_sink bleiben verfügbar.
Dies schließt das Schreiben in verwaltete oder externe Unity-Katalogtabellen ein. Sie können Mikrobatches in den von Unity Catalog verwalteten oder externen Tabellen genauso schreiben, wie Sie es in jedem Apache Spark Structured Streaming-Job tun würden.
Ereignisprotokolleinträge
Wenn Sie einen ForEachBatch-Sink erstellen, wird ein SinkDefinition-Ereignis zusammen mit "format": "foreachBatch" zum Ereignisprotokoll der Pipeline hinzugefügt.
Auf diese Weise können Sie die Verwendung von ForEachBatch-Senken nachverfolgen und Warnungen zu Ihrer Spüle anzeigen.
Verwenden mit Databricks Connect
Wenn die von Ihnen bereitstellende Funktion nicht serialisierbar ist (eine wichtige Anforderung für Databricks Connect), enthält das Ereignisprotokoll einen WARN Eintrag, der Sie empfehlen, Ihren Code zu vereinfachen oder umzugestalten, wenn die Unterstützung von Databricks Connect erforderlich ist.
Zum Beispiel, wenn Sie dbutils verwenden, um Parameter innerhalb einer ForEachBatch-UDF zu erhalten, können Sie alternativ das Argument abrufen, bevor Sie es in der UDF verwenden:
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
value = dbutils.widgets.get ("X") + str (i)
# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")
def foreach_batch(df, batchId):
value = argX + str (i)
Bewährte Methoden
- Halten Sie Ihre ForEachBatch-Funktion präzise: Vermeiden Sie Threading, starke Bibliotheksabhängigkeiten oder große In-Memory-Datenmanipulationen. Komplexe oder zustandsbehaftete Logik kann zu Serialisierungsfehlern oder Leistungsengpässen führen.
- Überwachen Sie Ihren Prüfpunktordner: Für Streamingabfragen verwaltet SDP Prüfpunkte nach Datenfluss und nicht nach Senke. Wenn Sie über mehrere Flüsse in Ihrer Pipeline verfügen, verfügt jeder Fluss über ein eigenes Prüfpunktverzeichnis.
- Überprüfen externer Abhängigkeiten: Wenn Sie auf externe Systeme oder Bibliotheken angewiesen sind, überprüfen Sie, ob sie auf allen Clusterknoten oder in Ihrem Container installiert sind.
-
Achten Sie auf Databricks Connect: Wenn Ihre Umgebung in Zukunft zu Databricks Connect wechseln kann, überprüfen Sie, ob Ihr Code serialisierbar ist und nicht von
dbutilsinnerhalb derforeach_batch_sink-UDF abhängt.
Einschränkungen
- Kein Housekeeping für ForEachBatch: Da Ihr benutzerdefinierter Python-Code Daten an beliebiger Stelle schreiben kann, kann die Pipeline diese Daten nicht bereinigen oder nachverfolgen. Sie müssen Ihre eigenen Richtlinien zur Datenverwaltung oder Datenaufbewahrung für die Zielorte, in die Sie schreiben, verwalten.
- Metriken in Mikrobatch: Pipelines sammeln Streamingmetriken, aber einige Szenarien können unvollständige oder ungewöhnliche Metriken verursachen, wenn ForEachBatch verwendet wird. Dies liegt an der zugrunde liegenden Flexibilität von ForEachBatch, was das Nachverfolgen von Datenfluss und Zeilen für das System erschwert.
-
Unterstützen des Schreibens an mehrere Ziele ohne mehrere Lesevorgänge: Einige Kunden können ForEachBatch verwenden, um einmal aus einer Quelle zu lesen und dann in mehrere Ziele zu schreiben. Dazu müssen Sie
df.persistoderdf.cachein Ihrer ForEachBatch-Funktion einfügen. Mit diesen Optionen wird Azure Databricks versuchen, die Daten nur einmal fertig zu stellen. Ohne diese Optionen führt Ihre Abfrage zu mehreren Lesevorgängen. Dies ist nicht in den folgenden Codebeispielen enthalten. -
Verwenden mit Databricks Connect: Wenn Ihre Pipeline auf Databricks Connect ausgeführt wird, müssen
foreachBatchbenutzerdefinierte Funktionen (UDF) serialisierbar sein und dürfendbutilsnicht verwenden. Die Pipeline löst Warnungen aus, wenn eine nicht serialisierbare UDF erkannt wird, die Pipeline jedoch nicht fehlschlägt. -
Nicht serialisierbare Logik: Code, der auf lokale Objekte, Klassen oder nicht auswählbare Ressourcen verweist, kann in Databricks Connect-Kontexten unterbrechen. Verwenden Sie reine Python-Module und bestätigen Sie, dass Verweise (z. B.
dbutils) nicht verwendet werden, wenn Databricks Connect eine Anforderung ist.
Examples
Einfaches Syntaxbeispiel
from pyspark import pipelines as dp
# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
return
# Create source data for example:
@dp.table()
def example_source_data():
return spark.range(5)
# Add sink to an append flow:
@dp.append_flow(
target="my_foreachbatch_sink",
)
def my_flow():
return spark.readStream.format("delta").table("example_source_data")
Verwenden von Beispieldaten für eine einfache Pipeline
In diesem Beispiel wird das NYC Taxi-Beispiel verwendet. Es wird davon ausgegangen, dass Ihr Arbeitsbereichsadministrator den Databricks Public Datasets-Katalog aktiviert hat. Ändern Sie my_catalog.my_schema für die Spüle einen Katalog und ein Schema, auf das Sie Zugriff haben.
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
# For this example, we are adding a timestamp column.
enriched = df.withColumn("processed_timestamp", current_timestamp())
# Write to a Delta location
enriched.write \
.format("delta") \
.mode("append") \
.saveAsTable("my_catalog.my_schema.trips_sink_delta")
# Return is optional here, but generally not used for the sink
return
# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
target="my_foreach_sink",
)
def taxi_source():
df = spark.readStream.table("samples.nyctaxi.trips")
return df
Schreiben an mehrere Zielorte
In diesem Beispiel wird an mehrere Ziele geschrieben. Es veranschaulicht, wie durch die Verwendung von txnVersion und txnAppId Schreibvorgänge in Delta Lake-Tabellen idempotent gemacht werden. Ausführliche Informationen finden Sie unter Idempotente Tabellen-Schreibvorgänge in foreachBatch.
Angenommen, wir schreiben in zwei Tabellen, table_a und table_b, und innerhalb eines Batches wird der Schreibvorgang in table_a erfolgreich ausgeführt, während er in table_b fehlschlägt. Wenn die Batch erneut ausgeführt wird, ermöglicht das Paar (txnVersion, txnAppId) Delta, den doppelten Schreibvorgang auf table_a zu ignorieren und die Batch nur auf table_b zu schreiben.
from pyspark import pipelines as dp
app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId
# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
# Optionally do transformations, logging, or merging logic
# ...
# Write to a Delta table
df.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable("my_catalog.my_schema.example_table_1")
# Also write to a JSON file location
df.write \
.format("json") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.save("/tmp/json_target")
return
# Create source data for example
@dp.table()
def example_source():
return spark.range(5)
# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return spark.readStream.format("delta").table("example_source")
Verwenden von spark.sql()
Sie können die ForEachBatch-Senke wie im folgenden Beispiel spark.sql() verwenden.
from pyspark import pipelines as dp
from pyspark.sql import Row
@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
df.createOrReplaceTempView("df_view")
df.sparkSession.sql("MERGE INTO target_table AS tgt " +
"USING df_view AS src ON tgt.id = src.id " +
"WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
"WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
)
return
# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")
# Create source table
@dp.table()
def src_table():
return spark.range(5)
@dp.append_flow(
target="example_sink",
)
def example_flow():
return spark.readStream.format("delta").table("source_table")
Häufig gestellte Fragen (FAQ)
Kann ich dbutils in meinem ForEachBatch-Sink verwenden?
Wenn Sie beabsichtigen, Ihre Pipeline in einer Nicht-Databricks Connect-Umgebung auszuführen, könnte dbutils funktionieren. Wenn Sie Databricks Connect verwenden, dbutils kann jedoch nicht innerhalb Ihrer foreachBatch Funktion zugegriffen werden. Die Pipeline kann Warnungen auslösen, wenn sie die Verwendung von dbutils erkennt, um Unterbrechungen zu vermeiden.
Kann ich mehrere Datenflüsse mit einem einzelnen ForEachBatch-Sink verwenden?
Ja. Sie können mehrere Flüsse (mit @dp.append_flow) definieren, die alle auf denselben Sinknamen abzielen, aber sie behalten jeweils ihre eigenen Prüfpunkte.
Verwaltet die Pipeline die Datenaufbewahrung oder Bereinigung für mein Ziel?
Nein. Da der ForEachBatch-Sink an beliebige Standorte oder Systeme schreiben kann, kann die Pipeline die Daten in diesem Zielsystem nicht automatisch verwalten oder löschen. Sie müssen diese Vorgänge als Teil Ihres benutzerdefinierten Codes oder externer Prozesse behandeln.
Wie kann ich Serialisierungsfehler oder Fehler in meiner ForEachBatch-Funktion beheben?
Sehen Sie sich die Cluster-Treiberprotokolle oder Pipeline-Ereignisprotokolle an. Überprüfen Sie bei Serialisierungsproblemen im Zusammenhang mit Spark Connect, ob Ihre Funktion nur von serialisierbaren Python-Objekten abhängt und nicht auf unzulässige Objekte verweist (z. B. geöffnete Dateihandles oder dbutils).