İş akışlarıyla Yapılandırılmış Akış sorgu hatalarından kurtarma

Yapılandırılmış Akış, akış sorguları için hataya dayanıklılık ve veri tutarlılığı sağlar; Azure Databricks iş akışlarını kullanarak Yapılandırılmış Akış sorgularınızı hata durumunda otomatik olarak yeniden başlatacak şekilde kolayca yapılandırabilirsiniz. Akış sorgusu için denetim noktası oluşturmayı etkinleştirerek, bir hatadan sonra sorguyu yeniden başlatabilirsiniz. Yeniden başlatılan sorgu, başarısız olanın kaldığı yerden devam eder.

Yapılandırılmış Akış sorguları için denetim noktası oluşturmayı etkinleştirme

Databricks, sorguyu checkpointLocation başlatmadan önce her zaman bir bulut depolama yolu seçeneğini belirtmenizi önerir. Örnek:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Bu denetim noktası konumu, sorguyu tanımlayan tüm temel bilgileri korur. Her sorgu farklı bir denetim noktası konumuna sahip olmalıdır. Birden çok sorgu hiçbir zaman aynı konuma sahip olmamalıdır. Daha fazla bilgi için bkz . Yapılandırılmış Akış Programlama Kılavuzu.

Dekont

Çoğu çıkış havuzu türü için gerekli olsa checkpointLocation da, bellek havuzu gibi bazı havuzlar sağlamadığınızda checkpointLocationotomatik olarak geçici bir denetim noktası konumu oluşturabilir. Bu geçici denetim noktası konumları hataya dayanıklılık veya veri tutarlılığı garantileri sağlamaz ve düzgün bir şekilde temizlenmeyebilir. Her zaman bir checkpointLocationbelirterek olası tuzaklardan kaçının.

Hata durumunda akış sorgularını yeniden başlatmak için Yapılandırılmış Akış işlerini yapılandırma

Akış sorgularınızın yer aldığı not defteri veya JAR ile bir Azure Databricks işi oluşturabilir ve bunu şu şekilde yapılandırabilirsiniz:

  • Her zaman yeni bir küme kullanın.
  • Hata durumunda her zaman yeniden deneyin.

Şema evrimi ile akış iş yüklerini yapılandırırken iş hatasında otomatik olarak yeniden başlatma özellikle önemlidir. Şema evrimi, bir şema değişikliği algılandığında beklenen bir hatayı oluşturarak ve ardından iş yeniden başlatıldığında yeni şemayı kullanarak verileri düzgün bir şekilde işleyerek Azure Databricks'te çalışır. Databricks, Databricks iş akışlarında otomatik olarak yeniden başlatmak için şema evrimi olan sorgular içeren akış görevlerinin her zaman yapılandırılmasını önerir.

İşler Yapılandırılmış Akış API'leriyle sıkı tümleştirmeye sahiptir ve bir çalıştırmada etkin olan tüm akış sorgularını izleyebilir. Bu yapılandırma, sorgunun herhangi bir bölümü başarısız olursa işlerin çalıştırmayı otomatik olarak sonlandırmasını (diğer tüm sorgularla birlikte) ve yeni bir kümede yeni bir çalıştırma başlatmasını sağlar. Bu işlem not defteri veya JAR kodunu yeniden çalıştırır ve tüm sorguları yeniden başlatır. Bu, iyi bir duruma geri dönmenin en güvenli yoludur.

Dekont

  • Etkin akış sorgularından herhangi birinde hata olması, etkin çalıştırmanın başarısız olmasına ve diğer tüm akış sorgularını sonlandırmasına neden olur.
  • Veya not defterinizin sonunda kullanmanız streamingQuery.awaitTermination()spark.streams.awaitAnyTermination() gerekmez. Akış sorgusu etkin olduğunda işler otomatik olarak bir çalıştırmanın tamamlanmasını engeller.
  • Databricks, Yapılandırılmış Akış not defterlerini düzenlerken ve dbutils.notebook.run() yerine %run işlerin kullanılmasını önerir. Bkz . Başka bir not defterinden Databricks not defteri çalıştırma.

Aşağıda önerilen bir iş yapılandırması örneği verilmiştir.

  • Küme: Bunu her zaman yeni bir küme kullanacak ve en son Spark sürümünü (veya en az 2.1 sürümünü) kullanacak şekilde ayarlayın. Spark 2.1 ve üzeri sürümlerde başlatılan sorgular, sorgu ve Spark sürümü yükseltmelerinden sonra kurtarılabilir.
  • Bildirimler: Hatalarla ilgili e-posta bildirimi istiyorsanız bunu ayarlayın.
  • Zamanlama: Zamanlama ayarlamayın.
  • Zaman aşımı: Zaman aşımı ayarlamayın. Akış sorguları süresiz olarak uzun süre çalışır.
  • En fazla eşzamanlı çalıştırma: 1 olarak ayarlayın. Her sorgunun aynı anda etkin olan tek bir örneği olmalıdır.
  • Yeniden denemeler: Sınırsız olarak ayarlayın.

Bu yapılandırmaları anlamak için bkz . Azure Databricks İşleri oluşturma ve çalıştırma.

Yapılandırılmış Akış sorgusundaki değişikliklerden sonra kurtarma

Aynı denetim noktası konumundan yeniden başlatmalar arasında akış sorgusundaki değişikliklere izin verilen değişikliklerle ilgili sınırlamalar vardır. İzin verilmeyen veya değişikliğin etkisi iyi tanımlanmamış birkaç değişiklik aşağıda açıklanmaktadır. Tümü için:

  • İzin verilen terim, belirtilen değişikliği yapabileceğiniz anlamına gelir, ancak etkisinin semantiğinin iyi tanımlanmış olup olmadığı sorguya ve değişikliğe bağlıdır.
  • İzin verilmeyen terimi, yeniden başlatılan sorgu tahmin edilemeyen hatalarla başarısız olduğundan belirtilen değişikliği yapmamalısınız anlamına gelir.
  • sdf ile sparkSession.readStreamoluşturulan bir akış DataFrame/Veri Kümesi temsil eder.

Yapılandırılmış Akış sorgularındaki değişiklik türleri

  • Giriş kaynaklarının sayısı veya türündeki (farklı kaynak) değişiklikler: Buna izin verilmez.
  • Giriş kaynaklarının parametrelerindeki değişiklikler: Buna izin verilip verilmeyeceği ve değişikliğin semantiğinin iyi tanımlanmış olup olmadığı kaynağa ve sorguya bağlıdır. İşte birkaç örnek.
    • Hız sınırlarının eklenmesine, silinmesine ve değiştirilmesine izin verilir:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      kullanıcısı

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Sonuçlar tahmin edilemediğinden, abone olunan makalelerde ve dosyalarda yapılan değişikliklere genellikle izin verilmez: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Tetikleyici aralığındaki değişiklikler: Artımlı toplu işlemler ve zaman aralıkları arasındaki tetikleyicileri değiştirebilirsiniz. Bkz . Çalıştırmalar arasındaki tetikleyici aralıklarını değiştirme.
  • Çıkış havuzu türündeki değişiklikler: Havuzların belirli birkaç bileşimi arasındaki değişikliklere izin verilir. Bunun vaka bazında doğrulanması gerekir. İşte birkaç örnek.
    • Kafka havuzuna dosya havuzuna izin verilir. Kafka yalnızca yeni verileri görür.
    • Dosya havuzuna Kafka havuzuna izin verilmiyor.
    • Kafka havuzu foreach olarak değiştirildi veya buna izin verilir.
  • Çıkış havuzu parametrelerindeki değişiklikler: Buna izin verilip verilmeyeceği ve değişikliğin semantiğinin iyi tanımlanmış olup olmadığı havuza ve sorguya bağlıdır. İşte birkaç örnek.
    • Dosya havuzu çıkış dizininde yapılan değişikliklere izin verilmez: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Çıkış konusunda yapılan değişikliklere izin verilir: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Kullanıcı tanımlı foreach havuzundaki değişikliklere (kod ForeachWriter ) izin verilir, ancak değişikliğin semantiği koda bağlıdır.
  • Projeksiyon/filtre/harita benzeri işlemlerde değişikliklere izin verilir: Bazı durumlara izin verilir. Örnek:
    • Filtrelerin eklenmesine/silinmesine izin verilir: sdf.selectExpr("a") için sdf.where(...).selectExpr("a").filter(...).
    • Aynı çıkış şemasına sahip projeksiyonlardaki değişikliklere izin verilir: sdf.selectExpr("stringColumn AS json").writeStream olarak sdf.select(to_json(...).as("json")).writeStream.
    • Farklı çıkış şemasına sahip projeksiyonlardaki değişikliklere koşullu olarak izin verilir: sdf.selectExpr("a").writeStream yalnızca sdf.selectExpr("b").writeStream çıkış havuzu şemanın 'den olarak "a" değiştirilmesine "b"izin veriyorsa öğesine izin verilir.
  • Durum bilgisi olan işlemlerdeki değişiklikler: Akış sorgularındaki bazı işlemlerin sonucu sürekli güncelleştirmek için durum verilerini tutması gerekir. Yapılandırılmış Akış, durum verilerini hataya dayanıklı depolamaya (örneğin, DBFS, Azure Blob depolama) otomatik olarak denetler ve yeniden başlatıldıktan sonra geri yükler. Ancak bu, durum verilerinin şemasının yeniden başlatmalar arasında aynı kaldığı varsayılır. Başka bir deyişle, bir akış sorgusunun durum bilgisi olan işlemlerinde yapılan değişikliklere (eklemeler, silmeler veya şema değişiklikleri) yeniden başlatmalar arasında izin verilmez. Durum kurtarmayı güvence altına almak için şeması yeniden başlatmalar arasında değiştirilmemesi gereken durum bilgisi olan işlemlerin listesi aşağıdadır:
    • Akış toplama: Örneğin, sdf.groupBy("a").agg(...). Gruplandırma anahtarlarının veya toplamaların sayısında veya türünde herhangi bir değişikliğe izin verilmez.
    • Yinelenenleri kaldırma akışı: Örneğin, sdf.dropDuplicates("a"). Gruplandırma anahtarlarının veya toplamaların sayısında veya türünde herhangi bir değişikliğe izin verilmez.
    • Stream-stream birleştirme: Örneğin, sdf1.join(sdf2, ...) (her iki giriş de ile sparkSession.readStreamoluşturulur). Şemada veya eşkener birleştirme sütunlarında değişikliklere izin verilmez. Birleştirme türündeki (dış veya iç) değişikliklere izin verilmiyor. Birleştirme koşulundaki diğer değişiklikler kötü tanımlanmıştır.
    • Durum bilgisi olan rastgele işlem: Örneğin, sdf.groupByKey(...).mapGroupsWithState(...) veya sdf.groupByKey(...).flatMapGroupsWithState(...). Kullanıcı tanımlı durumun şemasında ve zaman aşımı türünde herhangi bir değişikliğe izin verilmez. Kullanıcı tanımlı durum eşleme işlevindeki herhangi bir değişikliğe izin verilir, ancak değişikliğin anlamsal etkisi kullanıcı tanımlı mantığa bağlıdır. Durum şeması değişikliklerini gerçekten desteklemek istiyorsanız, şema geçişini destekleyen bir kodlama/kod çözme şeması kullanarak karmaşık durum veri yapılarınızı açıkça bayt olarak kodlayabilir/kodunu çözebilirsiniz. Örneğin, durumunuzu Avro ile kodlanmış bayt olarak kaydederseniz, ikili durumu geri yükledikçe sorgu yeniden başlatmaları arasında Avro-state-schema değerini değiştirebilirsiniz.