Delen via


ForEachBatch gebruiken om naar willekeurige datasinks te schrijven in pijplijnen

Belangrijk

De foreach_batch_sink API bevindt zich in openbare preview.

Met de ForEachBatch-sink kunt u een stroom verwerken als een reeks microbatches. Elke batch kan worden verwerkt in Python met aangepaste logica die vergelijkbaar is met apache Spark Structured Streaming foreachBatch. Met de SDP-sink (Lakeflow Spark Declarative Pipelines) ForEachBatch kunt u streaminggegevens transformeren, samenvoegen of schrijven naar een of meer doelen die geen systeemeigen ondersteuning bieden voor streaming-schrijfbewerkingen. Deze pagina begeleidt u bij het instellen van een ForEachBatch-sink, biedt voorbeelden en bespreekt belangrijke overwegingen.

ForEachBatch-sink biedt de volgende functionaliteit:

  • Aangepaste logica voor elke microbatch: ForEachBatch is een flexibele streaming-sink. U kunt willekeurige acties (zoals samenvoegen in een externe tabel, schrijven naar meerdere bestemmingen of upserts uitvoeren) toepassen met Python-code.
  • Volledige vernieuwingsondersteuning: pijplijnen beheren controlepunten per stroom, dus controlepunten worden automatisch opnieuw ingesteld wanneer u een volledige vernieuwing van uw pijplijn uitvoert. Met de ForEachBatch-sink bent u verantwoordelijk voor het beheren van het resetten van de downstreamgegevens wanneer dit gebeurt.
  • Ondersteuning voor Unity Catalog: ForEachBatch-sink ondersteunt alle Unity Catalog-functies, zoals lezen van of schrijven naar Unity Catalog-volumes of -tabellen.
  • Beperkt beheer: De pijplijn houdt niet bij welke gegevens worden geschreven vanuit een ForEachBatch-sink, waardoor die gegevens niet kunnen worden beheerd of verwijderd. U bent verantwoordelijk voor alle downstreamgegevensbeheer.
  • Event log entries: het gebeurtenislogboek van de pijplijn registreert het maken en gebruiken van elke ForEachBatch-sink. Als uw Python-functie niet serialiseerbaar is, ziet u een waarschuwingsvermelding in het gebeurtenislogboek met aanvullende suggesties.

Opmerking

  • De ForEachBatch-sink is ontworpen voor het streamen van query's, zoals append_flow. Het is niet bedoeld voor pijplijnen die alleen batchgewijs zijn of voor AutoCDC semantiek.
  • De ForEachBatch-sink die op deze pagina wordt beschreven, is bedoeld voor pijplijnen. Apache Spark Structured Streaming ondersteunt foreachBatchook . Zie foreachBatch voor informatie over structured streaming.

Wanneer moet u een ForEachBatch-sink gebruiken

Gebruik een ForEachBatch-sink wanneer uw pijplijn functionaliteit vereist die niet beschikbaar is via een ingebouwde sink-indeling, zoals delta, of kafka. Typische gebruiksvoorbeelden zijn onder andere:

  • Samenvoegen of upserting in een Delta Lake-tabel: Voer aangepaste samenvoeglogica uit voor elke microbatch (bijvoorbeeld het verwerken van bijgewerkte records).
  • Schrijven naar meerdere of niet-ondersteunde bestemmingen: schrijf de uitvoer van elke batch naar meerdere tabellen of externe opslagsystemen die geen ondersteuning bieden voor streaming-schrijfbewerkingen (zoals bepaalde JDBC-sinks).
  • Aangepaste logica of transformaties toepassen: Gegevens rechtstreeks bewerken in Python (bijvoorbeeld met behulp van gespecialiseerde bibliotheken of geavanceerde transformaties).

Zie Sinks in Lakeflow Spark-declaratieve pijplijnen voor informatie over de ingebouwde sinks of het maken van aangepaste sinks met Python.

Syntaxis

Gebruik de @dp.foreach_batch_sink() decoratie om een ForEachBatch-sink te genereren. U kunt er vervolgens naar verwijzen als een target in uw stroomdefinitie, bijvoorbeeld in @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Kenmerk Description
name Optional. Een unieke naam om de sink in de pijplijn te identificeren. Standaard ingesteld op de naam van de UDF, indien niet opgenomen.
batch_handler Dit is de door de gebruiker gedefinieerde functie (UDF) die wordt aangeroepen voor elke microbatch.
Df Spark DataFrame met gegevens voor de huidige microbatch.
batch_id De integer-ID van de micro-batch. Spark incrementeert deze id voor elk triggerinterval.
Een batch_id van 0 staat voor het begin van een stream of het begin van een volledige verversing. De foreach_batch_sink code moet een volledige vernieuwing voor downstreamgegevensbronnen correct verwerken. Zie de volgende sectie voor meer informatie.

Volledig vernieuwen

Omdat ForEachBatch gebruikmaakt van een streamingquery, houdt de pijplijn de controlepuntmap voor elke flow bij. Bij volledig vernieuwen:

  • De controlepuntmap wordt opnieuw ingesteld.
  • Uw sink-functie (foreach_batch_sink UDF) start met een gloednieuwe batch_id cyclus vanaf 0.
  • Gegevens in uw doelsysteem worden niet automatisch opgeschoond door de pijplijn (omdat de pijplijn niet weet waar uw gegevens worden geschreven). Als u een clean-slate-scenario nodig hebt, moet u de externe tabellen of locaties die uw ForEachBatch-sink vult, handmatig verwijderen of afkappen.

Unity Catalog-functies gebruiken

Alle bestaande Unity Catalog-mogelijkheden in Spark Structured Streaming foreach_batch_sink blijven beschikbaar.

Dit omvat het schrijven naar beheerde of externe Unity Catalog-tabellen. U kunt microbatches schrijven naar beheerde of externe tabellen van Unity Catalog, precies zoals in elke Apache Spark Structured Streaming-taak.

Gebeurtenislogboekvermeldingen

Wanneer u een ForEachBatch-sink maakt, wordt er een SinkDefinition-gebeurtenis toegevoegd met "format": "foreachBatch" aan het gebeurtenislogboek van de pijplijn.

Hiermee kunt u het gebruik van ForEachBatch-sinks bijhouden en waarschuwingen over uw sink bekijken.

Gebruiken met Databricks Connect

Als de functie die u opgeeft niet serialiseerbaar is (een belangrijke vereiste voor Databricks Connect), bevat het gebeurtenislogboek een WARN vermelding die u aanbeveelt om uw code te vereenvoudigen of te herstructureren als ondersteuning voor Databricks Connect vereist is.

Als u bijvoorbeeld parameters binnen een ForEachBatch UDF opgeeft met dbutils, kunt u in plaats daarvan het argument ophalen voordat u het in de UDF gebruikt.

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Beste praktijken

  1. Houd uw ForEachBatch-functie beknopt: vermijd threading, zware bibliotheekafhankelijkheden of grote gegevensmanipulaties in het geheugen. Complexe en toestandsafhankelijke logica kan leiden tot serialisatiefouten of prestatieknelpunten.
  2. Controleer uw controlepuntmap: voor streamingquery's beheert SDP controlepunten per stroom, niet per sink. Als u meerdere stromen in uw pijplijn hebt, heeft elke stroom een eigen controlepuntmap.
  3. Externe afhankelijkheden valideren: als u afhankelijk bent van externe systemen of bibliotheken, controleert u of deze zijn geïnstalleerd op alle clusterknooppunten of in uw container.
  4. Let op Databricks Connect: als uw omgeving in de toekomst naar Databricks Connect kan worden verplaatst, controleert u of uw code serializeerbaar is en niet afhankelijk is dbutils van de foreach_batch_sink UDF.

Beperkingen

  • Geen opruiming voor ForEachBatch: omdat uw aangepaste Python-code overal gegevens kan schrijven, kan de pijplijn die gegevens niet opruimen of bijhouden. U moet uw eigen beleid voor gegevensbeheer of bewaarbeleid afhandelen voor de bestemmingen waarnaar u schrijft.
  • Metrische gegevens in microbatch: Pijplijnen verzamelen metrische streaminggegevens, maar sommige scenario's kunnen onvolledige of ongebruikelijke metrische gegevens veroorzaken bij het gebruik van ForEachBatch. Dit komt door de onderliggende flexibiliteit van ForEachBatch, waardoor het bijhouden van gegevensstromen en rijen moeilijk is voor het systeem.
  • Ondersteuning voor het schrijven naar meerdere bestemmingen zonder meerdere leesbewerkingen: Sommige klanten kunnen ForEachBatch gebruiken om één keer van een bron te lezen en vervolgens naar meerdere bestemmingen te schrijven. Om dit te bereiken, moet u df.persist of df.cache opnemen binnen uw ForEachBatch-functie. Met deze opties probeert Azure Databricks de gegevens slechts één keer gereed te maken. Zonder deze opties resulteert uw query in meerdere leesbewerkingen. Dit is niet opgenomen in de volgende codevoorbeelden.
  • Gebruiken met Databricks Connect: als uw pijplijn wordt uitgevoerd op Databricks Connect, foreachBatch moeten door de gebruiker gedefinieerde functies (UDF) serialiseerbaar zijn en kunnen ze niet worden gebruikt dbutils. De pijplijn genereert waarschuwingen als er een niet-serialiseerbare UDF wordt gedetecteerd, zonder dat de pijplijn faalt.
  • Niet-serialiseerbare logica: code die verwijst naar lokale objecten, klassen of niet-selecteerbare resources, kunnen worden verbroken in Databricks Connect-contexten. Gebruik pure Python-modules en controleer of verwijzingen (bijvoorbeeld dbutils) niet worden gebruikt als Databricks Connect een vereiste is.

Voorbeelden

Voorbeeld van basissyntaxis

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Voorbeeldgegevens gebruiken voor een eenvoudige pijplijn

In dit voorbeeld wordt het NYC Taxi-voorbeeld gebruikt. Hierbij wordt ervan uitgegaan dat uw werkruimtebeheerder de databricks Public Datasets-catalogus heeft ingeschakeld. Voor de sink, wijzig my_catalog.my_schema naar een catalogus en een schema waartoe u toegang hebt.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Schrijven naar meerdere bestemmingen

In dit voorbeeld wordt naar meerdere bestemmingen geschreven. Het toont het gebruik van txnVersion en txnAppId om schrijfbewerkingen naar Delta Lake-tabellen idempotent te maken. Zie Idempotent table writes in foreachBatchvoor meer informatie.

Stel dat we naar twee tabellen table_a en table_b schrijven, en stel dat de schrijfbewerking naar table_a in een batch slaagt terwijl de schrijfbewerking naar table_b mislukt. Wanneer de batch opnieuw wordt uitgevoerd, staat het (txnVersion, txnAppId) -paar Delta toe om de dubbele schrijfbewerking naar table_ate negeren en alleen naar de batch te table_bschrijven.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

spark.sql() gebruiken

U kunt in uw ForEachBatch-sink gebruiken spark.sql() , zoals in het volgende voorbeeld.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Veelgestelde vragen (FAQ)

Kan ik dbutils gebruiken in mijn ForEachBatch-sink?

Als u van plan bent om uw pijplijn uit te voeren in een niet-Databricks Connect-omgeving, dbutils kan dat werken. Als u echter Databricks Connect gebruikt, dbutils is deze niet toegankelijk binnen uw foreachBatch functie. De pijplijn kan waarschuwingen genereren wanneer het het gebruik van dbutils detecteert om je te helpen onderbrekingen te voorkomen.

Kan ik meerdere stromen gebruiken met één ForEachBatch-sink?

Ja. U kunt meerdere stromen definiëren (met @dp.append_flow) die allemaal eenzelfde doellocatie hebben, maar ze behouden elk hun eigen checkpoints.

Verwerkt de pijplijn het bewaren of opschonen van gegevens voor mijn doel?

Nee. Omdat de ForEachBatch-sink naar een willekeurige locatie of elk willekeurig systeem kan schrijven, kan de pijplijn geen gegevens in dat doel automatisch beheren of verwijderen. U moet deze bewerkingen afhandelen als onderdeel van uw aangepaste code of externe processen.

Hoe los ik serialisatiefouten of -fouten op in mijn ForEachBatch-functie?

Bekijk de logboeken van het clusterstuurprogramma of de gebeurtenislogboeken van de pijplijn. Controleer voor serialisatieproblemen met Spark Connect of uw functie alleen afhankelijk is van serialiseerbare Python-objecten en niet verwijst naar niet-toegestane objecten (zoals geopende bestandsingangen of dbutils).