Freigeben über


Verwenden von „foreachBatch” zum Schreiben in beliebige Datensenken

In diesem Artikel wird die Verwendung von foreachBatch mit strukturiertem Streaming erläutert, um die Ausgabe einer Streamingabfrage in Datenquellen zu schreiben, die über keine vorhandene Streamingsenke verfügen.

Mit dem Codemuster streamingDF.writeStream.foreachBatch(...) können Sie Batchfunktionen auf die Ausgabedaten der einzelnen Mikrobatches der Streamingabfrage anwenden. Funktionen, die mit foreachBatch verwendet werden, akzeptieren zwei Parameter:

  • Ein DataFrame, der die Ausgabedaten eines Mikrobatches enthält.
  • Die eindeutige ID des Mikrobatches.

Sie müssen für Delta Lake-Zusammenführungsvorgänge bei strukturiertem Streaming foreachBatch verwenden. Weitere Informationen finden Sie unter Upsert aus Streamingabfragen mittels foreachBatch.

Anwenden zusätzlicher DataFrame-Vorgänge

Viele DataFrame- und Dataset-Vorgänge werden in Streaming-DataFrames nicht unterstützt, da Spark die Erstellung inkrementeller Pläne in diesen Fällen nicht unterstützt. Mit foreachBatch() können Sie einige dieser Vorgänge auf jede Mikrobatchausgabe anwenden. Sie können zum Beispiel foreachBath() und den SQL-Vorgang MERGE INTO verwenden, um die Ausgabe von Streamingaggregationen in eine Deltatabelle im Aktualisierungsmodus zu schreiben. Weitere Informationen finden Sie unter MERGE INTO.

Wichtig

  • foreachBatch() bietet nur At-Least-Once-Schreibgarantien. Sie können jedoch das der Funktion zur Verfügung gestellte batchId verwenden, um die Ausgabe zu deduplizieren und eine Exactly-Once-Garantie zu erhalten. In beiden Fällen müssen Sie selbst über die End-to-End-Semantik entscheiden.
  • foreachBatch() funktioniert nicht mit dem kontinuierlichen Verarbeitungsmodus, da er im Wesentlichen auf der Mikrobatchausführung einer Streamingabfrage beruht. Wenn Sie Daten im kontinuierlichen Modus schreiben, verwenden Sie stattdessen foreach().

Ein leerer Datenrahmen kann mit foreachBatch() aufgerufen werden, und der Benutzercode muss resilient sein, um einen ordnungsgemäßen Betrieb zu ermöglichen. Das folgende Beispiel soll dies erläutern:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Verhaltensänderungen bei foreachBatch in der Databricks Runtime 14.0

In der Databricks Runtime 14.0 und höher mit im freigegebenen Zugriffsmodus konfigurierten Computeressourcen wird forEachBatch in einem separaten und isolierten Python-Prozess in Apache Spark und nicht in der REPL-Umgebung ausgeführt. Es wird serialisiert und an Spark übertragen und hat für die Dauer der Sitzung keinen Zugriff auf globale spark-Objekte.

Bei allen anderen Computekonfigurationen wird foreachBatch in derselben Python-REPL ausgeführt, in der auch der übrige Codes ausgeführt wird. Daher wird die Funktion nicht serialisiert.

Wenn Sie Databricks Runtime 14.0 und höher auf Computeressourcen verwenden, die im freigegebenen Zugriffsmodus konfiguriert sind, müssen Sie die sparkSession-Variable im Bereich des lokalen DataFrames verwenden, wenn Sie foreachBatch in Python verwenden, wie im folgenden Codebeispiel gezeigt:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

Dabei gelten die folgenden Verhaltensänderungen:

  • Sie können auf keine globalen Python-Variablen innerhalb Ihrer Funktion zugreifen.
  • print()-Befehle schreiben ihre Ausgabe in die Treiberprotokolle.
  • Alle Dateien, Module oder Objekte, auf die in der Funktion verwiesen wird, müssen serialisierbar und in Spark verfügbar sein.

Wiederverwenden vorhandener Batchdatenquellen

Mit foreachBatch() können Sie vorhandene Batchdaten-Writer für Datensenken verwenden, die möglicherweise keine Unterstützung für strukturiertes Streaming bieten. Hier sind einige Beispiele:

Viele andere Batchdatenquellen können von foreachBatch() aus verwendet werden. Weitere Informationen finden Sie unter Herstellen von Verbindungen mit Datenquellen.

Schreiben an mehrere Speicherorte

Wenn Sie die Ausgabe einer Streamingabfrage an mehrere Speicherorte schreiben müssen, empfiehlt Databricks die Verwendung mehrerer Writer für strukturiertes Streaming, um eine optimale Parallelisierung und einen optimalen Durchsatz zu erzielen.

Die Verwendung foreachBatch zum Schreiben in mehrere Senken serialisiert die Ausführung von Streamingschreibvorgängen, was die Wartezeit für jeden Mikrobatch erhöhen kann.

Wenn Sie zum Schreiben in mehrere Deltatabellen foreachBatch verwenden, lesen Sie die Informationen unter Idempotente Schreibvorgänge in Tabellen in foreachBatch.