Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede, bir akış sorgusunun çıkışını var olan akış havuzu olmayan veri kaynaklarına yazmak için Yapılandırılmış Akış ile birlikte kullanımı foreachBatch ele alınmaktadır.
Kod düzeni streamingDF.writeStream.foreachBatch(...) , akış sorgusunun her mikro toplu işleminin çıkış verilerine toplu iş işlevleri uygulamanıza olanak tanır.
foreachBatch ile kullanılan işlevler iki parametre alır:
- Bir mikro toplu işlemin çıkış verilerini içeren bir DataFrame.
- Mikro toplu işlemin benzersiz kimliği.
Yapılandırılmış Akış'ta Delta Lake birleştirme işlemleri için kullanmanız foreachBatch gerekir. Bkz. Akış sorgularını kullanarak güncelleme veya ekleme ("upsert") işlemi yapma hakkında foreachBatch.
Ek DataFrame işlemleri uygulama
Spark bu gibi durumlarda artımlı plan oluşturulmasını desteklemediğinden birçok DataFrame ve Veri Kümesi işlemi akış DataFrame'lerde desteklenmez. kullanarak foreachBatch() bu işlemlerin bazılarını her bir mikro toplu iş çıkışına uygulayabilirsiniz. Örneğin, akış toplamalarının çıkışını güncelleştirme modunda bir Delta tablosuna yazmak için foreachBatch() ve SQL MERGE INTO işlemini kullanabilirsiniz. Daha fazla ayrıntı için MERGE INTO'a bakın.
Önemli
-
foreachBatch()yalnızca en az bir kez yazma garantisi sağlar. Ancak, işleve sağlananbatchId'ı, çıktıyı tekrardan arındırmak ve yalnız bir kez işlenmesini garanti etmek için kullanabilirsiniz. Her iki durumda da, uçtan uca semantiği kendiniz düşünmeniz gerekir. -
foreachBatch()temel olarak bir akış sorgusunun mikro toplu yürütmesine bağlı olduğundan sürekli işleme moduyla çalışmaz. Verileri sürekli modda yazarsanız, bunun yerine kullanınforeach(). - Durumlu bir operatörle
foreachBatchkullanırken, işleme tamamlanmadan önce her toplu işlemi tamamen işlemek önemlidir. Bakınız Her bir parti DataFrame'i tamamen tüketme
ile foreachBatch() boş bir veri çerçevesi çağrılabilir ve doğru işleme izin vermek için kullanıcı kodunun dayanıklı olması gerekir. Aşağıda bir örnek verilmiştir:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0'da için foreachBatch davranış değişiklikleri
Databricks Runtime 14.0 ve üzerinde, standart erişim moduyla yapılandırılmış işlemde aşağıdaki davranış değişiklikleri geçerlidir:
-
print()komutları sürücü günlüklerine çıkış yazar. - İşlevin
dbutils.widgetsiçindeki alt modüle erişemezsiniz. - İşlevde başvurulan tüm dosyalar, modüller veya nesneler serileştirilebilir ve Spark'ta kullanılabilir olmalıdır.
Mevcut toplu iş veri kaynaklarını yeniden kullanma
kullanarak foreachBatch(), Yapılandırılmış Akış desteğine sahip olmayan veri havuzları için mevcut toplu veri yazıcılarını kullanabilirsiniz. İşte birkaç örnek:
diğer birçok toplu iş veri kaynağı adresinden foreachBatch()kullanılabilir. Bkz . Veri kaynaklarına ve dış hizmetlere bağlanma.
Birden çok konuma yazma
Bir akış sorgusunun çıkışını birden çok konuma yazmanız gerekiyorsa Databricks, en iyi paralelleştirme ve aktarım hızı için birden çok Yapılandırılmış Akış yazıcısı kullanmanızı önerir.
Birden foreachBatch çok havuza yazmak için kullanılması, akış yazma işlemlerinin yürütülmesini seri hale getirerek her mikro toplu işlem için gecikme süresini artırabilir.
Eğer foreachBatch birden çok Delta tablosuna yazmak için kullanıyorsanız, foreachBatch içindeki Idempotent tablo yazmalarını inceleyin.
Her bir toplu iş DataFrame'ini tamamen tüketme
Durum bilgisi olan işleçler kullanırken (örneğin, kullanarak dropDuplicatesWithinWatermark), her toplu işlem yinelemesinin DataFrame'in tamamını tüketmesi veya sorguyu yeniden başlatması gerekir. DataFrame'in tamamını kullanmazsanız akış sorgusu bir sonraki toplu işlemle başarısız olur.
Bu durum birkaç durumda gerçekleşebilir. Aşağıdaki örneklerde, DataFrame'i doğru şekilde tüketmeyen sorguların nasıl düzeltileceğini gösterilmektedir.
İşlem kümesinin bir alt kümesini kasıtlı olarak kullanma
Toplu iş kümesinin yalnızca bir alt kümesini önemsiyorsanız aşağıdaki gibi bir kodunuz olabilir.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Bu durumda, batch_df.show(2) yalnızca toplu işteki ilk iki öğeyi işler; bu beklenen bir durumdur, ancak daha fazla öğe varsa, bunların tüketilmesi gerekir. Aşağıdaki kod, tam DataFrame'i tüketir.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Burada do_nothing işlevi, DataFrame'in geri kalanını sessizce yoksayar.
Toplu işlemdeki bir hatayı işleme
Bir foreachBatch işlemi çalıştırılırken hata meydana gelebilir. Aşağıdaki gibi bir kodunuz olabilir (bu örnekte örnek, sorunu göstermek için kasıtlı olarak bir hata oluşturur).
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Hatayı ele alarak (ve sessizce görmezden gelerek) toplu işlemin geri kalanı işlenmeyebilir. Bu durumu işlemek için iki seçenek vardır.
İlk olarak, hatayı yeniden yükselterek toplu işlemi yeniden denemek için düzenleme katmanınıza geçirebilirsiniz. Bu, geçici bir sorunsa hatayı çözebilir veya operasyon ekibinizin el ile düzeltmeyi denemesini sağlayabilir. Bunu yapmak için kodu şöyle görünecek şekilde değiştirin partial_func :
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Hata durumunu yakalamak ve toplu işlemin geri kalanını yoksaymak istiyorsanız, ikinci seçenek kodu aşağıdaki şekilde değiştirmektir.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Bu kod, do_nothing işlevini kullanarak toplu işlemin geri kalanını sessizce göz ardı eder.