Aracılığıyla paylaş


Rastgele veri havuzlarına yazmak için foreachBatch kullanın

Bu makalede, bir akış sorgusunun çıkışını var olan akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile birlikte kullanımı foreachBatch ele alınmaktadır.

Kod düzeni streamingDF.writeStream.foreachBatch(...) , akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır. ile foreachBatch kullanılan işlevler iki parametre alır:

  • Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
  • Mikro toplu işlemin benzersiz kimliği.

Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch gerekir. Bkz . foreachBatch kullanarak akış sorgularından Upsert.

Ek DataFrame işlemleri uygulama

Spark bu gibi durumlarda artımlı plan oluşturulmasını desteklemediğinden birçok DataFrame ve Veri Kümesi işlemi akış DataFrame'lerde desteklenmez. kullanarak foreachBatch() bu işlemlerin bazılarını her bir mikro toplu iş çıkışına uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için ve SQL MERGE INTO işlemini kullanabilirsinizforeachBath(). MERGE INTO bölümünde daha fazla ayrıntıya bakın.

Önemli

  • foreachBatch() yalnızca en az bir kez yazma garantisi sağlar. Ancak, çıkışı yinelenenleri kaldırmanın ve tam olarak bir kez garanti almanın bir yolu olarak işlevine sağlanan öğesini kullanabilirsiniz batchId . Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir.
  • foreachBatch() temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanın foreach() .

ile foreachBatch() boş bir veri çerçevesi çağrılabilir ve doğru işleme izin vermek için kullanıcı kodunun dayanıklı olması gerekir. Aşağıda bir örnek verilmiştir:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0'da için foreachBatch davranış değişiklikleri

Paylaşılan erişim moduyla yapılandırılan işlemde Databricks Runtime 14.0 ve üzerinde aşağıdaki davranış değişiklikleri geçerlidir:

  • print() komutları sürücü günlüklerine çıkış yazar.
  • İşlevin dbutils.widgets içindeki alt modüle erişemezsiniz.
  • İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.

Mevcut toplu iş veri kaynaklarını yeniden kullanma

kullanarak foreachBatch(), Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:

diğer birçok toplu iş veri kaynağı adresinden foreachBatch()kullanılabilir. Bkz . Veri kaynaklarına bağlanma.

Birden çok konuma yazma

Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.

Birden foreachBatch çok havuza yazmak için kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.

Birden çok Delta tablosuna yazmak için kullanıyorsanız foreachBatch bkz . ForeachBatch'ta idempotent tablo yazmaları.