Aracılığıyla paylaş


Rastgele veri havuzlarına yazmak için foreachBatch kullanın

Bu makalede, bir akış sorgusunun çıkışını var olan akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile birlikte kullanımı foreachBatch ele alınmaktadır.

Kod düzeni streamingDF.writeStream.foreachBatch(...) , akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır. foreachBatch ile kullanılan işlevler iki parametre alır:

  • Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
  • Mikro toplu işlemin benzersiz kimliği.

Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch gerekir. Bkz. Akış sorgularını kullanarak güncelleme veya ekleme ("upsert") işlemi yapma hakkında foreachBatch.

Ek DataFrame işlemleri uygulama

Spark bu gibi durumlarda artımlı plan oluşturulmasını desteklemediğinden birçok DataFrame ve Veri Kümesi işlemi akış DataFrame'lerde desteklenmez. kullanarak foreachBatch() bu işlemlerin bazılarını her bir mikro toplu iş çıkışına uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için foreachBatch() ve SQL MERGE INTO işlemini kullanabilirsiniz. Daha fazla ayrıntı için MERGE INTO'a bakın.

Önemli

  • foreachBatch() yalnızca en az bir kez yazma garantisi sağlar. Ancak, işleve sağlanan batchId'ı, çıktıyı tekrardan arındırmak ve yalnız bir kez işlenmesini garanti etmek için kullanabilirsiniz. Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir.
  • foreachBatch() temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanın foreach() .
  • Durumlu bir operatörle foreachBatch kullanırken, işleme tamamlanmadan önce her toplu işlemi tamamen işlemek önemlidir. Bakınız Her bir parti DataFrame'i tamamen tüketme

ile foreachBatch() boş bir veri çerçevesi çağrılabilir ve doğru işleme izin vermek için kullanıcı kodunun dayanıklı olması gerekir. Aşağıda bir örnek verilmiştir:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0'da için foreachBatch davranış değişiklikleri

Databricks Runtime 14.0 ve üzerinde, standart erişim moduyla yapılandırılmış işlemde aşağıdaki davranış değişiklikleri geçerlidir:

  • print() komutları sürücü günlüklerine çıkış yazar.
  • İşlevin dbutils.widgets içindeki alt modüle erişemezsiniz.
  • İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.

Mevcut toplu iş veri kaynaklarını yeniden kullanma

kullanarak foreachBatch(), Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:

diğer birçok toplu iş veri kaynağı adresinden foreachBatch()kullanılabilir. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma.

Birden çok konuma yazma

Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.

Birden foreachBatch çok havuza yazmak için kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.

Eğer foreachBatch birden çok Delta tablosuna yazmak için kullanıyorsanız, foreachBatch içindeki Idempotent tablo yazmalarını inceleyin.

Her bir toplu iş DataFrame'ini tamamen tüketme

Durum bilgisi olan işleçler kullanırken (örneğin, kullanarak dropDuplicatesWithinWatermark), her toplu işlem yinelemesinin DataFrame'in tamamını tüketmesi veya sorguyu yeniden başlatması gerekir. DataFrame'in tamamını kullanmazsanız akış sorgusu bir sonraki toplu işlemle başarısız olur.

Bu durum birkaç durumda gerçekleşebilir. Aşağıdaki örneklerde, DataFrame'i doğru şekilde tüketmeyen sorguların nasıl düzeltileceğini gösterilmektedir.

İşlem kümesinin bir alt kümesini kasıtlı olarak kullanma

Toplu iş kümesinin yalnızca bir alt kümesini önemsiyorsanız aşağıdaki gibi bir kodunuz olabilir.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Bu durumda, batch_df.show(2) yalnızca toplu işteki ilk iki öğeyi işler; bu beklenen bir durumdur, ancak daha fazla öğe varsa, bunların tüketilmesi gerekir. Aşağıdaki kod, tam DataFrame'i tüketir.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Burada do_nothing işlevi, DataFrame'in geri kalanını sessizce yoksayar.

Toplu işlemdeki bir hatayı işleme

Bir foreachBatch işlemi çalıştırılırken hata meydana gelebilir. Aşağıdaki gibi bir kodunuz olabilir (bu örnekte örnek, sorunu göstermek için kasıtlı olarak bir hata oluşturur).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Hatayı ele alarak (ve sessizce görmezden gelerek) toplu işlemin geri kalanı işlenmeyebilir. Bu durumu işlemek için iki seçenek vardır.

İlk olarak, hatayı yeniden yükselterek toplu işlemi yeniden denemek için düzenleme katmanınıza geçirebilirsiniz. Bu, geçici bir sorunsa hatayı çözebilir veya operasyon ekibinizin el ile düzeltmeyi denemesini sağlayabilir. Bunu yapmak için kodu şöyle görünecek şekilde değiştirin partial_func :

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Hata durumunu yakalamak ve toplu işlemin geri kalanını yoksaymak istiyorsanız, ikinci seçenek kodu aşağıdaki şekilde değiştirmektir.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Bu kod, do_nothing işlevini kullanarak toplu işlemin geri kalanını sessizce göz ardı eder.