Formazione
Modulo
Eseguire l'elaborazione incrementale con lo streaming strutturato Spark - Training
Informazioni sullo streaming strutturato Spark e sui modi per ottimizzarlo e utilizzarlo per popolare gli oggetti di destinazione
Questo browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
Questo articolo illustra l'uso di foreachBatch
con Structured Streaming per scrivere l'output di una query di streaming in origini dati che non dispongono di un sink di streaming esistente.
Il modello streamingDF.writeStream.foreachBatch(...)
di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch
accettano due parametri:
È necessario usare foreachBatch
per le operazioni di merge Delta Lake in Structured Streaming. Vedere Upsert dalle query di streaming usando foreachBatch.
Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch()
è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch()
e l'operazione SQL MERGE INTO
per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.
Importante
foreachBatch()
fornisce solo garanzie di scrittura at-least-once. Tuttavia, è possibile usare il batchId
fornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare sulla semantica end-to-end manualmente.foreachBatch()
non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usare foreach()
invece .È possibile richiamare un dataframe vuoto con foreachBatch()
e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso condiviso si applicano le modifiche di comportamento seguenti:
print()
scrivono l'output nei log del driver.dbutils.widgets
all'interno della funzione.Usando foreachBatch()
, è possibile usare writer di dati batch esistenti per sink di dati che potrebbero non avere il supporto di Structured Streaming. Ecco alcuni esempi:
Molte altre origini dati batch possono essere usate da foreachBatch()
. Vedere Connettersi alle origini dati.
Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più writer di streaming strutturato per ottimizzare la parallelizzazione e la velocità effettiva.
L'uso foreachBatch
di per scrivere in più sink serializza l'esecuzione di scritture di streaming, che può aumentare la latenza per ogni micro batch.
Se si usa foreachBatch
per scrivere in più tabelle Delta, vedere scritture di tabelle Idempotenti in foreachBatch.
Formazione
Modulo
Eseguire l'elaborazione incrementale con lo streaming strutturato Spark - Training
Informazioni sullo streaming strutturato Spark e sui modi per ottimizzarlo e utilizzarlo per popolare gli oggetti di destinazione
Documentazione
Checkpoint di Structured Streaming - Azure Databricks
Questo articolo offre informazioni generali sui checkpoint di Structured Streaming.
Configurare la dimensione del batch di Structured Streaming in Azure Databricks - Azure Databricks
La limitazione della frequenza di input per le query Structured Streaming contribuisce a mantenere dimensioni di batch coerenti e impedisce che batch di grandi dimensioni causino spill e ritardi di elaborazione di micro-batch a catena.
Configurare gli intervalli di trigger di Structured Streaming - Azure Databricks
Informazioni su come configurare gli intervalli di trigger di Structured Streaming in Azure Databricks.