Dela via


Använd ForEachBatch för att skriva till godtyckliga datamottagare i pipelines

Viktigt!

API:et foreach_batch_sink finns i offentlig förhandsversion.

Med ForEachBatch-mottagaren kan du bearbeta en ström som en serie mikrobatcher. Varje batch kan bearbetas i Python med anpassad logik som liknar Apache Spark Structured Streamings foreachBatch. Med Lakeflow Spark Deklarativa pipelines (SDP) ForEachBatch-mottagare kan du transformera, sammanfoga eller skriva strömmande data till ett eller flera mål som inte har inbyggt stöd för strömningsskrivningar. Den här sidan beskriver hur du konfigurerar en ForEachBatch-mottagare, ger exempel och diskuterar viktiga överväganden.

ForEachBatch sink tillhandahåller följande funktionalitet:

  • Anpassad logik för varje mikrobatch: ForEachBatch är en flexibel strömningsmottagare. Du kan tillämpa godtyckliga åtgärder (till exempel sammanslagning i en extern tabell, skriva till flera mål eller utföra upserts) med Python-kod.
  • Fullständigt uppdateringsstöd: Pipelines hanterar kontrollpunkter per flöde, så kontrollpunkter återställs automatiskt när du utför en fullständig uppdatering av din pipeline. Med ForEachBatch-sink ansvarar du för att hantera återställning av nedströmsdata när detta sker.
  • Stöd för Unity Catalog: ForEachBatch-mottagare stöder alla Unity Catalog-funktioner, till exempel läsning från eller skrivning till Unity Catalog-volymer eller -tabeller.
  • Begränsad hushållning: Pipelinen spårar inte vilka data som skrivs från en ForEachBatch-mottagare, så det går inte att rensa dessa data. Du ansvarar för all nedströms datahantering.
  • Händelseloggposter: Händelseloggen för pipeline registrerar skapande och användning av varje ForEachBatch-mottagare. Om python-funktionen inte kan serialiseras visas en varningspost i händelseloggen med ytterligare förslag.

Anmärkning

  • ForEachBatch-mottagaren är utformad för strömmande frågor, till exempel append_flow. Det är inte avsett för endast batch-baserade pipelines eller för AutoCDC-semantiken.
  • ForEachBatch-mottagaren som beskrivs på den här sidan är för pipelines. Apache Spark Structured Streaming stöder foreachBatchockså . Information om strukturerad direktuppspelning foreachBatchfinns i Använda foreachBatch för att skriva till godtyckliga datamottagare.

När använda en ForEachBatch-sink

Använd en ForEachBatch-mottagare när din pipeline kräver funktioner som inte är tillgängliga via ett inbyggt mottagarformat som delta, eller kafka. Vanliga användningsfall är:

  • Sammanslagning eller upserting till en Delta Lake-tabell: Kör anpassad kopplingslogik för varje mikrobatch (till exempel hantering av uppdaterade poster).
  • Skriva till flera mål eller mål som inte stöds: Skriv utdata av varje batch till flera tabeller eller externa lagringslösningar som inte stöder strömmande skrivningar (som vissa JDBC-sänkare).
  • Tillämpa anpassad logik eller transformeringar: Manipulera data i Python direkt (till exempel med hjälp av specialiserade bibliotek eller avancerade transformeringar).

Information om inbyggda sinks eller hur du skapar anpassade sinks med Python finns i Sinks in Lakeflow Spark Deklarativa Pipelines.

Syntax

Använd @dp.foreach_batch_sink()-dekorationen för att generera en ForEachBatch-sink. Du kan sedan referera till detta som en target i flödesdefinitionen, till exempel i @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parameter Description
name Valfritt. Ett unikt namn för att identifiera slutpunkten i pipelinen. Använder UDF-namnet som standard om det inte anges.
batch_handler Det här är den användardefinierade funktionen (UDF) som anropas för varje mikrobatch.
Df Spark DataFrame som innehåller data för den aktuella mikrobatchen.
batch_id Heltals-ID för mikrobatchen. Spark ökar detta ID för varje utlösarintervall.
En batch_id av 0 representerar början av en dataström eller början på en fullständig uppdatering. Koden foreach_batch_sink bör korrekt hantera en fullständig uppdatering för nedströms datakällor. Mer information finns i nästa avsnitt.

Fullständig uppdatering

Eftersom ForEachBatch använder en strömmande fråga spårar pipelinen kontrollpunktskatalogen för varje flöde. Vid fullständig uppdatering:

  • Kontrollpunktskatalogen återställs.
  • Din sinkfunktion (foreach_batch_sink UDF) registrerar en helt ny batch_id cykel från början vid 0.
  • Data i målsystemet rensas inte automatiskt av pipelinen (eftersom pipelinen inte vet var dina data skrivs). Om du behöver börja från början, måste du manuellt ta bort eller minska de externa tabeller eller dataplats som datamottagaren ForEachBatch fyller i.

Använda Unity Catalog-funktioner

Alla befintliga Unity-katalogfunktioner i Spark Structured Streaming foreach_batch_sink är fortfarande tillgängliga.

Detta omfattar skrivning till hanterade eller externa Unity Catalog-tabeller. Du kan skriva mikrobatch till hanterade eller externa tabeller i Unity Catalog precis som i alla Apache Spark Structured Streaming-jobb.

Händelseloggar

När du skapar en ForEachBatch-mottagare läggs en SinkDefinition händelse med "format": "foreachBatch" till i pipelinens händelselogg.

På så sätt kan du spåra användningen av ForEachBatch-mottagare och se varningar om din mottagare.

Användning med Databricks Connect

Om funktionen du anger inte är serialiserbar (ett viktigt krav för Databricks Connect) innehåller händelseloggen en WARN post som rekommenderar att du förenklar eller omstrukturerar koden om Databricks Connect-support krävs.

Om du till exempel använder dbutils för att hämta parametrar i en ForEachBatch UDF kan du i stället hämta argumentet innan du använder det i UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Metodtips

  1. Behåll funktionen ForEachBatch kortfattad: Undvik trådning, stora biblioteksberoenden eller stora minnesinterna datamanipuleringar. Komplex eller tillståndskänslig logik kan leda till serialiseringsfel eller flaskhalsar i prestanda.
  2. Övervaka din kontrollpunktsmapp: För strömmande frågeställningar hanterar SDP kontrollpunkter efter flöde, inte efter slutpunkt. Om du har flera flöden i pipelinen har varje flöde en egen kontrollpunktskatalog.
  3. Verifiera externa beroenden: Om du förlitar dig på externa system eller bibliotek kontrollerar du att de är installerade på alla klusternoder eller i containern.
  4. Tänk på Databricks Connect: Om din miljö kan flyttas till Databricks Connect i framtiden, kontrollera att koden är serialiserbar och att det inte förlitar sig på dbutils inom foreach_batch_sink UDF.

Begränsningar

  • Inget underhåll för ForEachBatch: Eftersom din anpassade Python-kod kan skriva data var som helst, kan pipelinen inte rensa eller spåra dessa data. Du måste hantera dina egna datahanterings- eller bevarandeprinciper för de destinationer som du skriver till.
  • Metriker i mikrobatch: Pipelines samlar in streaming-mått, men vissa scenarier kan orsaka ofullständiga eller ovanliga mått när man använder ForEachBatch. Detta beror på den underliggande flexibiliteten i ForEachBatch, vilket gör det svårt för systemet att spåra dataflöde och rader.
  • Stöd för att skriva till flera destinationer utan flera läsningar: Vissa kunder kan använda ForEachBatch för att läsa från en källa en gång och sedan skriva till flera destinationer. För att åstadkomma detta måste du inkludera df.persist eller df.cache inuti din ForEachBatch-funktion. Med de här alternativen försöker Azure Databricks bara förbereda data en enda gång. Utan dessa alternativ resulterar din fråga i flera läsningar. Detta ingår inte i följande kodexempel.
  • Använda med Databricks Connect: Om pipelinen körs på Databricks Connect foreachBatch måste användardefinierade funktioner (UDF) vara serialiserbara och kan inte använda dbutils. Pipelinen varnar om den identifierar en icke-serialiserbar UDF, men misslyckas inte med pipelinen.
  • Logik som inte kan serialiseras: Kod som refererar till lokala objekt, klasser eller obildbara resurser kan brytas i Databricks Connect-kontexter. Använd rena Python-moduler och bekräfta att referenser (till exempel dbutils) inte används om Databricks Connect är ett krav.

Examples

Exempel på grundläggande syntax

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Använda exempeldata för en enkel pipeline

I det här exemplet används NYC Taxi-exemplet. Det förutsätter att din arbetsyteadministratör har aktiverat katalogen Databricks Public Datasets. För mottagaren ändrar my_catalog.my_schema du till en katalog och ett schema som du har åtkomst till.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Skriva till flera mål

Det här exemplet skriver till flera destinationer. Den visar hur du använder txnVersion och txnAppId för att göra skrivningar till Delta Lake-tabeller idempotenta. Mer information om Idempotent-tabellskrivningar finns i .

Anta att vi skriver till två tabeller, table_a och table_b, och att skrivning till table_a lyckas i en batch medan skrivning till table_b misslyckas. När batchen körstxnVersion igen tillåter (, txnAppId) -paret Delta att ignorera duplicerad skrivning till table_aoch endast skriva batchen till table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Att använda spark.sql()

Du kan använda spark.sql() i din ForEachBatch-mottagare, som i följande exempel.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Vanliga frågor (FAQ)

Kan jag använda dbutils i min ForEachBatch-mottagare?

Om du planerar att köra din pipeline i en icke-Databricks Connect-miljö dbutils kan det fungera. Men, om du använder Databricks Connect, är dbutils inte tillgängligt i din foreachBatch-funktion. Pipelinen kan generera varningar om den identifierar användning av dbutils för att hjälpa dig undvika avbrott.

Kan jag använda flera flöden med en enda ForEachBatch-mottagare?

Ja. Du kan definiera flera flöden (med @dp.append_flow) som alla riktar sig mot samma mottagarnamn, men där varje flöde behåller sina egna kontrollpunkter.

Omfattar pipelinen datakvarhållning eller rensning för mitt målsystem?

Nej. Eftersom ForEachBatch-datasänkan kan skriva till vilken plats som helst eller ett godtyckligt system kan pipelinen inte automatiskt hantera eller ta bort data i målsystemet. Du måste hantera dessa åtgärder som en del av din anpassade kod eller externa processer.

Hur felsöker jag serialiseringsfel eller -fel i min ForEachBatch-funktion?

Titta på dina klusterdrivrutinsloggar eller pipelinehändelseloggar. För problem med Spark Connect-relaterad serialisering, kontrollera att din funktion bara är beroende av serialiserbara Python-objekt och inte refererar till otillåtna objekt (till exempel öppna filhandtag eller dbutils).