Rastgele veri havuzlarına yazmak için foreachBatch kullanın
Bu makalede, bir akış sorgusunun çıkışını var olan akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile birlikte kullanımı foreachBatch
ele alınmaktadır.
Kod düzeni streamingDF.writeStream.foreachBatch(...)
, akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır. ile foreachBatch
kullanılan işlevler iki parametre alır:
- Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
- Mikro toplu işlemin benzersiz kimliği.
Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch
gerekir. Bkz . foreachBatch kullanarak akış sorgularından Upsert.
Ek DataFrame işlemleri uygulama
Spark bu gibi durumlarda artımlı plan oluşturulmasını desteklemediğinden birçok DataFrame ve Veri Kümesi işlemi akış DataFrame'lerde desteklenmez. kullanarak foreachBatch()
bu işlemlerin bazılarını her bir mikro toplu iş çıkışına uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için ve SQL MERGE INTO
işlemini kullanabilirsinizforeachBath()
. MERGE INTO bölümünde daha fazla ayrıntıya bakın.
Önemli
foreachBatch()
yalnızca en az bir kez yazma garantisi sağlar. Ancak, çıkışı yinelenenleri kaldırmanın ve tam olarak bir kez garanti almanın bir yolu olarak işlevine sağlanan öğesini kullanabilirsinizbatchId
. Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir.foreachBatch()
temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanınforeach()
.
ile foreachBatch()
boş bir veri çerçevesi çağrılabilir ve doğru işleme izin vermek için kullanıcı kodunun dayanıklı olması gerekir. Aşağıda bir örnek verilmiştir:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0'da için foreachBatch
davranış değişiklikleri
Paylaşılan erişim moduyla yapılandırılan işlemde Databricks Runtime 14.0 ve üzerinde aşağıdaki davranış değişiklikleri geçerlidir:
print()
komutları sürücü günlüklerine çıkış yazar.- İşlevin
dbutils.widgets
içindeki alt modüle erişemezsiniz. - İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.
Mevcut toplu iş veri kaynaklarını yeniden kullanma
kullanarak foreachBatch()
, Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:
diğer birçok toplu iş veri kaynağı adresinden foreachBatch()
kullanılabilir. Bkz . Veri kaynaklarına bağlanma.
Birden çok konuma yazma
Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.
Birden foreachBatch
çok havuza yazmak için kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.
Birden çok Delta tablosuna yazmak için kullanıyorsanız foreachBatch
bkz . ForeachBatch'ta idempotent tablo yazmaları.