Condividi tramite


Ripristino da errori di query structured streaming con processi

Structured Streaming offre la tolleranza di errore e la coerenza dei dati per le query di streaming; usando i processi di Azure Databricks, è possibile configurare facilmente le query di Structured Streaming per il riavvio automatico in caso di errore. Abilitando il checkpoint per una query di streaming, è possibile riavviare la query dopo un errore. La query riavviata continua in cui l'errore è stato interrotto.

Abilitare il checkpoint per le query structured streaming

Databricks consiglia di specificare sempre l'opzione checkpointLocation un percorso di archiviazione cloud prima di avviare la query. Ad esempio:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Questa posizione del checkpoint mantiene tutte le informazioni essenziali che identificano una query. Ogni query deve avere un percorso di checkpoint diverso. Più query non devono mai avere la stessa posizione. Per altre informazioni, vedere La Guida alla programmazione structured streaming.

Nota

Sebbene sia necessario per la maggior parte checkpointLocation dei tipi di sink di output, alcuni sink, ad esempio il sink di memoria, possono generare automaticamente una posizione di checkpoint temporanea quando non si specifica checkpointLocation. Questi percorsi di checkpoint temporanei non garantiscono alcuna tolleranza di errore o garanzie di coerenza dei dati e potrebbero non essere puliti correttamente. Evitare potenziali insidie specificando sempre un oggetto checkpointLocation.

Configurare processi structured streaming per riavviare le query di streaming in caso di errore

È possibile creare un processo di Azure Databricks con il notebook o jar con le query di streaming e configurarlo per:

  • Usare sempre un nuovo cluster.
  • Ripetere sempre l'errore.

Il riavvio automatico in caso di errore del processo è particolarmente importante quando si configurano carichi di lavoro di streaming con evoluzione dello schema. L'evoluzione dello schema funziona in Azure Databricks generando un errore previsto quando viene rilevata una modifica dello schema e quindi elaborando correttamente i dati usando il nuovo schema al riavvio del processo. Databricks consiglia di configurare sempre attività di streaming contenenti query con evoluzione dello schema per il riavvio automatico nei processi di Databricks.

I processi hanno una stretta integrazione con le API Structured Streaming e possono monitorare tutte le query di streaming attive in un'esecuzione. Questa configurazione garantisce che, se una parte della query non riesce, i processi terminano automaticamente l'esecuzione (insieme a tutte le altre query) e avviano una nuova esecuzione in un nuovo cluster. Verrà eseguito di nuovo il notebook o il codice JAR e tutte le query vengono riavviate di nuovo. Questo è il modo più sicuro per tornare a uno stato buono.

Nota

  • Un errore in una delle query di streaming attive causa l'esito negativo dell'esecuzione attiva e termina tutte le altre query di streaming.
  • Non è necessario usare streamingQuery.awaitTermination() o spark.streams.awaitAnyTermination() alla fine del notebook. I processi impediscono automaticamente il completamento di un'esecuzione quando una query di streaming è attiva.
  • Databricks consiglia di usare i processi invece di %run e dbutils.notebook.run() quando si orchestrano i notebook di Structured Streaming. Vedere Eseguire un notebook di Databricks da un altro notebook.

Di seguito è riportato un esempio di configurazione del processo consigliata.

  • Cluster: impostare questa opzione per usare sempre un nuovo cluster e usare la versione più recente di Spark (o almeno la versione 2.1). Le query avviate in Spark 2.1 e versioni successive sono recuperabili dopo gli aggiornamenti delle versioni di Query e Spark.
  • Notifiche: impostare questa opzione se si vuole ricevere una notifica tramite posta elettronica in caso di errori.
  • Pianificazione: non impostare una pianificazione.
  • Timeout: non impostare un timeout. Le query di streaming vengono eseguite per un periodo di tempo illimitato.
  • Numero massimo di esecuzioni simultanee: impostare su 1. Deve essere presente una sola istanza di ogni query contemporaneamente attiva.
  • Tentativi: impostare su Illimitato.

Per informazioni su queste configurazioni, vedere Creare ed eseguire processi di Azure Databricks .

Eseguire il ripristino dopo le modifiche in una query structured streaming

Esistono limitazioni sulle modifiche apportate a una query di streaming tra i riavvii dalla stessa posizione del checkpoint. Ecco alcune modifiche che non sono consentite o l'effetto della modifica non è ben definito. Per tutti:

  • Il termine consentito significa che è possibile eseguire la modifica specificata, ma se la semantica del suo effetto è ben definita dipende dalla query e dalla modifica.
  • Il termine non consentito indica che non è consigliabile eseguire la modifica specificata perché è probabile che la query riavviata abbia esito negativo con errori imprevedibili.
  • sdf rappresenta un dataframe di streaming/set di dati generato con sparkSession.readStream.

Tipi di modifiche nelle query structured streaming

  • Modifiche al numero o al tipo (ovvero origine diversa) delle origini di input: non è consentito.
  • Modifiche nei parametri delle origini di input: indica se questa opzione è consentita e se la semantica della modifica è ben definita dipende dall'origine e dalla query. Ecco alcuni esempi.
    • È consentito aggiungere, eliminare e modificare i limiti di frequenza:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Le modifiche apportate agli articoli e ai file sottoscritti non sono in genere consentite perché i risultati sono imprevedibili: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Modifiche nell'intervallo di trigger: è possibile modificare i trigger tra batch incrementali e intervalli di tempo. Vedere Modifica degli intervalli di trigger tra le esecuzioni.
  • Modifiche nel tipo di sink di output: sono consentite modifiche tra alcune combinazioni specifiche di sink. Questo deve essere verificato caso per caso. Ecco alcuni esempi.
    • Il sink di file nel sink Kafka è consentito. Kafka vedrà solo i nuovi dati.
    • Il sink Kafka nel sink di file non è consentito.
    • Il sink Kafka è stato modificato in foreach o viceversa è consentito.
  • Modifiche nei parametri del sink di output: indica se questa opzione è consentita e se la semantica della modifica è ben definita dipende dal sink e dalla query. Ecco alcuni esempi.
    • Le modifiche apportate alla directory di output di un sink di file non sono consentite: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Le modifiche apportate all'argomento di output sono consentite: sdf.writeStream.format("kafka").option("topic", "topic1") a sdf.writeStream.format("kafka").option("topic", "topic2")
    • Le modifiche apportate al sink foreach definito dall'utente( ovvero il ForeachWriter codice) sono consentite, ma la semantica della modifica dipende dal codice.
  • Modifiche apportate alle operazioni di proiezione/filtro/mapping: alcuni casi sono consentiti. Ad esempio:
    • L'aggiunta o l'eliminazione dei filtri è consentita: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Le modifiche apportate alle proiezioni con lo stesso schema di output sono consentite: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Le modifiche apportate alle proiezioni con schema di output diverso sono consentite in modo condizionale: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream è consentito solo se il sink di output consente la modifica dello schema da "a" a "b".
  • Modifiche nelle operazioni con stato: alcune operazioni nelle query di streaming devono mantenere i dati sullo stato per aggiornare continuamente il risultato. Structured Streaming esegue automaticamente il checkpoint dei dati di stato nell'archiviazione a tolleranza di errore (ad esempio, DBFS, archiviazione BLOB di Azure) e lo ripristina dopo il riavvio. Tuttavia, si presuppone che lo schema dei dati di stato rimanga invariato tra i riavvii. Ciò significa che tutte le modifiche (ovvero aggiunte, eliminazioni o modifiche dello schema) alle operazioni con stato di una query di streaming non sono consentite tra i riavvii. Di seguito è riportato l'elenco delle operazioni con stato il cui schema non deve essere modificato tra i riavvii per garantire il ripristino dello stato:
    • Aggregazione di streaming: ad esempio . sdf.groupBy("a").agg(...) Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni.
    • Deduplicazione di streaming: ad esempio. sdf.dropDuplicates("a") Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni.
    • Join del flusso di flusso: ad esempio ,ad esempio, sdf1.join(sdf2, ...) entrambi gli input vengono generati con sparkSession.readStream. Le modifiche apportate allo schema o alle colonne di equi join non sono consentite. Le modifiche apportate al tipo di join (esterno o interno) non sono consentite. Altre modifiche nella condizione di join non sono definite correttamente.
    • Operazione arbitraria con stato: ad esempio o sdf.groupByKey(...).mapGroupsWithState(...) sdf.groupByKey(...).flatMapGroupsWithState(...). Qualsiasi modifica allo schema dello stato definito dall'utente e il tipo di timeout non è consentito. Qualsiasi modifica all'interno della funzione di mapping dello stato definita dall'utente è consentita, ma l'effetto semantico della modifica dipende dalla logica definita dall'utente. Se si vuole effettivamente supportare le modifiche dello schema di stato, è possibile codificare/decodificare in modo esplicito le strutture di dati di stato complesse in byte usando uno schema di codifica/decodifica che supporta la migrazione dello schema. Ad esempio, se si salva lo stato come byte con codifica Avro, è possibile modificare lo schema avro-state-schema tra i riavvii della query perché ripristina lo stato binario.