Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Lo streaming strutturato è un motore di elaborazione flussi scalabile e con tolleranza di errore basato su Spark. Spark si occupa dell'esecuzione dell'operazione di streaming in modo incrementale e continuo man mano che i dati continuano ad arrivare.
Structured Streaming è diventato disponibile in Spark 2.2. Da allora, è stato l'approccio consigliato per lo streaming di dati. Il principio fondamentale alla base di un flusso strutturato consiste nel considerare un flusso di dati live come una tabella in cui i nuovi dati vengono sempre accodati in modo continuo, ad esempio una nuova riga in una tabella. Esistono alcune origini di file di streaming predefinite definite, ad esempio CSV, JSON, ORC, Parquet e il supporto predefinito per servizi di messaggistica come Kafka e Hub eventi.
Questo articolo fornisce informazioni dettagliate su come ottimizzare l'elaborazione e l'inserimento di eventi tramite lo streaming strutturato Spark in ambienti di produzione con velocità effettiva elevata. Gli approcci suggeriti includono:
- Ottimizzazione della velocità effettiva dello streaming di dati
- Ottimizzazione delle operazioni di scrittura nella tabella Delta e
- Raggruppamento di eventi
Definizioni attività Spark e notebook Spark
I notebook Spark sono uno strumento eccellente per convalidare idee e fare esperimenti per ottenere informazioni dettagliate dai dati o dal codice. I notebook sono ampiamente usati anche per la preparazione e la visualizzazione dei dati, per l'apprendimento automatico e pre altri scenari Big Data. Le definizioni di job Spark sono attività non interattive orientate al codice che vengono eseguite in un cluster Spark per lunghi periodi. Le definizioni dei lavori Spark offrono robustezza e disponibilità.
I notebook Spark sono un'ottima origine per testare la logica del codice e soddisfare tutti i requisiti aziendali. Tuttavia, per mantenerlo in esecuzione in uno scenario di produzione, le definizioni di job Spark con politica di tentativi abilitata sono la soluzione migliore.
Criteri di ripetizione per definizioni di processi Spark
In Microsoft Fabric, l'utente può impostare una politica di ripetizione per i processi Spark Job Definition. Anche se lo script nel task potrebbe essere infinito, l'infrastruttura che esegue lo script potrebbe incontrare un problema che richieda l'arresto del task. In alternativa, il processo potrebbe essere eliminato a causa di esigenze di applicazione di patch all'infrastruttura sottostante. La politica di ripetizione consente all'utente di impostare regole per il riavvio automatico dell'attività se si arresta a causa di eventuali problemi sottostanti. I parametri specificano la frequenza con cui il processo deve essere riavviato, fino a tentativi infiniti e impostando il tempo tra i tentativi. In questo modo, gli utenti possono assicurarsi che le definizioni di processo Spark continuino a essere eseguite all'infinito fino a quando l'utente decide di arrestarle.
Origini di streaming
La configurazione dello streaming con Hub eventi richiede una configurazione di base, che include il nome dello spazio dei nomi di Hub eventi, il nome dell'hub, il nome della chiave di accesso condivisa e il gruppo di consumer. Un gruppo di consumatori è una visione complessiva di un hub eventi. Consente a più applicazioni che utilizzano di avere una visualizzazione separata del flusso di eventi e di leggere il flusso in modo indipendente al proprio ritmo e con i relativi offset.
Le partizioni sono una parte essenziale della possibilità di gestire un elevato volume di dati. Un singolo processore ha una capacità limitata per la gestione degli eventi al secondo, mentre più processori possono eseguire un processo migliore quando vengono eseguiti in parallelo. Le partizioni consentono di elaborare elevati volumi di eventi in parallelo.
Se vengono usate troppe partizioni con una velocità di inserimento ridotta, i lettori di partizioni gestiscono una piccola parte di questi dati, causando un'elaborazione non ottimale. Il numero ideale di partizioni dipende direttamente dalla velocità di elaborazione desiderata. Se si vuole ridimensionare l'elaborazione degli eventi, è possibile considerare l’aggiunta di altre partizioni. Non esiste un limite di velocità effettiva specifico per una partizione. Tuttavia, la velocità effettiva aggregata nello spazio dei nomi è limitata dal numero di unità di throughput. Quando si aumenta il numero di unità di throughput nel proprio spazio dei nomi, è opportuno aggiungere partizioni per consentire ai lettori simultanei di raggiungere la loro massima capacità di throughput.
È consigliabile esaminare e testare un numero ottimale di partizioni per lo scenario della velocità effettiva. Tuttavia, è comune vedere scenari con velocità effettiva elevata usando 32 o più partizioni.
Il connettore Hub eventi di Azure per Apache Spark (azure-event-hubs-spark) è consigliabile per connettere l'applicazione Spark a Hub eventi di Azure.
Lakehouse come destinazione di streaming
Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID (atomicità, coerenza, isolamento e durabilità) su soluzioni Data Lake Storage. Delta Lake supporta anche la gestione scalabile dei metadati, l'evoluzione dello schema, il tempo di spostamento fisico (controllo delle versioni dei dati), formato aperto e altre funzionalità.
Nell’ingegneria dei dati Fabric, Delta Lake viene usato per:
- Eseguire facilmente l'inserimento/aggiornamento combinato e la cancellazione di dati utilizzando Spark SQL.
- Compattare i dati per ridurre al minimo il tempo impiegato per l'esecuzione di query sui dati.
- Visualizzare lo stato delle tabelle prima e dopo l'esecuzione delle operazioni.
- Recuperare una cronologia delle operazioni eseguite sulle tabelle.
Delta viene aggiunto come uno dei formati di sink di output possibili usati in writeStream. Per altre informazioni sui sink di output esistenti, vedere la Guida alla programmazione di Spark Structured Streaming.
L'esempio seguente illustra come è possibile trasmettere i dati in Delta Lake.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
Informazioni sul frammento di codice nell'esempio:
- format() è l'istruzione che definisce il formato di output dei dati.
- outputMode() definisce in che modo vengono scritte le nuove righe nel flusso (ossia accodare, sovrascrivere).
- toTable() rende persistenti i dati trasmessi in una tabella Delta creata usando il valore passato come parametro.
Ottimizzazione delle scritture Delta
Il partizionamento dei dati è una parte fondamentale nella creazione di una solida soluzione di streaming: il partizionamento migliora la modalità di organizzazione dei dati e migliora anche la velocità effettiva. I file vengono frammentati facilmente dopo le operazioni Delta, causando un numero eccessivo di file di piccole dimensioni. Anche i file troppo grandi rappresentano un problema, a causa del lungo periodo di tempo per scriverli sul disco. La sfida con il partizionamento dei dati è trovare il giusto equilibrio che produce file di dimensioni ottimali. Spark supporta il partizionamento in memoria e su disco. I dati partizionati correttamente possono offrire prestazioni ottimali con la persistenza dei dati in Delta Lake e l'esecuzione di query sui dati da Delta Lake.
- Quando si partizionano i dati su disco, è possibile scegliere come partizionare i dati in base alle colonne usandopartitionBy(). partitionBy() è una funzione usata per partizionare un modello semantico di grandi dimensioni in file più piccoli in base a una o più colonne fornite durante la scrittura su disco. Il partizionamento è un modo per migliorare le prestazioni delle query quando si usa un modello semantico di grandi dimensioni. Evitare di scegliere una colonna che genera partizioni troppo piccole o troppo grandi. Definire una partizione in base a un set di colonne con una buona cardinalità e suddividere i dati in file di dimensioni ottimali.
- Il partizionamento dei dati in memoria può essere eseguito usando trasformazioni repartition() o coalesce(), distribuendo i dati in più nodi di lavoro e creando più attività che possono leggere ed elaborare i dati in parallelo usando i concetti fondamentali di Resilient Distributed Dataset (RDD). Consente di dividere il modello semantico in partizioni logiche, che possono essere calcolate in nodi diversi del cluster.
- repartition() viene usato per aumentare o ridurre il numero di partizioni in memoria. La ripartizione rimescola tutti i dati sulla rete e li bilancia in tutte le partizioni.
- coalesce() viene usato solo per ridurre il numero di partizioni in modo efficiente. È una versione ottimizzata di repartition() in cui lo spostamento dei dati in tutte le partizioni è inferiore usando coalesce().
La combinazione di entrambi gli approcci di partizionamento è una buona soluzione nello scenario con velocità effettiva elevata. repartition() crea un numero specifico di partizioni in memoria, mentre partitionBy() scrive i file su disco per ogni partizione di memoria e colonna di partizionamento. L'esempio seguente illustra l'uso di entrambe le strategie di partizionamento nello stesso processo Spark: i dati vengono prima suddivisi in 48 partizioni in memoria (supponendo che siano presenti 48 core CPU) e quindi partizionati su disco in base a due colonne esistenti nel payload.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Scrittura ottimizzata
Un'altra opzione per ottimizzare le scritture in Delta Lake consiste nell'uso della scrittura ottimizzata. La scrittura ottimizzata è una funzionalità facoltativa che migliora la modalità di scrittura dei dati nella tabella Delta. Spark unisce o suddivide le partizioni prima di scrivere i dati, ottimizzando la velocità effettiva dei dati scritti sul disco. Tuttavia, comporta un rimescolamento completo, quindi per alcuni carichi di lavoro può causare una degradazione delle prestazioni. È possibile effettuare il refactoring di processi che usano coalesce() e/o repartition() per partizionare i dati sul disco per iniziare a usare la scrittura ottimizzata.
Il codice seguente è un esempio di utilizzo della scrittura ottimizzata. Si noti che partitionBy() viene ancora usato.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Invio in batch di eventi
Per ridurre al minimo il numero di operazioni e ottimizzare il tempo necessario per l'inserimento di dati in Delta Lake, l'elaborazione batch degli eventi è un'alternativa pratica.
I trigger definiscono la frequenza con cui deve essere eseguita una query di streaming (attivata) e generano nuovi dati. Impostandoli definisce un intervallo di tempo di elaborazione periodico per i microbatches, accumulando dati ed eventi in batch in poche operazioni persistenti, invece di scrivere su disco per tutto il tempo.
L'esempio seguente mostra una query di streaming in cui gli eventi vengono elaborati periodicamente in intervalli di un minuto.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Il vantaggio della combinazione di invio in batch di eventi nelle operazioni di scrittura di tabelle Delta è che crea file Delta di dimensioni maggiori contenenti più dati, evitando file di piccole dimensioni. È consigliabile analizzare la quantità di dati inseriti e trovare il tempo di elaborazione migliore per ottimizzare le dimensioni dei file Parquet creati dalla libreria Delta.
Monitoraggio
Spark 3.1 e versioni successive hanno un'interfaccia utente di Structured Streaming predefinita contenente le metriche di streaming seguenti:
- Velocità di input
- Velocità di elaborazione
- Righe di input
- Durata batch
- Durata operazione
Contenuto correlato
- Inserire i dati in streaming nel lakehouse e accedere con l'endpoint di analisi SQL.