Eseguire il primo carico di lavoro Structured Streaming

Questo articolo fornisce esempi di codice e spiegazione dei concetti di base necessari per eseguire le prime query structured streaming in Azure Databricks. È possibile usare Structured Streaming per carichi di lavoro di elaborazione quasi in tempo reale e incrementali.

Structured Streaming è una delle diverse tecnologie che alimentano le tabelle di streaming nelle tabelle Live Delta. Databricks consiglia di usare tabelle Live Delta per tutti i nuovi carichi di lavoro ETL, inserimento e Structured Streaming. Vedere Che cos'è le tabelle live Delta?.

Nota

Sebbene le tabelle live Delta forniscano una sintassi leggermente modificata per la dichiarazione di tabelle di streaming, la sintassi generale per la configurazione delle letture e delle trasformazioni di streaming si applica a tutti i casi d'uso di streaming in Azure Databricks. Le tabelle live delta semplificano anche lo streaming gestendo informazioni sullo stato, metadati e numerose configurazioni.

Leggere da un flusso di dati

È possibile usare Structured Streaming per inserire dati in modo incrementale da origini dati supportate. Alcune delle origini dati più comuni usate nei carichi di lavoro Di azure Databricks Structured Streaming includono quanto segue:

  • File di dati nell'archiviazione di oggetti cloud
  • Bus e code dei messaggi
  • Delta Lake

Databricks consiglia di usare il caricatore automatico per l'inserimento in streaming dall'archiviazione di oggetti cloud. Il caricatore automatico supporta la maggior parte dei formati di file supportati da Structured Streaming. Vedere Che cos'è il caricatore automatico?.

Ogni origine dati offre diverse opzioni per specificare come caricare batch di dati. Durante la configurazione del lettore, le opzioni principali che potrebbe essere necessario impostare rientrano nelle categorie seguenti:

  • Opzioni che specificano l'origine dati o il formato (ad esempio, tipo di file, delimitatori e schema).
  • Opzioni che consentono di configurare l'accesso ai sistemi di origine( ad esempio, le impostazioni e le credenziali della porta).
  • Opzioni che specificano dove iniziare in un flusso( ad esempio, offset Kafka o lettura di tutti i file esistenti).
  • Opzioni che controllano la quantità di dati elaborati in ogni batch (ad esempio, offset massimi, file o byte per batch).

Usare il caricatore automatico per leggere i dati di streaming dall'archiviazione oggetti

L'esempio seguente illustra il caricamento di dati JSON con il caricatore automatico, che usa per indicare il formato e le cloudFiles opzioni. L'opzione schemaLocation abilita l'inferenza dello schema e l'evoluzione. Incollare il codice seguente in una cella del notebook di Databricks ed eseguire la cella per creare un dataframe di streaming denominato raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Analogamente ad altre operazioni di lettura in Azure Databricks, la configurazione di una lettura in streaming non carica effettivamente i dati. È necessario attivare un'azione sui dati prima dell'inizio del flusso.

Nota

La chiamata display() a un dataframe di streaming avvia un processo di streaming. Per la maggior parte dei casi d'uso di Structured Streaming, l'azione che attiva un flusso deve scrivere dati in un sink. Vedere Preparazione del codice Structured Streaming per la produzione.

Eseguire una trasformazione di streaming

Structured Streaming supporta la maggior parte delle trasformazioni disponibili in Azure Databricks e Spark SQL. È anche possibile caricare i modelli MLflow come funzioni definite dall'utente ed eseguire stime di streaming come trasformazione.

L'esempio di codice seguente completa una semplice trasformazione per arricchire i dati JSON inseriti con informazioni aggiuntive usando le funzioni SPARK SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

L'oggetto risultante transformed_df contiene istruzioni di query per caricare e trasformare ogni record man mano che arriva nell'origine dati.

Nota

Structured Streaming considera le origini dati come set di dati non associati o infiniti. Di conseguenza, alcune trasformazioni non sono supportate nei carichi di lavoro Structured Streaming perché richiedono l'ordinamento di un numero infinito di elementi.

La maggior parte delle aggregazioni e molti join richiedono la gestione delle informazioni sullo stato con filigrane, finestre e modalità di output. Vedere Applicare filigrane per controllare le soglie di elaborazione dei dati.

Scrivere in un sink di dati

Un sink di dati è la destinazione di un'operazione di scrittura in streaming. I sink comuni usati nei carichi di lavoro di streaming di Azure Databricks includono quanto segue:

  • Delta Lake
  • Bus e code dei messaggi
  • Database chiave-valore

Come per le origini dati, la maggior parte dei sink di dati offre una serie di opzioni per controllare il modo in cui i dati vengono scritti nel sistema di destinazione. Durante la configurazione del writer, le opzioni principali che potrebbe essere necessario impostare rientrano nelle categorie seguenti:

  • Modalità di output (accodamento per impostazione predefinita).
  • Percorso del checkpoint (obbligatorio per ogni writer).
  • Intervalli di trigger; vedere Configurare gli intervalli di trigger di Structured Streaming.
  • Opzioni che specificano il sink o il formato di dati, ad esempio il tipo di file, i delimitatori e lo schema.
  • Opzioni che configurano l'accesso ai sistemi di destinazione (ad esempio, impostazioni e credenziali della porta).

Eseguire una scrittura batch incrementale in Delta Lake

L'esempio seguente scrive in Delta Lake usando un percorso e un checkpoint di file specificati.

Importante

Assicurarsi sempre di specificare un percorso di checkpoint univoco per ogni writer di streaming configurato. Il checkpoint fornisce l'identità univoca per il flusso, monitorando tutti i record elaborati e le informazioni sullo stato associati alla query di streaming.

L'impostazione availableNow per il trigger indica a Structured Streaming di elaborare tutti i record non elaborati in precedenza dal set di dati di origine e quindi arrestarli, in modo da poter eseguire in modo sicuro il codice seguente senza doversi preoccupare di lasciare in esecuzione un flusso:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In questo esempio non arrivano nuovi record nell'origine dati, quindi l'esecuzione ripetuta di questo codice non inserisce nuovi record.

Avviso

L'esecuzione di Structured Streaming può impedire la chiusura automatica delle risorse di calcolo. Per evitare costi imprevisti, assicurarsi di terminare le query di streaming.

Preparazione del codice Structured Streaming per la produzione

Databricks consiglia di usare tabelle delta live per la maggior parte dei carichi di lavoro di streaming strutturato. I consigli seguenti forniscono un punto di partenza per la preparazione dei carichi di lavoro Structured Streaming per la produzione:

  • Rimuovere il codice non necessario dai notebook che restituiscono risultati, ad esempio display e count.
  • Non eseguire carichi di lavoro Structured Streaming in cluster interattivi; pianificare sempre i flussi come processi.
  • Per consentire il ripristino automatico dei processi di streaming, configurare i processi con tentativi infiniti.
  • Non usare il ridimensionamento automatico per i carichi di lavoro con Structured Streaming.

Per altre raccomandazioni, vedere Considerazioni sulla produzione per Structured Streaming.

Leggere i dati da Delta Lake, trasformare e scrivere in Delta Lake

Delta Lake offre un ampio supporto per l'uso di Structured Streaming sia come origine che come sink. Vedere Letture e scritture in streaming di tabelle Delta.

Nell'esempio seguente viene illustrata la sintassi di esempio per caricare in modo incrementale tutti i nuovi record da una tabella Delta, unirli con uno snapshot di un'altra tabella Delta e scriverli in una tabella Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

È necessario disporre delle autorizzazioni appropriate configurate per leggere le tabelle di origine e scrivere nelle tabelle di destinazione e nel percorso del checkpoint specificato. Compilare tutti i parametri indicati con parentesi angolari (<>) usando i valori pertinenti per le origini dati e i sink.

Nota

Delta Live Tables fornisce una sintassi completamente dichiarativa per la creazione di pipeline Delta Lake e gestisce automaticamente proprietà come trigger e checkpoint. Vedere Che cos'è le tabelle live Delta?.

Leggere i dati da Kafka, trasformare e scrivere in Kafka

Apache Kafka e altri bus di messaggistica offrono una certa latenza più bassa disponibile per set di dati di grandi dimensioni. È possibile usare Azure Databricks per applicare trasformazioni ai dati inseriti da Kafka e quindi scrivere nuovamente i dati in Kafka.

Nota

La scrittura di dati nell'archiviazione oggetti cloud comporta un sovraccarico di latenza aggiuntivo. Se si desidera archiviare dati da un bus di messaggistica in Delta Lake, ma è necessaria la latenza più bassa possibile per i carichi di lavoro di streaming, Databricks consiglia di configurare processi di streaming separati per inserire dati nel lakehouse e applicare trasformazioni quasi in tempo reale per i sink del bus di messaggistica downstream.

L'esempio di codice seguente illustra un modello semplice per arricchire i dati di Kafka unendo i dati in una tabella Delta e quindi scrivendo di nuovo in Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

È necessario disporre delle autorizzazioni appropriate configurate per l'accesso al servizio Kafka. Compilare tutti i parametri indicati con parentesi angolari (<>) usando i valori pertinenti per le origini dati e i sink. Vedere Elaborazione di flussi con Apache Kafka e Azure Databricks.