Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Important
Bu özellik Genel Önizleme aşamasındadır.
Bu sayfada, uçtan uca gecikme süresi 5 ms'ye kadar olan ultra düşük gecikme süresine sahip veri işlemeyi sağlayan Yapılandırılmış Akış için bir tetikleyici türü olan gerçek zamanlı mod açıklanmaktadır. Bu mod, akış verilerine anında yanıt gerektiren operasyonel iş yükleri için tasarlanmıştır.
Gerçek zamanlı mod Databricks Runtime 16.4 LTS ve sonraki sürümlerde kullanılabilir.
İşletimsel iş yükleri
Akış iş yükleri, analitik iş yüklerine ve operasyonel iş yüklerine geniş ölçüde ayrılabilir:
- Analitik iş yükleri genellikle madalyon mimarisini (örneğin, verileri bronz, gümüş ve altın tablolara alma) izleyerek veri alımı ve dönüşümü kullanır.
- operasyonel iş yükleri gerçek zamanlı verileri kullanır, iş mantığı uygular ve aşağı akış eylemlerini veya kararlarını tetikler.
İşletimsel iş yüklerine bazı örnekler şunlardır:
- Olağan dışı konum, büyük işlem boyutu veya hızlı harcama desenleri gibi faktörlere bağlı olarak, bir dolandırıcılık puanı eşiği aşarsa kredi kartı işlemini gerçek zamanlı olarak engelleme veya işaretleme.
- Kullanıcının beş dakikadır kot pantolonlara göz atmakta olduğunu gösteren tıklama akışı verileri, kullanıcıya sonraki 15 dakika içinde satın almaları durumunda %25 indirim sunan tanıtım amaçlı bir mesaj göndermeyi sağlar.
Genel olarak işletimsel iş yükleri, saniyenin altında uçtan uca gecikme süresine duyulan gereksinimle karakterize edilir. Bu, Apache Spark Yapılandırılmış Akış'ta gerçek zamanlı modla elde edilebilir.
Gerçek zamanlı mod düşük gecikme süresine nasıl ulaşır?
Gerçek zamanlı mod şu şekilde yürütme mimarisini geliştirir:
- Uzun süre çalışan toplu işlemleri yürütme (varsayılan değer 5 dakikadır) ve bu işlemde veriler kaynakta kullanılabilir hale geldikçe işlenir.
- Sorgunun tüm aşamaları aynı anda zamanlanır. Bu, kullanılabilir görev yuvalarının sayısının bir toplu işteki tüm aşamaların görev sayısına eşit veya daha fazla olmasını gerektirir.
- Veriler, akış karıştırma kullanılarak üretildiği anda aşamalar arasında geçirilir.
Toplu işlemi işlemenin sonunda ve sonraki toplu işlem başlamadan önce Yapılandırılmış Akış denetim noktaları ilerleme gösterir ve son toplu iş için ölçümleri kullanılabilir hale getirir. Toplu işlemler daha uzunsa, bu etkinlikler daha az sıklıkta olabilir ve hata durumunda daha uzun süre yeniden yürütmeye ve ölçümlerin kullanılabilirliğini geciktirmeye neden olabilir. Öte yandan, partiler daha küçükse, bu etkinlikler daha sık hale gelir ve gecikme üzerinde etkili olabilir. Databricks, uygun tetikleyici aralığını bulmak için gerçek zamanlı modu hedef iş yükünüz ve gereksinimlerinizle karşılaştırmanızı önerir.
Küme yapılandırması
Yapılandırılmış Akış'ta gerçek zamanlı modu kullanmak için klasik bir Lakeflow İşi yapılandırmanız gerekir:
Azure Databricks çalışma alanınızda sol üst köşedeki Yeni'ye tıklayın. Diğer'i seçin ve Küme'ye tıklayın.
Foton hızlandırmayı temizleyin.
Otomatik ölçeklendirmeyi etkinleştir seçeneğinin işaretini kaldırın.
Gelişmiş performans'ın altında Spot örnekleri kullan seçeneğinin işaretini kaldırın.
Gelişmiş ve Erişim modu altında Manuel'e tıklayın ve Ayrılmış (eski adı: Tek kullanıcı)'yı seçin.
Spark'ın altında Spark yapılandırması altında aşağıdakileri girin:
spark.databricks.streaming.realTimeMode.enabled trueOluştur'utıklayın.
Küme boyutu gereksinimleri
Kümede yeterli görev yuvası varsa, küme başına bir gerçek zamanlı iş çalıştırabilirsiniz.
Düşük gecikme modunda çalışmak için toplam kullanılabilir görev yuvası sayısı tüm sorgu aşamalarındaki görev sayısından büyük veya buna eşit olmalıdır.
Slot hesaplama örnekleri
Tek aşamalı durumsuz boru hattı (Kafka kaynağı + sonlandırıcı)
maxPartitions = 8 ise en az 8 yuvaya ihtiyacınız vardır. maxPartitions ayarlanmadıysa Kafka konu bölümü sayısını kullanın.
durum bilgisi olan iki aşamalı işlem hattı (Kafka kaynağı + karıştırma):
maxPartitions = 8 ve karışık bölümler = 20 ise, 8 + 20 = 28 yuva gerekir.
Üç aşamalı boru hattı (Kafka kaynağı + karıştırma + yeniden bölümleme):
maxPartitions = 8 ve her biri 20 olan iki karıştırma aşamasıyla, toplamda 48 yuva gerekir: 8 + 20 + 20.
Dikkat edilmesi gereken temel konular
Kümenizi yapılandırırken bunu dikkate alın:
- Mikro toplu iş modundan farklı olarak, gerçek zamanlı görevler verileri beklerken boşta kalabilir, bu nedenle kaynakların boşa harcanmasını önlemek için doğru boyutlandırma gereklidir.
- Ayarlama yaparak hedef kullanım düzeyini (örneğin, 50%) hedefleyin:
-
maxPartitions(Kafka için) -
spark.sql.shuffle.partitions(karıştırma aşaması için)
-
- Databricks, her görevin ek yükü azaltmak için birden çok Kafka bölümünü işlemesi için maxPartitions ayarını önerir.
- Çalışan başına görev yuvalarını, basit tek aşamalı işler için iş yüküyle eşleşecek şekilde ayarlayın.
- Yoğun karıştırmalı işler için birikimlerden kaçınan en az karıştırma bölümü sayısını bulmak için denemeler yapın ve buradan ayarlayın. Kümede yeterli boşluk yoksa iş zamanlanmayacak.
Note
Databricks Runtime 16.4 LTS ve sonraki sürümlerde tüm gerçek zamanlı işlem hatları, gerçek zamanlı ve mikro toplu iş modları arasında sorunsuz geçişe olanak sağlayan checkpoint v2'yi kullanır.
Sorgu yapılandırması
Bir sorgunun düşük gecikme modu kullanılarak çalıştırılmasını belirtmek için gerçek zamanlı tetikleyiciyi etkinleştirmeniz gerekir. Ayrıca gerçek zamanlı tetikleyiciler yalnızca güncelleştirme modunda desteklenir. Örneğin:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Daha önce, uçtan uca sorgu gecikme süresi toplu işlem süresine çok yakındı ve bu da toplu işlem süresini sorgu gecikmesinin iyi bir göstergesi haline getiriyordu. Ancak bu yöntem artık gerçek zamanlı modda uygulanmaz ve gecikme süresini ölçmek için alternatif yaklaşımlar gerektirir. Uçtan uca gecikme süresi iş yüküne özgüdür ve bazen yalnızca iş mantığıyla doğru bir şekilde ölçülebilir. Örneğin, kaynak zaman damgası Kafka'da çıkarılırsa, gecikme süresi Kafka'nın çıkış zaman damgası ile kaynak zaman damgası arasındaki fark olarak hesaplanabilir.
Akış işlemi sırasında toplanan kısmi bilgilere göre uçtan uca gecikme süresini çeşitli yollarla tahmin edebilirsiniz.
StreamingQueryProgress kullanma
Aşağıdaki ölçümler, sürücü günlüklerine otomatik olarak kaydedilen StreamingQueryProgress olayına dahildir. Bu öğelere StreamingQueryListener'un onQueryProgress() geri çağırma işlevi aracılığıyla da erişebilirsiniz.
QueryProgressEvent.json() veya toString() fazladan gerçek zamanlı mod ölçümleri ekleyin.
- İşleme gecikme süresi (processingLatencyMs). Gerçek zamanlı mod sorgusunun bir kaydı okuması ile bu kaydın sonraki aşamaya veya sonraki işlemlere yazılmadan önce geçen süre. Tek aşamalı sorgular için bu, E2E gecikme süresiyle aynı süreyi ölçer. Bu ölçüm görev başına bildirilir.
- Kaynak kuyruğa alma gecikmesi (sourceQueuingLatencyMs). Bir kaydın ileti veri yoluna (örneğin, Kafka'daki günlük ekleme zamanı) başarıyla yazılması ve bu kaydın gerçek zamanlı mod sorgusu tarafından ilk kez okunması arasındaki geçen süre. Bu ölçüm görev başına bildirilir.
- E2E Gecikme Süresi (e2eLatencyMs). Kaydın bir ileti veri yolu için başarıyla yazıldığında ve kaydın gerçek zamanlı mod sorgusu tarafından aşağı akışa yazıldığındaki süre. Bu ölçüm, tüm görevler tarafından işlenen kayıtlar üzerinden her bir toplu iş için toplanır.
Örneğin:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
İşlerde Gözlem API'si kullanma
Gözlemleme API'si, başka bir iş başlatmadan gecikme süresini ölçmeye yardımcı olur. Kaynak veri varış zamanına yakın bir kaynak zaman damganız varsa ve havuza ulaşmadan önce geçirildiyse veya zaman damgasını geçirmenin bir yolunu bulabilirseniz, GözlemLEME API'sini kullanarak her toplu işlemin gecikme süresini tahmin edebilirsiniz:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Bu örnekte, girişin çıkışından önce geçerli bir zaman damgası kaydedilir ve bu zaman damgası ile kaydın kaynak zaman damgası arasındaki fark hesaplanarak gecikme süresi tahmin edilir. Sonuçlar devam eden raporlara eklenir ve dinleyicilerin kullanımına sunulur. Örnek çıktı aşağıdaki gibidir:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Neler desteklenir?
Environments
| Küme Türü | Supported |
|---|---|
| Tahsis Edilmiş (eski adı: tek kullanıcı) | Yes |
| Standart (eski adı: paylaşılan) | No |
| Lakeflow Spark Deklaratif Boru Hatları Klasik | No |
| Lakeflow Spark Deklaratif İşlem Hatları Sunucusuz | No |
| Serverless | No |
Languages
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Yürütme Modları
| Yürütme Modu | Supported |
|---|---|
| Güncelleştirme modu | Yes |
| Append mode | No |
| Tamamlama modu | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (Kafka Bağlayıcısı'nı kullanarak) | Yes |
| Kinesis | Evet (yalnızca EFO modu) |
| Google Pub/Sub (Mesajlaşma Hizmeti) | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (Kafka Bağlayıcısı'nı kullanarak) | Yes |
| Kinesis | No |
| Google Pub/Sub (Mesajlaşma Hizmeti) | No |
| Apache Pulsar | No |
| Rastgele Havuzlar (forEachWriter kullanarak) | Yes |
Operators
| Operators | Supported |
|---|---|
| Durum Bilgisi Olmayan İşlemler | |
|
Yes |
|
Yes |
| UDFs | |
|
Evet (bazı sınırlamalarla) |
|
Evet (bazı sınırlamalarla) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Toplama işlevleri | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Evet (durum sınırsız) |
|
No |
| Akış - Tablo Birleştirme | |
|
Yes |
| Stream - Akışa Katılma | No |
| (düz)MapGroupsWithState | No |
| transformWithState | Evet (bazı farklarla) |
| union | Evet (bazı sınırlamalarla) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Hayır (sınırlamaya bakın) |
TransformWithState'i gerçek zamanlı modda kullanma
Databricks, durum bilgisi olan özel uygulamalar oluşturmak için Apache Spark Yapılandırılmış Akış'ta bir API'yi destekler transformWithState. API ve kod parçacıkları hakkında daha fazla bilgi için bkz. Durum bilgisi olan özel bir uygulama oluşturma .
Ancak, API'nin gerçek zamanlı modda davranışı ile mikro toplu iş mimarisinden yararlanan geleneksel akış sorguları arasında bazı farklılıklar vardır.
- Gerçek zamanlı modda
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)yöntemi her satır için çağrılır.- Yineleyici
inputRowstek bir değer döndürür. Mikro toplu iş modunda, her anahtar için bir kez çağrılır veinputRowsyineleyici, mikro toplu işteki bir anahtarın tüm değerlerini döndürür. - Kodlarını yazarken bu farka karşı duyarlı olmanız gerekir.
- Yineleyici
- Olay zamanı zamanlayıcıları gerçek zamanlı modda desteklenmez.
- Gerçek zamanlı modda süreölçerler, verinin varışına bağlı olarak tetiklemede gecikir. Aksi takdirde, veri yoksa, uzun süreli çalışan toplu iş sonunda tamamlanır. Örneğin, bir zamanlayıcının 10:00:00'da tetiklemesi gerekiyorsa ve aynı anda veri gelişi yoksa tetiklenmez. Bunun yerine, veriler 10:00:10'da ulaşırsa zamanlayıcı 10 saniyelik bir gecikmeyle tetiklenir. Veya veri gelmezse ve uzun süre çalışan toplu iş sonlandırılırsa, uzun süre çalışan toplu işlemi sonlandırmadan önce zamanlayıcıyı çalıştırır.
Python UDF'leri
Databricks, gerçek zamanlı modda Python kullanıcı tanımlı işlevlerin (UDF) çoğunluğunu destekler:
| UDF türü | Supported |
|---|---|
| Durum Bilgisi Olmayan UDF | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Durum Bilgisi Olan Gruplama UDF (UDAF) | |
|
Yes |
|
No |
| Durum bilgisi olmayan Gruplama UDF (UDAF) | |
|
No |
|
No |
|
No |
| Tablo işlevi | |
|
No |
| UC UDF | No |
Python UDF'lerini gerçek zamanlı modda kullanırken dikkate alınması gereken birkaç nokta vardır:
- Gecikme süresini en aza indirmek için Ok toplu iş boyutunu (spark.sql.execution.arrow.maxRecordsPerBatch) 1 olarak yapılandırın.
- Denge: Bu yapılandırma, aktarım hızına bağlı olarak gecikme süresini iyileştirir. Çoğu iş yükü için bu ayar önerilir.
- Toplu iş boyutunu yalnızca giriş hacmine uyum sağlamak için daha yüksek bir aktarım hızı gerekiyorsa artırın ve gecikme süresindeki olası artışı kabul edin.
- Pandas UDF'leri ve işlevleri, Ok toplu iş boyutu 1 ile iyi performans göstermemektedir.
- Pandas UDF'leri veya işlevleri kullanıyorsanız, Ok toplu iş boyutunu daha yüksek bir değere ayarlayın (örneğin, 100 veya üzeri).
- Bunun daha yüksek gecikme süresi anlamına geldiğini unutmayın. Databricks, mümkünse Ok UDF'sini veya işlevini kullanmanızı önerir.
- Pandas ile ilgili performans sorunu nedeniyle transformWithState yalnızca arabirimiyle
Rowdesteklenir.
İyileştirme teknikleri
| Technique | Varsayılan olarak etkin |
|---|---|
| Asenkron ilerleme izleme: Yazmayı, offset günlüğü ve taahhüt günlüğüne asenkron bir iş parçacığına kaydırarak iki mikro toplu işlem arasındaki süreyi azaltır. Bu, durum bilgisi olmayan akış sorgularının gecikme süresini azaltmaya yardımcı olabilir. | No |
| Zaman uyumsuz durum denetim noktası oluşturma: Durum denetimi için beklemeden önceki mikro toplu işlemin tamamlanmasından hemen sonra sonraki mikro toplu işlemi işlemeye başlayarak durum bilgisi olan akış sorgularının gecikme süresini azaltmaya yardımcı olur. | No |
Limitations
Kaynak sınırlaması
Kinesis için yoklama modu desteklenmez. Ayrıca sık yapılan yeniden bölümlendirmeler gecikme süresini olumsuz etkileyebilir.
Birleşim sınırlaması
Union için bazı sınırlamalar vardır:
- Kendi kendine birleşim desteklenmez:
- Kafka: Aynı kaynak veri çerçevesi nesnesini ve birleşimden türetilmiş veri çerçevelerini kullanamazsınız. Geçici çözüm: Aynı kaynaktan okunan farklı Veri Çerçeveleri kullanın.
- Kinesis: Aynı Kinesis kaynağından türetilen veri çerçevelerini aynı yapılandırmayla ilişkilendiremezsiniz. Geçici çözüm: Farklı Veri Çerçeveleri kullanmanın yanı sıra, her DataFrame'e farklı bir 'consumerName' seçeneği atayabilirsiniz.
- Birleşimden önce tanımlanmış durum bilgisine sahip işleçler (örneğin,
aggregate,deduplicate,transformWithState) desteklenmez. - Toplu kaynaklarla birleşim desteklenmez.
MapPartitions sınırlaması
mapPartitions Scala'da ve benzer Python API'lerinde (mapInPandas, mapInArrow) giriş bölümünün tamamının yineleyicisini alır ve giriş ile çıkış arasında rastgele eşleme ile çıkışın tamamının yineleyicisini oluşturur. Bu API'ler, çıkışın tamamını engelleyerek Akış Real-Time Modu'nda performans sorunlarına neden olabilir ve bu da gecikme süresini artırır. Bu API'lerin semantiği filigran yayılımını iyi desteklemez.
Karmaşık veri türlerini dönüştürme ile birlikte skaler UDF'leri kullanın veya filter bunun yerine benzer işlevler elde edin.
Examples
Aşağıdaki örneklerde desteklenen sorgular gösterilmektedir.
Durum bilgisi olmayan sorgular
Tüm tek aşamalı veya çok aşamalı durum bilgisi olmayan sorgular desteklenir.
Kafka kaynağı-Kafka havuzu
Bu örnekte, bir Kafka kaynağından okur ve bir Kafka havuzuna yazarsınız.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Repartition
Bu örnekte, kafka kaynağından okuma, verileri 20 bölüme yeniden bölümleme ve kafka havuzuna yazma.
Yeniden bölümleme işleminden önce Spark yapılandırmasını spark.sql.execution.sortBeforeRepartitionfalse olarak ayarlayın.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Stream-snapshot join (yalnızca yayın)
Bu örnekte Kafka'dan okur, verileri statik bir tabloyla birleştirir ve kafka havuzuna yazarsınız. Yalnızca statik tabloyu yayınlayan akış statik birleşimlerinin desteklendiğini unutmayın; bu da statik tablonun belleğe sığması gerektiği anlamına gelir.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kinesis kaynağı-Kafka havuzu
Bu örnekte, bir Kinesis kaynağından okuyorsunuz ve Kafka havuzuna yazıyorsunuz.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
Bu örnekte, iki farklı konudan iki Kafka DataFrame'i birleştirilir ve bir Kafka havuzuna yazarsınız.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Durum bilgisi olan sorgular
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Toplama ile Birleşme
Bu örnekte, önce iki farklı konudan iki Kafka DataFrame'i bir araya getirip bir toplama yaparsınız. Sonunda Kafka havuzuna yazarsınız.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Gerçek zamanlı mod ile Yapılandırılmış Akış'taki diğer çalışma modlarının StatefulProcessor içindeki çalışma yöntemleri transformWithState arasında bir fark vardır. Bkz . TransformWithState'i gerçek zamanlı modda kullanma
TransformWithState (PySpark, Satır arabirimi)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Gerçek zamanlı mod ile Yapılandırılmış Akış'taki diğer yürütme modlarının içinde StatefulProcessorçalıştırma şekli transformWithState arasında bir fark vardır. Bkz . TransformWithState'i gerçek zamanlı modda kullanma
Sinks
foreachSink aracılığıyla Postgres'e yazma
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Bu özellik Databricks Runtime 17.1 ve sonraki sürümlerde kullanılabilir.
Görüntüleme hızı kaynağı
Bu örnekte, bir hız kaynağından okur ve akış DataFrame'i bir not defterinde görüntülersiniz.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())