Rastgele veri havuzlarına yazmak için foreachBatch kullanın

Bu sayfada, bir akış sorgusunun çıkışını mevcut akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile nasıl kullanılacağı foreachBatch gösterilmektedir.

Kod düzeni streamingDF.writeStream.foreachBatch(...) , akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır. foreachBatch ile kullanılan işlevler iki parametre alır:

  • Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
  • Mikro toplu işlemin benzersiz kimliği.

Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch gerekir. Bkz. Akış sorgularını kullanarak güncelleme veya ekleme ("upsert") işlemi yapma hakkında foreachBatch.

Ek DataFrame işlemleri uygulama

Spark bu gibi durumlarda artımlı planlar oluşturmayı desteklemediğinden, birçok DataFrame ve Dataset işlemi akışkan DataFrame'lerde desteklenmez. foreachBatch() kullanarak her bir mikro toplu iş çıkışına bu işlemlerin bazılarını uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için foreachBatch() ve SQL MERGE INTO işlemini kullanabilirsiniz. Daha fazla ayrıntı için MERGE INTO'a bakın.

Önemli

  • foreachBatch() yalnızca en az bir kez yazma garantisi sağlar. Ancak, işleve sağlanan batchId'ı, çıktıyı tekrardan arındırmak ve yalnız bir kez işlenmesini garanti etmek için kullanabilirsiniz. Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir.
  • foreachBatch() temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanın foreach() .
  • Durumlu bir operatörle foreachBatch kullanırken, işleme tamamlanmadan önce her toplu işlemi tamamen işlemek önemlidir. Bakınız Her bir parti DataFrame'i tamamen tüketme

Boş DataFrame'leri işleme

foreachBatch() boş bir DataFrame alabilir ve kodunuzun bu senaryoyla ilgilenmesi gerekir. Aksi takdirde sorgunuz başarısız olabilir.

Örneğin, Delta Lake akış kaynağı olduğunda, bu senaryolar foreachBatch() öğesine boş bir DataFrame geçirebilir.

  • OPTIMIZE işlenecek dosya olmadığında: Delta Lake kaynak tablosunda bir OPTIMIZE işlemi çalıştırıldığında ancak işlenecek dosya olmadığında, Yapılandırılmış Akış tablo sürümünü artırmak için bir ofset günlüğü girişi yazar. Bu, hiçbir dosya okunmamasına rağmen havuz üzerinde boş bir mikro toplu iş oluşturur.
  • Fiziksel plan düzeyinde dosya ayıklama: Koşul indirgeme veya dosya ayıklama fiziksel plan düzeyindeki tüm kayıtları ortadan kaldırırsa, sonuç hedefe boş bir işleme olur.

Kullanıcı kodunun düzgün işlemeye izin vermek için boş DataFrame'leri işlemesi gerekir. Aşağıdaki örneklere bakın:

Python

def process_batch(output_df, batch_id):
  # Process valid DataFrames only
  if not output_df.isEmpty():
    # business logic
    pass

streamingDF.writeStream.foreachBatch(process_batch).start()

Scala

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid DataFrames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0'da için foreachBatch davranış değişiklikleri

Databricks Runtime 14.0 ve üzerinde, standart erişim moduyla yapılandırılmış işlemde aşağıdaki davranış değişiklikleri geçerlidir:

  • print() komutları sürücü günlüklerine çıkış oluşturur.
  • İşlevin dbutils.widgets içindeki alt modüle erişemezsiniz.
  • İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.

Mevcut toplu iş veri kaynaklarını yeniden kullanma

Kullanarak foreachBatch(), Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:

Diğer birçok toplu iş veri kaynağı foreachBatch() adresinden kullanılabilir. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma.

Birden çok konuma yazmak

Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.

Birden fazla havuza yazmak için foreachBatch kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.

Birden çok Delta tablosuna veri yazıyorsanız foreachBatch kullanma konusunda daha fazla bilgi için bkz. Idempotent tablo yazma işlemleri için foreachBatch kullanma.

Her bir toplu iş DataFrame'ini tamamen tüketme

Durum bilgisi olan işleçler kullanırken (örneğin, kullanarak dropDuplicatesWithinWatermark), her toplu işlem yinelemesinin DataFrame'in tamamını tüketmesi veya sorguyu yeniden başlatması gerekir. DataFrame'in tamamını kullanmazsanız akış sorgusu bir sonraki toplu işlemle başarısız olur.

Bu durum birkaç durumda gerçekleşebilir. Aşağıdaki örneklerde, DataFrame'i doğru şekilde tüketmeyen sorguların nasıl düzeltileceğini gösterilmektedir.

İşlem kümesinin bir alt kümesini kasıtlı olarak kullanma

Toplu iş kümesinin yalnızca bir alt kümesini önemsiyorsanız aşağıdaki gibi bir kodunuz olabilir.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Bu durumda, batch_df.show(2) yalnızca toplu işteki ilk iki öğeyi işler; bu beklenen bir durumdur, ancak daha fazla öğe varsa, bunların tüketilmesi gerekir. Aşağıdaki kod, tam DataFrame'i tüketir.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Burada do_nothing işlevi, DataFrame'in geri kalanını sessizce yoksayar.

Toplu işlemdeki bir hatayı işleme

foreachBatch'de hata işleme için Databricks, akış sorgusunun hızla hata vermesine izin vermenizi ve bunun yerine tekrar denemeleri yönetmek için Lakeflow Jobs veya Apache Airflow gibi oturum katmanını kullanmanızı önerir. Bu, veri kaybının oluşabileceği kodunuzda karmaşık yeniden deneme döngüleri oluşturmaktan çok daha güvenlidir.

Yazma hedefinize dayalı yönergeler şunlardır:

Hedef Örnekler Kılavuz
DataFrame işlemleri Delta Lake tabloları Kullandığınız txnAppId ve txnVersion yazma seçenekleri ile txnVersion'yi batchId'ye bağlayarak, idempotensiyi garanti etmeli ve yeniden denemelerde veri doğruluğunu korumalısınız. Özel durumları yerel olarak yakalayıp yeniden denemeyin. Bunun yerine Databricks, Spark ölçümlerinin doğru kalması, verilerin yinelenmemesi ve düzenleyicinin tam toplu işlemi temiz bir şekilde yeniden denemesi için hataların yayılmasına izin vermenizi önerir.
Özel kod ve dış hedefler .collect(), OLTP veritabanları, ileti kuyrukları, API'ler Kendi idempotensliğinizi uygulayın. Herhangi bir işlemin partiler arasında yeniden denenebileceğini varsaymalısınız. batchId aynı kalırsa, işleminizin sonucu aynı kalmalıdır. Kısa bağlantı zaman aşımları gibi yalnızca geçici hataları yeniden deneyebilirsiniz, ancak yeniden denemenin sonunda başarısız olması durumunda kısmi veya yinelenen yazma işlemlerinden kaçınmak için çok dikkatli olun. En güvenli yaklaşım, hataların yayılmasına izin vermek ve düzenleyicinin toplu işlemin tamamını yeniden denemesine izin vermektir.

Burada, içinde özel durum türlerine ve bunların nasıl işleneceğini gösteren önerilere bazı örnekler verilmiştir foreachBatch:

Özel durum türü Örnekler Önerilen eylem
Geçici çıkış noktası hataları SQLTransientConnectionException, HTTP 429, zaman aşımları Yakala: yeniden deneyin veya ölü mesaj kuyruğuna gönderin
Havuz bir kez etkili olduğunda yinelenen veya anahtar kısıtlama ihlalleri SQLIntegrityConstraintViolationException Yakala: günlüğe kaydetme ve gizleme
Özel yeniden denenebilir hatalar Sarmalanmış soket özel durumları, yeniden denenebilir veritabanı hataları Catch: Ölçümleri artır ve denetimli süreç devamına izin ver
Mantık veya şema hataları NullPointerException, AttributeError, şema uyuşmazlığı Yay: Spark'ın sorguda başarısız olmasına izin ver
Geri alınamaz veri alıcısı hataları veya algılanmayan mantık hataları ValueError, PermissionError Yay: Spark'ın sorguda başarısız olmasına izin ver
Kritik hatalar OutOfMemoryError, bozuk durum, veri bütünlüğü ihlalleri Yay: Spark'ın sorguda başarısız olmasına izin ver

Kod örnekleri: özel durum işleme

Aşağıdaki örnekler, hatayı ele almak için farklı yaklaşımları göstermek üzere foreach içinde kasıtlı olarak bir hata oluşturur:

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Yukarıdaki kod hatayı işler ve sessizce gizler ve toplu işin geri kalanını kullanmayabilir. Bu durumu işlemek için iki seçenek vardır.

İlk olarak, hatayı daha üst bir katmana ileterek, orkestrasyon katmanınızın toplu işlemi yeniden denemesini sağlayabilirsiniz. Bu, geçici bir sorunsa hatayı çözebilir veya operasyon ekibinizin el ile düzeltmeyi denemesini sağlayabilir. Bunu yapmak için kodu şöyle görünecek şekilde değiştirin partial_func :

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

İkincisi, istisnayı yakalamak ve toplu işlemin kalanını yoksaymak istiyorsanız, toplu işlemin kalanını sessizce yoksaymak için kodu do_nothing işlevini kullanacak şekilde değiştirebilirsiniz.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()