Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
In diesem Artikel wird die Verwendung von foreachBatch mit strukturiertem Streaming erläutert, um die Ausgabe einer Streamingabfrage in Datenquellen zu schreiben, die über keine vorhandene Streamingsenke verfügen.
Mit dem Codemuster streamingDF.writeStream.foreachBatch(...) können Sie Batchfunktionen auf die Ausgabedaten der einzelnen Mikrobatches der Streamingabfrage anwenden. Funktionen, die mit foreachBatch verwendet werden, akzeptieren zwei Parameter:
- Ein DataFrame, der die Ausgabedaten eines Mikrobatches enthält.
- Die eindeutige ID des Mikrobatches.
Sie müssen für Delta Lake-Zusammenführungsvorgänge bei strukturiertem Streaming foreachBatch verwenden. Weitere Informationen finden Sie unter Ausführen eines Upserts aus Streamingabfragen mithilfe von foreachBatch.
Anwenden zusätzlicher DataFrame-Vorgänge
Viele DataFrame- und Dataset-Vorgänge werden in Streaming-DataFrames nicht unterstützt, da Spark die Erstellung inkrementeller Pläne in diesen Fällen nicht unterstützt. Mit foreachBatch() können Sie einige dieser Vorgänge auf jede Mikrobatchausgabe anwenden. Sie können zum Beispiel foreachBatch() und den SQL-Vorgang MERGE INTO verwenden, um die Ausgabe von Streamingaggregationen in eine Deltatabelle im Aktualisierungsmodus zu schreiben. Weitere Details finden Sie in MERGE INTO.
Wichtig
-
foreachBatch()bietet nur At-Least-Once-Schreibgarantien. Sie können jedoch das der Funktion zur Verfügung gestelltebatchIdverwenden, um die Ausgabe zu deduplizieren und eine Exactly-Once-Garantie zu erhalten. In beiden Fällen müssen Sie selbst über die End-to-End-Semantik entscheiden. -
foreachBatch()funktioniert nicht mit dem kontinuierlichen Verarbeitungsmodus, da er im Wesentlichen auf der Mikrobatchausführung einer Streamingabfrage beruht. Wenn Sie Daten im kontinuierlichen Modus schreiben, verwenden Sie stattdessenforeach(). - Bei der Verwendung von
foreachBatchmit einem zustandsbehafteten Operator ist es wichtig, den Batch vollständig zu nutzen, bevor die Verarbeitung abgeschlossen ist. Weitere Informationen finden Sie unter Vollständige Nutzung aller Datenrahmen-Batches.
Ein leerer Datenrahmen kann mit foreachBatch() aufgerufen werden, und der Benutzercode muss resilient sein, um einen ordnungsgemäßen Betrieb zu ermöglichen. Das folgende Beispiel soll dies erläutern:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Verhaltensänderungen bei foreachBatch in Databricks Runtime 14.0
In Databricks Runtime 14.0 und höher bei der Berechnung, die mit dem Standardzugriffsmodus konfiguriert ist, gelten die folgenden Verhaltensänderungen:
-
print()Befehle schreiben Ausgaben in die Treiberprotokolle. - Sie können nicht auf das Untermodul
dbutils.widgetsinnerhalb der Funktion zugreifen. - Alle Dateien, Module oder Objekte, auf die in der Funktion verwiesen wird, müssen serialisierbar und in Spark verfügbar sein.
Wiederverwenden vorhandener Batchdatenquellen
Mit foreachBatch() können Sie vorhandene Batchdaten-Writer für Datensenken verwenden, die möglicherweise keine Unterstützung für strukturiertes Streaming bieten. Hier sind einige Beispiele:
Viele andere Batchdatenquellen können von foreachBatch() aus verwendet werden. Siehe Herstellen einer Verbindung mit Datenquellen und externen Diensten.
Schreiben an mehrere Speicherorte
Wenn Sie die Ausgabe einer Streamingabfrage an mehrere Speicherorte schreiben müssen, empfiehlt Databricks die Verwendung mehrerer Writer für strukturiertes Streaming, um eine optimale Parallelisierung und einen optimalen Durchsatz zu erzielen.
Die Verwendung foreachBatch zum Schreiben in mehrere Senken serialisiert die Ausführung von Streamingschreibvorgängen, was die Wartezeit für jeden Mikrobatch erhöhen kann.
Wenn Sie zum Schreiben in mehrere Delta-Tabellen foreachBatch verwenden, lesen Sie die Informationen unter Idempotente Schreibvorgänge in Tabellen in foreachBatch.
Verwenden Sie jeden DataFrame-Batch vollständig.
Wenn Sie zustandsbehaftete Operatoren (z. B. die Verwendung dropDuplicatesWithinWatermark) verwenden, muss jede Batch-Iteration den gesamten DataFrame verwenden oder die Abfrage neu starten. Wenn Sie den gesamten DataFrame nicht nutzen, schlägt die Streamingabfrage mit dem nächsten Batch fehl.
Dies kann in mehreren Fällen geschehen. Die folgenden Beispiele zeigen, wie Abfragen behoben werden, die ein DataFrame nicht ordnungsgemäß nutzen.
Eine Teilmenge des Batches gezielt verwenden
Wenn Sie sich nur um eine Teilmenge des Batches kümmern, könnten Sie Code wie den folgenden haben.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
In diesem Fall verarbeitet batch_df.show(2) nur die ersten beiden Elemente im Batch, was auch erwartet wird. Wenn jedoch mehr Elemente vorhanden sind, müssen sie genutzt werden. Der folgende Code verwendet den vollständigen DataFrame.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Hier ignoriert die do_nothing Funktion im Hintergrund den Rest des DataFrames.
Behandeln eines Fehlers in einem Batch
Beim Ausführen eines foreachBatch-Prozesses kann ein Fehler auftreten. Sie können Code wie den folgenden haben (in diesem Fall löst das Beispiel absichtlich einen Fehler aus, um das Problem anzuzeigen).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Durch die Behandlung (und das automatische Beibehalten) des Fehlers wird der Rest des Batches möglicherweise nicht genutzt. Es gibt zwei Optionen für die Behandlung dieser Situation.
Zuerst könnten Sie den Fehler erneut auslösen. Dadurch wird er an die Orchestrierungsebene übergeben, um den Batch erneut auszuführen. Dies kann den Fehler beheben, wenn es sich um ein vorübergehendes Problem handelt, oder es weiterleiten, damit Ihr Operationsteam es manuell beheben kann. Ändern Sie dazu den Code partial_func, damit er wie folgt aussieht:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Die zweite Option, wenn Sie die Ausnahme abfangen und den Rest des Batches ignorieren möchten, besteht darin, den Code in diesen zu ändern.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Dieser Code verwendet die do_nothing Funktion, um den Rest des Batches im Hintergrund zu ignorieren.