Condividi tramite


Utilizzare ForEachBatch per scrivere su destinazioni dati arbitrarie nelle pipeline

Importante

L'API foreach_batch_sink è disponibile in anteprima pubblica.

Il sink ForEachBatch consente di elaborare un flusso come una serie di micro batch. Ogni batch può essere elaborato in Python con logica personalizzata simile a quella di Apache Spark Structured Streaming.foreachBatch Con il sink ForEachBatch delle Pipeline Dichiarative di Lakeflow Spark (SDP), è possibile trasformare, unire o scrivere dati di streaming in una o più destinazioni che non supportano in modo nativo le scritture di streaming. Questa pagina illustra come configurare un sink ForEachBatch, fornisce esempi e illustra le considerazioni principali.

Il sink ForEachBatch offre le funzionalità seguenti:

  • Logica personalizzata per ogni micro-batch: ForEachBatch è un sink di streaming flessibile. È possibile applicare azioni arbitrarie( ad esempio l'unione in una tabella esterna, la scrittura in più destinazioni o l'esecuzione di upsert) con il codice Python.
  • Supporto per l'aggiornamento completo: le pipeline gestiscono i checkpoint in base al flusso, quindi i checkpoint vengono reimpostati automaticamente quando si esegue un aggiornamento completo della pipeline. Con il sink ForEachBatch, l'utente è responsabile della gestione del ripristino dei dati downstream quando questo accade.
  • Supporto del catalogo Unity: il sink ForEachBatch supporta tutte le funzionalità del catalogo Unity, ad esempio la lettura o la scrittura in volumi o tabelle del catalogo Unity.
  • Manutenzione limitata: la pipeline non tiene traccia dei dati scritti da un sink ForEachBatch, quindi non è possibile pulire tali dati. L'utente è responsabile di qualsiasi gestione dei dati downstream.
  • Voci del registro eventi: Il registro eventi della pipeline registra la creazione e l'utilizzo di ogni sink ForEachBatch. Se la funzione Python non è serializzabile, viene visualizzata una voce di avviso nel registro eventi con suggerimenti aggiuntivi.

Annotazioni

  • Il sink ForEachBatch è progettato per le query di streaming, ad esempio append_flow. Non è progettato per pipeline esclusivamente batch o per la semantica AutoCDC.
  • Il sink ForEachBatch, descritto in questa pagina, è relativo alle pipeline. Apache Spark Structured Streaming supporta anche foreachBatch. Per informazioni su Structured Streaming foreachBatch, consultare Usare foreachBatch per scrivere su dispositivi di dati arbitrari.

Quando utilizzare un sink ForEachBatch

Usare un sink ForEachBatch ogni volta che la pipeline richiede funzionalità non disponibili tramite un formato sink predefinito, come delta, o kafka. I casi d'uso tipici includono:

  • Unione o inserimento o aggiornamento in una tabella Delta Lake: eseguire la logica di unione personalizzata per ogni micro-batch (ad esempio, gestione dei record aggiornati).
  • Scrittura in più destinazioni o non supportate: scrivere l'output di ogni batch in più tabelle o sistemi di archiviazione esterni che non supportano le scritture di streaming (ad esempio alcuni sink JDBC).
  • Applicazione di logica o trasformazioni personalizzate: modificare direttamente i dati in Python, ad esempio usando librerie specializzate o trasformazioni avanzate.

Per informazioni sui sink predefiniti o sulla creazione di sink personalizzati con Python, vedere Sinks in Lakeflow Spark Declarative Pipelines.

Sintassi

Utilizza la decorazione @dp.foreach_batch_sink() per generare un sink ForEachBatch. È quindi possibile fare riferimento a questo come nella target definizione del flusso, ad esempio in @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parametro Description
nome Optional. Nome univoco per identificare il sink all'interno della pipeline. L'impostazione predefinita è il nome della funzione definita dall'utente, se non incluso.
batch_handler Questa è la funzione definita dall'utente che verrà chiamata per ogni micro batch.
Df DataFrame Spark contenente i dati per il micro batch corrente.
batch_id ID intero del micro batch. Spark incrementa questo ID per ogni intervallo di trigger.
Un batch_id di 0 rappresenta l'inizio di un flusso o l'inizio di un aggiornamento completo. Il foreach_batch_sink codice deve gestire correttamente un aggiornamento completo per le origini dati downstream. Per altre informazioni, vedere la sezione successiva.

Aggiornamento completo

Poiché ForEachBatch usa una query di streaming, la pipeline tiene traccia della directory del checkpoint per ogni flusso. All'aggiornamento completo:

  • La directory del checkpoint viene ripristinata.
  • La funzione sink (foreach_batch_sink UDF) rileva l'inizio di un nuovo batch_id ciclo a partire da 0.
  • I dati nel sistema di destinazione non vengono puliti automaticamente dalla pipeline (perché la pipeline non sa dove vengono scritti i dati). Se è necessario uno scenario a tabula rasa, è necessario eliminare o troncare manualmente le tabelle o i percorsi esterni popolati dal sink ForEachBatch.

Uso delle funzionalità del catalogo Unity

Tutte le funzionalità del catalogo Unity esistenti in Spark Structured Streaming foreach_batch_sink rimangono disponibili.

Questo include la scrittura in tabelle gestite o esterne del catalogo Unity. È possibile scrivere micro batch in tabelle gestite o esterne di Unity Catalog esattamente come in qualsiasi processo apache Spark Structured Streaming.

Voci del registro eventi

Quando si crea un sink ForEachBatch, un evento SinkDefinition con "format": "foreachBatch" viene aggiunto al log degli eventi della pipeline.

In questo modo è possibile tenere traccia dell'utilizzo dei sink ForEachBatch e visualizzare gli avvisi relativi al sink.

Uso con Databricks Connect

Se la funzione fornita non è serializzabile (un requisito importante per Databricks Connect), il registro eventi include una WARN voce che consiglia di semplificare o effettuare il refactoring del codice se è necessario il supporto di Databricks Connect.

Ad esempio, se si utilizza dbutils per ottenere i parametri all'interno di una funzione definita dall'utente (UDF) ForEachBatch, è possibile ottenere l'argomento prima di utilizzarlo nella UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Procedure consigliate

  1. Mantenere concisa la funzione ForEachBatch: evitare il threading, le dipendenze delle librerie pesanti o le manipolazioni di dati in memoria di grandi dimensioni. La logica complessa o con stato può causare errori di serializzazione o problemi di prestazioni.
  2. Monitorare la cartella del checkpoint: per le query di streaming, SDP gestisce i checkpoint in base al flusso, non al sink. Se nella pipeline sono presenti più flussi, ogni flusso ha una propria directory di checkpoint.
  3. Convalidare le dipendenze esterne: se ci si basa su sistemi o librerie esterni, verificare che siano installati in tutti i nodi del cluster o nel contenitore.
  4. Presta attenzione a Databricks Connect: se l'ambiente potrebbe passare a Databricks Connect in futuro, verifica che il codice sia serializzabile e non si basi sulla dbutils funzione definita dall'utente foreach_batch_sink.

Limitazioni

  • Nessuna manutenzione per ForEachBatch: poiché il codice Python personalizzato può scrivere dati ovunque, la pipeline non può pulire o tenere traccia dei dati. È necessario gestire i propri criteri di gestione o conservazione dei dati per le destinazioni in cui si scrive.
  • Metriche in micro batch: le pipeline raccolgono metriche di streaming, ma alcuni scenari possono causare metriche incomplete o insolite quando si usa ForEachBatch. Ciò è dovuto alla flessibilità sottostante di ForEachBatch che rende difficile il rilevamento del flusso di dati e delle righe per il sistema.
  • Supporto della scrittura in più destinazioni senza più letture: alcuni clienti possono usare ForEachBatch per leggere da un'origine una sola volta e quindi scrivere in più destinazioni. A tale scopo, è necessario includere df.persist o df.cache all'interno della funzione ForEachBatch. Usando queste opzioni, Azure Databricks tenterà di preparare i dati solo una sola volta. Senza queste opzioni, la query genererà più letture. Questo non è incluso negli esempi di codice seguenti.
  • Uso con Databricks Connect: se la pipeline viene eseguita su Databricks Connect, foreachBatch le funzioni definite dall'utente devono essere serializzabili e non possono usare dbutils. La pipeline emette avvisi se rileva un UDF non serializzabile, ma non causa il fallimento della pipeline.
  • Logica non serializzabile: il codice che fa riferimento a oggetti locali, classi o risorse non selezionabili può interrompere i contesti di Databricks Connect. Usare moduli Python puri e verificare che i riferimenti (ad esempio, dbutils) non vengano usati se Databricks Connect è un requisito.

Esempi

Esempio della sintassi di base

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Uso di dati di esempio per una pipeline semplice

Questo esempio usa l'esempio NYC Taxi. Si presuppone che l'amministratore dell'area di lavoro abbia abilitato il catalogo di set di dati pubblici di Databricks. Per il sink, passare my_catalog.my_schema a un catalogo e a uno schema a cui si ha accesso.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Scrittura verso più destinazioni

Questo esempio scrive in più destinazioni. Mostra come utilizzare txnVersion e txnAppId per rendere le scritture nelle tabelle Delta Lake idempotenti. Per informazioni dettagliate, vedere Scritture di tabelle idempotenti in foreachBatch.

Si supponga di scrivere in due tabelle table_a e table_b, e si supponga che all'interno di un batch la scrittura table_a abbia esito positivo mentre la scrittura in table_b ha esito negativo. Quando il batch viene eseguito nuovamente, la coppia (txnVersion, txnAppId) consentirà a Delta di ignorare la scrittura duplicata in table_ae di scrivere solo il batch in table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Utilizzo di spark.sql()

È possibile usare spark.sql() nel sink ForEachBatch, come nell'esempio seguente.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Domande frequenti

È possibile usare dbutils nel sink ForEachBatch?

Se si prevede di eseguire la pipeline in un ambiente non Databricks Connect, dbutils può funzionare. Tuttavia, se si usa Databricks Connect, dbutils non è accessibile all'interno della foreachBatch funzione. La pipeline può generare avvisi se rileva dell'utilizzo dbutils per aiutarti a evitare interruzioni.

È possibile utilizzare più flussi con un unico sink ForEachBatch?

Sì. È possibile definire più flussi (con @dp.append_flow) destinati allo stesso nome di sink, ma ognuno mantiene i propri checkpoint.

La pipeline gestisce la conservazione o la pulizia dei dati per il tuo target?

No Poiché il sink ForEachBatch può scrivere in qualsiasi posizione o sistema arbitrario, la pipeline non può gestire o eliminare automaticamente i dati in tale destinazione. È necessario gestire queste operazioni come parte del codice personalizzato o dei processi esterni.

Come è possibile risolvere gli errori di serializzazione o gli errori nella funzione ForEachBatch?

Esamina i log dei driver del cluster o i log degli eventi della pipeline. Per i problemi di serializzazione correlati a Spark Connect, verificare che la funzione dipende solo da oggetti Python serializzabili e non faccia riferimento a oggetti non consentiti (ad esempio handle di file aperti o dbutils).