ForeachBatch gebruiken om naar willekeurige gegevenssinks te schrijven

In dit artikel wordt het gebruik foreachBatch met Structured Streaming besproken om de uitvoer van een streamingquery te schrijven naar gegevensbronnen die geen bestaande streaming-sink hebben.

Met het codepatroon streamingDF.writeStream.foreachBatch(...) kunt u batchfuncties toepassen op de uitvoergegevens van elke microbatch van de streamingquery. Functies die worden gebruikt met foreachBatch twee parameters:

  • Een DataFrame met de uitvoergegevens van een microbatch.
  • De unieke id van de microbatch.

U moet voor Delta Lake-samenvoegbewerkingen in Structured Streaming gebruiken foreachBatch . Zie Upsert van streamingquery's met behulp van foreachBatch.

Aanvullende DataFrame-bewerkingen toepassen

Veel dataframe- en gegevenssetbewerkingen worden niet ondersteund in streaming DataFrames, omdat Spark in die gevallen geen ondersteuning biedt voor het genereren van incrementele plannen. Met behulp foreachBatch() van kunt u een aantal van deze bewerkingen toepassen op elke microbatch-uitvoer. U kunt bijvoorbeeld en de SQL-bewerking MERGE INTO gebruiken foreachBath() om de uitvoer van streamingaggregaties naar een Delta-tabel te schrijven in de updatemodus. Zie meer informatie in MERGE INTO.

Belangrijk

  • foreachBatch() biedt slechts ten minste eenmaal schrijfgaranties. U kunt de batchId opgegeven functie echter gebruiken als manier om de uitvoer te ontdubbelen en exact één keer een garantie te krijgen. In beide gevallen moet u zelf redeneren over de end-to-end semantiek.
  • foreachBatch() werkt niet met de modus voor continue verwerking, omdat deze fundamenteel afhankelijk is van de microbatchuitvoering van een streamingquery. Als u gegevens schrijft in de doorlopende modus, gebruikt foreach() u in plaats daarvan.

Een leeg dataframe kan worden aangeroepen met foreachBatch() en gebruikerscode moet tolerant zijn om de juiste werking mogelijk te maken. Hier kunt u een voorbeeld bekijken:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Gedragswijzigingen voor foreachBatch Databricks Runtime 14.0

In Databricks Runtime 14.0 en hoger op berekeningen die zijn geconfigureerd met de modus voor gedeelde toegang, forEachBatch wordt uitgevoerd in een afzonderlijk geïsoleerd Python-proces op Apache Spark in plaats van in de REPL-omgeving. Het wordt geserialiseerd en naar Spark gepusht en heeft gedurende de duur van de sessie geen toegang tot globale spark objecten.

In alle andere rekenconfiguraties wordt foreachBatch deze uitgevoerd in dezelfde Python REPL die de rest van uw code uitvoert. Als gevolg hiervan wordt de functie niet geserialiseerd.

Wanneer u Databricks Runtime 14.0 en hoger gebruikt voor berekening die is geconfigureerd met de modus voor gedeelde toegang, moet u de sparkSession variabele gebruiken die is gericht op het lokale DataFrame wanneer u in Python gebruikt foreachBatch , zoals in het volgende codevoorbeeld:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

De volgende gedragswijzigingen zijn van toepassing:

  • U hebt geen toegang tot globale Python-variabelen vanuit uw functie.
  • print() opdrachten schrijven uitvoer naar de stuurprogrammalogboeken.
  • Bestanden, modules of objecten waarnaar in de functie wordt verwezen, moeten serialiseren en beschikbaar zijn in Spark.

Bestaande batchgegevensbronnen opnieuw gebruiken

Met behulp van foreachBatch()kunt u bestaande batchgegevensschrijvers gebruiken voor gegevenssinks die mogelijk geen ondersteuning voor Structured Streaming hebben. Enkele voorbeelden:

Veel andere batchgegevensbronnen kunnen worden gebruikt uit foreachBatch(). Zie Verbinding maken naar gegevensbronnen.

Schrijven naar meerdere locaties

Als u de uitvoer van een streamingquery naar meerdere locaties moet schrijven, raadt Databricks aan om meerdere structured streaming-schrijvers te gebruiken voor de beste parallelle uitvoering en doorvoer.

Het gebruik foreachBatch om naar meerdere sinks te schrijven serialiseert de uitvoering van streaming-schrijfbewerkingen, waardoor de latentie voor elke microbatch kan toenemen.

Als u wel foreachBatch naar meerdere Delta-tabellen schrijft, raadpleegt u Idempotent table writes in foreachBatch.