Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
foreach_batch_sink API Genel Kullanıma Açık Önizleme aşamasındadır.
ForEachBatch alıcısı, bir akışı bir dizi mikro parça olarak işlemenize olanak tanır. Her toplu iş, Apache Spark Yapılandırılmış Akış'ınkine foreachBatchbenzer özel mantıkla Python'da işlenebilir. Lakeflow Spark Bildirimli İşlem Hatları (SDP) ForEachBatch havuzuyla akış verilerini yerel olarak akış yazmalarını desteklemeyen bir veya daha fazla hedefe dönüştürebilir, birleştirebilir veya yazabilirsiniz. Bu sayfa forEachBatch havuzu ayarlama konusunda size yol gösterir, örnekler sağlar ve önemli konuları ele alır.
ForEachBatch havuzu aşağıdaki işlevleri sağlar:
- Her mikro toplu iş için özel mantık: ForEachBatch esnek bir akış havuzudur. Python koduyla rastgele eylemler (dış tabloya birleştirme, birden çok hedefe yazma veya upsert gerçekleştirme gibi) uygulayabilirsiniz.
- Tam yenileme desteği: İşlem hatları denetim noktalarını akış temelinde yönetir, bu nedenle işlem hattınızın tam yenilemesini gerçekleştirdiğinizde denetim noktaları otomatik olarak sıfırlanır. ForEachBatch sink ile, bu gerçekleştiğinde aşağı akış veri sıfırlamasını yönetmek sizin sorumluluğunuzdadır.
- Unity Kataloğu desteği: ForEachBatch havuzu, Unity Kataloğu birimlerinden veya tablolarından okuma veya bu birimlere yazma gibi tüm Unity Kataloğu özelliklerini destekler.
- Sınırlı temizlik: İşlem hattı, ForEachBatch havuzuna hangi verilerin yazıldığını izlemez, bu nedenle bu verileri temizleyemez. Aşağı akış veri yönetiminden siz sorumlusunuz.
- Olay günlüğü girişleri: Geçit olay günlüğü, her ForEachBatch sink'in oluşturulmasını ve kullanımını kaydeder. Python işleviniz serileştirilebilir değilse, olay günlüğünde ek öneriler içeren bir uyarı girdisi görürsünüz.
Uyarı
- ForEachBatch havuzu,
append_flowgibi akış sorguları için tasarlanmıştır. Yalnızca toplu işlem hatları veyaAutoCDCsemantikleri için tasarlanmamıştır. - Bu sayfada açıklanan ForEachBatch havuzu işlem hatları içindir. Apache Spark Yapılandırılmış Akış'ı da destekler
foreachBatch. Yapılandırılmış AkışforeachBatchhakkında bilgi için bkz. Rastgele veri havuzlarına yazmak için foreachBatch kullanma.
ForEachBatch havuzu ne zaman kullanılır?
İşlem hattınız, delta veya kafka gibi yerleşik bir havuz biçimi aracılığıyla sağlanamayan işlevler gerektirdiğinde ForEachBatch sink kullanın. Tipik kullanım örnekleri şunlardır:
- Delta Lake tablosuyla birleştirme veya yükseltme: Her mikro toplu iş için özel birleştirme mantığı çalıştırın (örneğin, güncelleştirilmiş kayıtları işleme).
- Birden çok veya desteklenmeyen hedefe yazma: Her toplu işlemin çıkışını akış yazma işlemlerini desteklemeyen birden çok tabloya veya dış depolama sistemlerine yazın (belirli JDBC havuzları gibi).
- Özel mantık veya dönüştürmeler uygulama: Python'daki verileri doğrudan işleme (örneğin, özel kitaplıkları veya gelişmiş dönüştürmeleri kullanarak).
Yerleşik havuzlar veya Python ile özel havuzlar oluşturma hakkında bilgi için bkz. Lakeflow Spark Bildirimli İşlem Hatlarında Havuzlar.
Sözdizimi
ForEachBatch sink'i oluşturmak için @dp.foreach_batch_sink() dekorasyonunu kullanın. Daha sonra bunu akış tanımınızda target olarak, örneğin @dp.append_flow içinde referans alabilirsiniz.
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
| Parametre | Description |
|---|---|
| name | Optional. İşlem hattı içindeki havuzu tanımlamak için benzersiz bir ad. Dahil edilmediğinde, varsayılan olarak UDF'nin adını kullanır. |
| batch_handler | Bu, her mikro toplu iş için çağrılacak kullanıcı tanımlı işlevdir (UDF). |
| Df | Geçerli mikro toplu iş için verileri içeren Spark DataFrame. |
| batch_id | Mikro toplu işlemin tamsayı kimliği. Spark her tetikleyici aralığı için bu kimliği artırır. Bir batch_id, 0 akışın başlangıcını veya tam yenilemenin başlangıcını temsil eder. Kod, foreach_batch_sink aşağı akış veri kaynakları için tam yenilemeyi düzgün bir şekilde işlemelidir. Daha fazla bilgi için sonraki bölüme bakın. |
Tam yenileme
ForEachBatch bir akış sorgusu kullandığından, işlem hattı her akış için denetim noktası dizinini izler. Tam yenilemede:
- Denetim noktası dizini sıfırlanır.
- Havuz işleviniz (
foreach_batch_sinkUDF), 0'dan başlayarak yepyeni birbatch_iddöngünün başladığını fark eder. - Hedef sisteminizdeki veriler işlem hattı tarafından otomatik olarak temizlenmez (çünkü işlem hattı verilerinizin nereye yazdığını bilmez). Temiz bir sayfa senaryosuna ihtiyacınız varsa, ForEachBatch havuzunuzun doldurduğunu dış tabloları veya konumları el ile bırakmanız veya kesmeniz gerekir.
Unity Kataloğu özelliklerini kullanma
Spark Yapılandırılmış Akış'taki foreach_batch_sink mevcut tüm Unity Kataloğu özellikleri kullanılabilir durumda kalır.
Bu, yönetilen veya harici Unity Katalog tablolarına yazmayı içerir. Unity Kataloğu yönetilen veya dış tablolara, bir Apache Spark Yapılandırılmış Akış işinde olduğu gibi mikro toplu işlemler yazabilirsiniz.
Olay günlüğü girdileri
ForEachBatch havuzu oluşturduğunuzda, SinkDefinition olayına "format": "foreachBatch" eklenerek işlem hattının olay günlüğüne dahil edilir.
Bu, ForEachBatch havuzlarının kullanımını izlemenize ve havuzunuzla ilgili uyarıları görmenize olanak tanır.
Databricks Connect ile kullanma
Sağladığınız işlev serileştirilebilir değilse (Databricks Connect için önemli bir gereksinim), olay günlüğü, Databricks Connect desteği gerekiyorsa kodunuzu basitleştirmenizi veya yeniden düzenlemenizi öneren bir WARN girdi içerir.
Örneğin, forEachBatch UDF'sinde parametreleri almak için dbutils kullanırsanız, bunun yerine UDF'de kullanmadan önce argümanı alabilirsiniz.
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
value = dbutils.widgets.get ("X") + str (i)
# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")
def foreach_batch(df, batchId):
value = argX + str (i)
En iyi yöntemler
- ForEachBatch işlevinizi kısa tutun: İş parçacığı, yoğun kitaplık bağımlılıkları veya büyük bellek içi veri manipülasyonlarından kaçının. Karmaşık veya durum bilgisi olan mantık, serileştirme hatalarına veya performans sorunlarına yol açabilir.
- Denetim noktası klasörünüzü izleme: Akış sorguları için SDP denetim noktalarını havuza göre değil akışa göre yönetir. İşlem hattınızda birden çok akış varsa, her akışın kendi denetim noktası dizini vardır.
- Dış bağımlılıkları doğrulama: Dış sistemlere veya kitaplıklara güveniyorsanız, bunların tüm küme düğümlerine veya kapsayıcınıza yüklenip yüklenmediğini denetleyin.
-
Databricks Connect'e dikkat edin: Ortamınız gelecekte Databricks Connect'e geçebilirse kodunuzun seri hale getirilebilir olup olmadığını ve UDF'ye
dbutilsbağlıforeach_batch_sinkolmadığını denetleyin.
Sınırlamalar
- ForEachBatch için temizlik hizmeti yok: Özel Python kodunuz her yerde veri yazabileceğinden işlem hattı bu verileri temizleyemez veya izleyemez. Yazdığınız hedefler için kendi veri yönetimi veya veri saklama ilkelerinizi yönetmeniz gerekir.
- Mikro toplu iş içindeki ölçümler: İşlem hatları akış ölçümlerini toplar, ancak bazı senaryolar ForEachBatch kullanılırken eksik veya olağan dışı ölçümlere neden olabilir. Bunun nedeni ForEachBatch'ın temel alınan esnekliğinden kaynaklanır ve bu da veri akışını ve satırları izlemeyi sistem için zorlaştırır.
-
Birden fazla okuma olmadan birden çok hedefe yazmayı destekleme: Bazı müşteriler forEachBatch kullanarak bir kaynaktan bir kez okuyabilir ve sonra birden çok hedefe yazabilir. Bunu başarmak için
df.persistveyadf.cacheöğesini ForEachBatch işlevinizin içine eklemeniz gerekir. Azure Databricks bu seçenekleri kullanarak verileri yalnızca tek bir kez hazırlamaya çalışır. Bu seçenekler olmadan sorgunuz birden çok okumayla sonuçlanır. Bu, aşağıdaki kod örneklerine dahil değildir. -
Databricks Connect ile Kullanma: İşlem hattınız Databricks Connect üzerinde çalışıyorsa,
foreachBatchkullanıcı tanımlı işlevlerin (UDF) serileştirilebilir olması gerekir vedbutilskullanımı mümkün değildir. İşlem hattı, seri hale getirilemeyen bir UDF algılarsa uyarı verir, fakat işlem hattı başarısız olmaz. -
Serileştirilemez mantık: Yerel nesnelere, sınıflara veya seçilemez kaynaklara başvuran kod Databricks Connect bağlamlarında bozulabilir. Saf Python modüllerini kullanın ve Databricks Connect bir gereksinimse başvuruların (örneğin,
dbutils) kullanılmadığını onaylayın.
Örnekler
Temel söz dizimi örneği
from pyspark import pipelines as dp
# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
return
# Create source data for example:
@dp.table()
def example_source_data():
return spark.range(5)
# Add sink to an append flow:
@dp.append_flow(
target="my_foreachbatch_sink",
)
def my_flow():
return spark.readStream.format("delta").table("example_source_data")
Basit bir işlem hattı için örnek verileri kullanma
Bu örnekte NYC Taxi örneği kullanılmaktadır. Çalışma alanı yöneticinizin Databricks Genel Veri Kümeleri kataloğunu etkinleştirdiğini varsayar. Havuz için, erişiminiz olan bir katalog ve şemaya geçin my_catalog.my_schema .
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
# For this example, we are adding a timestamp column.
enriched = df.withColumn("processed_timestamp", current_timestamp())
# Write to a Delta location
enriched.write \
.format("delta") \
.mode("append") \
.saveAsTable("my_catalog.my_schema.trips_sink_delta")
# Return is optional here, but generally not used for the sink
return
# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
target="my_foreach_sink",
)
def taxi_source():
df = spark.readStream.table("samples.nyctaxi.trips")
return df
Çoklu hedeflere yazma
Bu örnek birden çok hedefe veri aktarır. Delta Lake tablolarına yazma işlemini idem potent yapmak için txnVersion ve txnAppId kullanımını gösterir. Ayrıntılar için bkz. içinde Idempotent tablo yazmaları foreachBatch.
İki tabloya, table_a ve table_b, yazmakta olduğumuzu ve bir toplu işlem sırasında table_a yazımın başarılı olurken table_b yazımın başarısız olduğunu varsayalım. Toplu iş yeniden çalıştırıldığında, (txnVersion, txnAppId) çifti Delta'nın table_a üzerine yinelenen yazımı yoksaymasına ve toplu işi yalnızca table_b üzerine yazmasına olanak sağlar.
from pyspark import pipelines as dp
app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId
# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
# Optionally do transformations, logging, or merging logic
# ...
# Write to a Delta table
df.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable("my_catalog.my_schema.example_table_1")
# Also write to a JSON file location
df.write \
.format("json") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.save("/tmp/json_target")
return
# Create source data for example
@dp.table()
def example_source():
return spark.range(5)
# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return spark.readStream.format("delta").table("example_source")
spark.sql()’ı kullanma
Aşağıdaki örnekte olduğu gibi ForEachBatch havuzunuzda kullanabilirsiniz spark.sql() .
from pyspark import pipelines as dp
from pyspark.sql import Row
@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
df.createOrReplaceTempView("df_view")
df.sparkSession.sql("MERGE INTO target_table AS tgt " +
"USING df_view AS src ON tgt.id = src.id " +
"WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
"WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
)
return
# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")
# Create source table
@dp.table()
def src_table():
return spark.range(5)
@dp.append_flow(
target="example_sink",
)
def example_flow():
return spark.readStream.format("delta").table("source_table")
Sıkça Sorulan Sorular (SSS)
ForEachBatch çözümleyicimde dbutils kullanabilir miyim?
İşlem hattınızı Databricks Connect olmayan bir ortamda dbutils çalıştırmayı planlıyorsanız çalışabilir. Ancak, Databricks Connect kullanıyorsanız, dbutils işlevinizin içinde foreachBatch erişilemez. İşlem hattı, dbutils kullanımını algılarsa, kesintileri önlemenize yardımcı olmak için uyarı gösterebilir.
Birden fazla akışı tek bir ForEachBatch havuzuyla kullanabilir miyim?
Evet.
@dp.append_flow ile, tümü aynı havuz adını hedefleyen birden çok akış tanımlayabilirsiniz, ancak her biri kendi denetim noktalarını korur.
Hedefim için işlem hattı veri tutmayı veya temizlemeyi yönetebilir mi?
Hayır. ForEachBatch havuzu herhangi bir rastgele konuma veya sisteme yazabildiğinden, işlem hattı bu hedefteki verileri otomatik olarak yönetemez veya silemez. Bu işlemleri özel kodunuzun veya dış işlemlerinizin bir parçası olarak işlemeniz gerekir.
ForEachBatch işlevimdeki serileştirme hata veya arızalarını nasıl çözebilirim?
Küme sürücü günlüklerinize veya işlem hattı olay günlüklerinize bakın. Spark Connect ile ilgili serileştirme sorunları için işlevinizin yalnızca seri hale getirilebilir Python nesnelerine bağlı olup olmadığını ve izin verilmeyen nesnelere (açık dosya tanıtıcıları veya dbutilsgibi) başvurmadığını denetleyin.