Checkpoint di Structured Streaming
I checkpoint e i log write-ahead interagiscono per offrire garanzie di elaborazione per i carichi di lavoro Structured Streaming. Il checkpoint tiene traccia delle informazioni che identificano la query, incluse le informazioni sullo stato e i record elaborati. Quando si eliminano i file in una directory del checkpoint o si passa a un nuovo percorso del checkpoint, viene avviata la successiva esecuzione della query.
Ogni query deve avere un percorso di checkpoint diverso. Più query non devono mai condividere la stessa posizione.
È necessario specificare l'checkpointLocation
opzione prima di eseguire una query di streaming, come nell'esempio seguente:
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Nota
Alcuni sink, ad esempio l'output per display()
nei notebook e il sink memory
, generano automaticamente un percorso di checkpoint temporaneo se si omette questa opzione. Questi percorsi di checkpoint temporanei non garantiscono alcuna tolleranza di errore o garanzie di coerenza dei dati e potrebbero non essere puliti correttamente. Databricks consiglia di specificare sempre una posizione del checkpoint per questi sink.
Esistono limitazioni sulle modifiche apportate a una query di streaming tra i riavvii dalla stessa posizione del checkpoint. Ecco alcune modifiche che non sono consentite o l'effetto della modifica non è ben definito. Per tutti:
- Il termine consentito significa che è possibile eseguire la modifica specificata, ma se la semantica del suo effetto è ben definita o no dipende dalla query e dalla modifica.
- Il termine non consentito indica che non è consigliabile eseguire la modifica specificata perché è probabile che la query riavviata abbia esito negativo con errori imprevedibili.
-
sdf
rappresenta un dataframe di streaming/set di dati generato consparkSession.readStream
.
- Modifiche al numero o al tipo (ovvero origine diversa) delle origini di input: non è consentito.
-
Modifiche nei parametri delle origini di input: se questo è consentito e se la semantica della modifica è ben definita dipende dall'origine e dalla query. Ecco alcuni esempi.
È consentito aggiungere, eliminare e modificare i limiti di frequenza:
spark.readStream.format("kafka").option("subscribe", "article")
to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Le modifiche apportate agli articoli e ai file sottoscritti non sono in genere consentite perché i risultati sono imprevedibili:
spark.readStream.format("kafka").option("subscribe", "article")
aspark.readStream.format("kafka").option("subscribe", "newarticle")
- Modifiche nell'intervallo di trigger: è possibile modificare i trigger tra batch incrementali e intervalli di tempo. Vedere Modifica degli intervalli di trigger tra le esecuzioni.
-
Modifiche nel tipo di sink di output: sono consentite modifiche tra alcune combinazioni specifiche di sink. Questo deve essere verificato caso per caso. Ecco alcuni esempi.
- Il sink di file nel sink Kafka è consentito. Kafka vedrà solo i nuovi dati.
- Il sink Kafka nel sink di file non è consentito.
- Sink Kafka modificato in foreach o viceversa, è consentito.
-
Modifiche nei parametri del sink di output: Indica se è consentito e se la semantica della modifica è ben definita, dipende dal sink e dalla query. Ecco alcuni esempi.
- Le modifiche apportate alla directory di output di un sink di file non sono consentite:
sdf.writeStream.format("parquet").option("path", "/somePath")
asdf.writeStream.format("parquet").option("path", "/anotherPath")
- Le modifiche apportate all'argomento di output sono consentite:
sdf.writeStream.format("kafka").option("topic", "topic1")
asdf.writeStream.format("kafka").option("topic", "topic2")
- Le modifiche apportate al sink foreach definito dall'utente(ovvero il
ForeachWriter
codice) sono consentite, ma la semantica della modifica dipende dal codice.
- Le modifiche apportate alla directory di output di un sink di file non sono consentite:
-
Modifiche apportate alle operazioni di proiezione/filtro/mapping: alcuni casi sono consentiti. Ad esempio:
- L'aggiunta o l'eliminazione dei filtri è consentita:
sdf.selectExpr("a")
asdf.where(...).selectExpr("a").filter(...)
. - Le modifiche apportate alle proiezioni con lo stesso schema di output sono consentite:
sdf.selectExpr("stringColumn AS json").writeStream
asdf.select(to_json(...).as("json")).writeStream
. - Le modifiche apportate alle proiezioni con schema di output diverso sono consentite in modo condizionale:
sdf.selectExpr("a").writeStream
asdf.selectExpr("b").writeStream
è consentito solo se il sink di output consente la modifica dello schema da"a"
a"b"
.
- L'aggiunta o l'eliminazione dei filtri è consentita:
-
Modifiche nelle operazioni con stato: alcune operazioni nelle query di streaming devono mantenere i dati sullo stato per aggiornare continuamente il risultato. Structured Streaming esegue automaticamente il checkpoint dei dati di stato nell'archiviazione a tolleranza di errore (ad esempio, DBFS, archiviazione BLOB di Azure) e lo ripristina dopo il riavvio. Tuttavia, si presuppone che lo schema dei dati di stato rimanga invariato tra i riavvii. Ciò significa che eventuali modifiche ( ovvero aggiunte, eliminazioni o modifiche dello schema) alle operazioni con stato di una query di streaming non sono consentite tra riavvii. Di seguito è riportato l'elenco delle operazioni con stato il cui schema non deve essere modificato tra i riavvii per garantire il ripristino dello stato:
-
Aggregazione di streaming: ad esempio,
sdf.groupBy("a").agg(...)
. Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. -
Deduplicazione di streaming: ad esempio,
sdf.dropDuplicates("a")
. Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. -
join tra flussi: ad esempio,
sdf1.join(sdf2, ...)
(ovvero, entrambi gli input sono generati consparkSession.readStream
). Le modifiche apportate allo schema o alle colonne di equi-join non sono consentite. Le modifiche apportate al tipo di join (esterno o interno) non sono consentite. Altre modifiche nella condizione di join non sono definite correttamente. -
Operazione arbitraria con stato: ad esempio,
sdf.groupByKey(...).mapGroupsWithState(...)
osdf.groupByKey(...).flatMapGroupsWithState(...)
. Qualsiasi modifica allo schema dello stato definito dall'utente e il tipo di timeout non è consentito. Qualsiasi modifica all'interno della funzione di mapping dello stato definita dall'utente è consentita, ma l'effetto semantico della modifica dipende dalla logica definita dall'utente. Se si vuole effettivamente supportare le modifiche dello schema di stato, è possibile codificare/decodificare in modo esplicito le strutture di dati di stato complesse in byte usando uno schema di codifica/decodifica che supporta la migrazione dello schema. Ad esempio, se si salva lo stato come byte con codifica Avro, è possibile modificare lo schema avro-state-schema tra i riavvii della query perché ripristina lo stato binario.
-
Aggregazione di streaming: ad esempio,