Aracılığıyla paylaş


Yapılandırılmış Akış'ta gerçek zamanlı mod

Important

Bu özellik Genel Önizleme aşamasındadır.

Bu sayfada, uçtan uca gecikme süresi 5 ms'ye kadar olan ultra düşük gecikme süresine sahip veri işlemeyi sağlayan Yapılandırılmış Akış için bir tetikleyici türü olan gerçek zamanlı mod açıklanmaktadır. Bu mod, akış verilerine anında yanıt gerektiren operasyonel iş yükleri için tasarlanmıştır.

Gerçek zamanlı mod Databricks Runtime 16.4 LTS ve sonraki sürümlerde kullanılabilir.

İşletimsel iş yükleri

Akış iş yükleri, analitik iş yüklerine ve operasyonel iş yüklerine geniş ölçüde ayrılabilir:

  • Analitik iş yükleri genellikle madalyon mimarisini (örneğin, verileri bronz, gümüş ve altın tablolara alma) izleyerek veri alımı ve dönüşümü kullanır.
  • operasyonel iş yükleri gerçek zamanlı verileri kullanır, iş mantığı uygular ve aşağı akış eylemlerini veya kararlarını tetikler.

İşletimsel iş yüklerine bazı örnekler şunlardır:

  • Olağan dışı konum, büyük işlem boyutu veya hızlı harcama desenleri gibi faktörlere bağlı olarak, bir dolandırıcılık puanı eşiği aşarsa kredi kartı işlemini gerçek zamanlı olarak engelleme veya işaretleme.
  • Kullanıcının beş dakikadır kot pantolonlara göz atmakta olduğunu gösteren tıklama akışı verileri, kullanıcıya sonraki 15 dakika içinde satın almaları durumunda %25 indirim sunan tanıtım amaçlı bir mesaj göndermeyi sağlar.

Genel olarak işletimsel iş yükleri, saniyenin altında uçtan uca gecikme süresine duyulan gereksinimle karakterize edilir. Bu, Apache Spark Yapılandırılmış Akış'ta gerçek zamanlı modla elde edilebilir.

Gerçek zamanlı mod düşük gecikme süresine nasıl ulaşır?

Gerçek zamanlı mod şu şekilde yürütme mimarisini geliştirir:

  • Uzun süre çalışan toplu işlemleri yürütme (varsayılan değer 5 dakikadır) ve bu işlemde veriler kaynakta kullanılabilir hale geldikçe işlenir.
  • Sorgunun tüm aşamaları aynı anda zamanlanır. Bu, kullanılabilir görev yuvalarının sayısının bir toplu işteki tüm aşamaların görev sayısına eşit veya daha fazla olmasını gerektirir.
  • Veriler, akış karıştırma kullanılarak üretildiği anda aşamalar arasında geçirilir.

Toplu işlemi işlemenin sonunda ve sonraki toplu işlem başlamadan önce Yapılandırılmış Akış denetim noktaları ilerleme gösterir ve son toplu iş için ölçümleri kullanılabilir hale getirir. Toplu işlemler daha uzunsa, bu etkinlikler daha az sıklıkta olabilir ve hata durumunda daha uzun süre yeniden yürütmeye ve ölçümlerin kullanılabilirliğini geciktirmeye neden olabilir. Öte yandan, partiler daha küçükse, bu etkinlikler daha sık hale gelir ve gecikme üzerinde etkili olabilir. Databricks, uygun tetikleyici aralığını bulmak için gerçek zamanlı modu hedef iş yükünüz ve gereksinimlerinizle karşılaştırmanızı önerir.

Küme yapılandırması

Yapılandırılmış Akış'ta gerçek zamanlı modu kullanmak için klasik bir Lakeflow İşi yapılandırmanız gerekir:

  1. Azure Databricks çalışma alanınızda sol üst köşedeki Yeni'ye tıklayın. Diğer'i seçin ve Küme'ye tıklayın.

  2. Foton hızlandırmayı temizleyin.

  3. Otomatik ölçeklendirmeyi etkinleştir seçeneğinin işaretini kaldırın.

  4. Gelişmiş performans'ın altında Spot örnekleri kullan seçeneğinin işaretini kaldırın.

  5. Gelişmiş ve Erişim modu altında Manuel'e tıklayın ve Ayrılmış (eski adı: Tek kullanıcı)'yı seçin.

  6. Spark'ın altında Spark yapılandırması altında aşağıdakileri girin:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Oluştur'utıklayın.

Küme boyutu gereksinimleri

Kümede yeterli görev yuvası varsa, küme başına bir gerçek zamanlı iş çalıştırabilirsiniz.

Düşük gecikme modunda çalışmak için toplam kullanılabilir görev yuvası sayısı tüm sorgu aşamalarındaki görev sayısından büyük veya buna eşit olmalıdır.

Slot hesaplama örnekleri

Tek aşamalı durumsuz boru hattı (Kafka kaynağı + sonlandırıcı)

maxPartitions = 8 ise en az 8 yuvaya ihtiyacınız vardır. maxPartitions ayarlanmadıysa Kafka konu bölümü sayısını kullanın.

durum bilgisi olan iki aşamalı işlem hattı (Kafka kaynağı + karıştırma):

maxPartitions = 8 ve karışık bölümler = 20 ise, 8 + 20 = 28 yuva gerekir.

Üç aşamalı boru hattı (Kafka kaynağı + karıştırma + yeniden bölümleme):

maxPartitions = 8 ve her biri 20 olan iki karıştırma aşamasıyla, toplamda 48 yuva gerekir: 8 + 20 + 20.

Dikkat edilmesi gereken temel konular

Kümenizi yapılandırırken bunu dikkate alın:

  • Mikro toplu iş modundan farklı olarak, gerçek zamanlı görevler verileri beklerken boşta kalabilir, bu nedenle kaynakların boşa harcanmasını önlemek için doğru boyutlandırma gereklidir.
  • Ayarlama yaparak hedef kullanım düzeyini (örneğin, 50%) hedefleyin:
    • maxPartitions (Kafka için)
    • spark.sql.shuffle.partitions (karıştırma aşaması için)
  • Databricks, her görevin ek yükü azaltmak için birden çok Kafka bölümünü işlemesi için maxPartitions ayarını önerir.
  • Çalışan başına görev yuvalarını, basit tek aşamalı işler için iş yüküyle eşleşecek şekilde ayarlayın.
  • Yoğun karıştırmalı işler için birikimlerden kaçınan en az karıştırma bölümü sayısını bulmak için denemeler yapın ve buradan ayarlayın. Kümede yeterli boşluk yoksa iş zamanlanmayacak.

Note

Databricks Runtime 16.4 LTS ve sonraki sürümlerde tüm gerçek zamanlı işlem hatları, gerçek zamanlı ve mikro toplu iş modları arasında sorunsuz geçişe olanak sağlayan checkpoint v2'yi kullanır.

Sorgu yapılandırması

Bir sorgunun düşük gecikme modu kullanılarak çalıştırılmasını belirtmek için gerçek zamanlı tetikleyiciyi etkinleştirmeniz gerekir. Ayrıca gerçek zamanlı tetikleyiciler yalnızca güncelleştirme modunda desteklenir. Örneğin:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

Daha önce, uçtan uca sorgu gecikme süresi toplu işlem süresine çok yakındı ve bu da toplu işlem süresini sorgu gecikmesinin iyi bir göstergesi haline getiriyordu. Ancak bu yöntem artık gerçek zamanlı modda uygulanmaz ve gecikme süresini ölçmek için alternatif yaklaşımlar gerektirir. Uçtan uca gecikme süresi iş yüküne özgüdür ve bazen yalnızca iş mantığıyla doğru bir şekilde ölçülebilir. Örneğin, kaynak zaman damgası Kafka'da çıkarılırsa, gecikme süresi Kafka'nın çıkış zaman damgası ile kaynak zaman damgası arasındaki fark olarak hesaplanabilir.

Akış işlemi sırasında toplanan kısmi bilgilere göre uçtan uca gecikme süresini çeşitli yollarla tahmin edebilirsiniz.

StreamingQueryProgress kullanma

Aşağıdaki ölçümler, sürücü günlüklerine otomatik olarak kaydedilen StreamingQueryProgress olayına dahildir. Bu öğelere StreamingQueryListener'un onQueryProgress() geri çağırma işlevi aracılığıyla da erişebilirsiniz. QueryProgressEvent.json() veya toString() fazladan gerçek zamanlı mod ölçümleri ekleyin.

  1. İşleme gecikme süresi (processingLatencyMs). Gerçek zamanlı mod sorgusunun bir kaydı okuması ile bu kaydın sonraki aşamaya veya sonraki işlemlere yazılmadan önce geçen süre. Tek aşamalı sorgular için bu, E2E gecikme süresiyle aynı süreyi ölçer. Bu ölçüm görev başına bildirilir.
  2. Kaynak kuyruğa alma gecikmesi (sourceQueuingLatencyMs). Bir kaydın ileti veri yoluna (örneğin, Kafka'daki günlük ekleme zamanı) başarıyla yazılması ve bu kaydın gerçek zamanlı mod sorgusu tarafından ilk kez okunması arasındaki geçen süre. Bu ölçüm görev başına bildirilir.
  3. E2E Gecikme Süresi (e2eLatencyMs). Kaydın bir ileti veri yolu için başarıyla yazıldığında ve kaydın gerçek zamanlı mod sorgusu tarafından aşağı akışa yazıldığındaki süre. Bu ölçüm, tüm görevler tarafından işlenen kayıtlar üzerinden her bir toplu iş için toplanır.

Örneğin:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

İşlerde Gözlem API'si kullanma

Gözlemleme API'si, başka bir iş başlatmadan gecikme süresini ölçmeye yardımcı olur. Kaynak veri varış zamanına yakın bir kaynak zaman damganız varsa ve havuza ulaşmadan önce geçirildiyse veya zaman damgasını geçirmenin bir yolunu bulabilirseniz, GözlemLEME API'sini kullanarak her toplu işlemin gecikme süresini tahmin edebilirsiniz:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Bu örnekte, girişin çıkışından önce geçerli bir zaman damgası kaydedilir ve bu zaman damgası ile kaydın kaynak zaman damgası arasındaki fark hesaplanarak gecikme süresi tahmin edilir. Sonuçlar devam eden raporlara eklenir ve dinleyicilerin kullanımına sunulur. Örnek çıktı aşağıdaki gibidir:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Neler desteklenir?

Environments

Küme Türü Supported
Tahsis Edilmiş (eski adı: tek kullanıcı) Yes
Standart (eski adı: paylaşılan) No
Lakeflow Spark Deklaratif Boru Hatları Klasik No
Lakeflow Spark Deklaratif İşlem Hatları Sunucusuz No
Serverless No

Languages

Language Supported
Scala Yes
Java Yes
Python Yes

Yürütme Modları

Yürütme Modu Supported
Güncelleştirme modu Yes
Append mode No
Tamamlama modu No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (Kafka Bağlayıcısı'nı kullanarak) Yes
Kinesis Evet (yalnızca EFO modu)
Google Pub/Sub (Mesajlaşma Hizmeti) No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (Kafka Bağlayıcısı'nı kullanarak) Yes
Kinesis No
Google Pub/Sub (Mesajlaşma Hizmeti) No
Apache Pulsar No
Rastgele Havuzlar (forEachWriter kullanarak) Yes

Operators

Operators Supported
Durum Bilgisi Olmayan İşlemler
  • Selection
Yes
  • Projection
Yes
UDFs
  • Scala UDF
Evet (bazı sınırlamalarla)
  • Python Kullanıcı Tanımlı Fonksiyonu (UDF)
Evet (bazı sınırlamalarla)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Toplama işlevleri Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Evet (durum sınırsız)
  • dropDuplicatesWithinWatermark
No
Akış - Tablo Birleştirme
  • Yayın tablosu (küçük olmalıdır)
Yes
Stream - Akışa Katılma No
(düz)MapGroupsWithState No
transformWithState Evet (bazı farklarla)
union Evet (bazı sınırlamalarla)
forEach Yes
forEachBatch No
mapPartitions Hayır (sınırlamaya bakın)

TransformWithState'i gerçek zamanlı modda kullanma

Databricks, durum bilgisi olan özel uygulamalar oluşturmak için Apache Spark Yapılandırılmış Akış'ta bir API'yi destekler transformWithState. API ve kod parçacıkları hakkında daha fazla bilgi için bkz. Durum bilgisi olan özel bir uygulama oluşturma .

Ancak, API'nin gerçek zamanlı modda davranışı ile mikro toplu iş mimarisinden yararlanan geleneksel akış sorguları arasında bazı farklılıklar vardır.

  • Gerçek zamanlı modda handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) yöntemi her satır için çağrılır.
    • Yineleyici inputRows tek bir değer döndürür. Mikro toplu iş modunda, her anahtar için bir kez çağrılır ve inputRows yineleyici, mikro toplu işteki bir anahtarın tüm değerlerini döndürür.
    • Kodlarını yazarken bu farka karşı duyarlı olmanız gerekir.
  • Olay zamanı zamanlayıcıları gerçek zamanlı modda desteklenmez.
  • Gerçek zamanlı modda süreölçerler, verinin varışına bağlı olarak tetiklemede gecikir. Aksi takdirde, veri yoksa, uzun süreli çalışan toplu iş sonunda tamamlanır. Örneğin, bir zamanlayıcının 10:00:00'da tetiklemesi gerekiyorsa ve aynı anda veri gelişi yoksa tetiklenmez. Bunun yerine, veriler 10:00:10'da ulaşırsa zamanlayıcı 10 saniyelik bir gecikmeyle tetiklenir. Veya veri gelmezse ve uzun süre çalışan toplu iş sonlandırılırsa, uzun süre çalışan toplu işlemi sonlandırmadan önce zamanlayıcıyı çalıştırır.

Python UDF'leri

Databricks, gerçek zamanlı modda Python kullanıcı tanımlı işlevlerin (UDF) çoğunluğunu destekler:

UDF türü Supported
Durum Bilgisi Olmayan UDF
Yes
  • Ok skaler UDF
Yes
Yes
  • Ok işlevi (mapInArrow)
Yes
Yes
Durum Bilgisi Olan Gruplama UDF (UDAF)
  • transformWithState (NOT: yalnızca Row arabirim)
Yes
  • applyInPandasWithState
No
Durum bilgisi olmayan Gruplama UDF (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Tablo işlevi
No
UC UDF No

Python UDF'lerini gerçek zamanlı modda kullanırken dikkate alınması gereken birkaç nokta vardır:

  • Gecikme süresini en aza indirmek için Ok toplu iş boyutunu (spark.sql.execution.arrow.maxRecordsPerBatch) 1 olarak yapılandırın.
    • Denge: Bu yapılandırma, aktarım hızına bağlı olarak gecikme süresini iyileştirir. Çoğu iş yükü için bu ayar önerilir.
    • Toplu iş boyutunu yalnızca giriş hacmine uyum sağlamak için daha yüksek bir aktarım hızı gerekiyorsa artırın ve gecikme süresindeki olası artışı kabul edin.
  • Pandas UDF'leri ve işlevleri, Ok toplu iş boyutu 1 ile iyi performans göstermemektedir.
    • Pandas UDF'leri veya işlevleri kullanıyorsanız, Ok toplu iş boyutunu daha yüksek bir değere ayarlayın (örneğin, 100 veya üzeri).
    • Bunun daha yüksek gecikme süresi anlamına geldiğini unutmayın. Databricks, mümkünse Ok UDF'sini veya işlevini kullanmanızı önerir.
  • Pandas ile ilgili performans sorunu nedeniyle transformWithState yalnızca arabirimiyle Row desteklenir.

İyileştirme teknikleri

Technique Varsayılan olarak etkin
Asenkron ilerleme izleme: Yazmayı, offset günlüğü ve taahhüt günlüğüne asenkron bir iş parçacığına kaydırarak iki mikro toplu işlem arasındaki süreyi azaltır. Bu, durum bilgisi olmayan akış sorgularının gecikme süresini azaltmaya yardımcı olabilir. No
Zaman uyumsuz durum denetim noktası oluşturma: Durum denetimi için beklemeden önceki mikro toplu işlemin tamamlanmasından hemen sonra sonraki mikro toplu işlemi işlemeye başlayarak durum bilgisi olan akış sorgularının gecikme süresini azaltmaya yardımcı olur. No

Limitations

Kaynak sınırlaması

Kinesis için yoklama modu desteklenmez. Ayrıca sık yapılan yeniden bölümlendirmeler gecikme süresini olumsuz etkileyebilir.

Birleşim sınırlaması

Union için bazı sınırlamalar vardır:

  • Kendi kendine birleşim desteklenmez:
    • Kafka: Aynı kaynak veri çerçevesi nesnesini ve birleşimden türetilmiş veri çerçevelerini kullanamazsınız. Geçici çözüm: Aynı kaynaktan okunan farklı Veri Çerçeveleri kullanın.
    • Kinesis: Aynı Kinesis kaynağından türetilen veri çerçevelerini aynı yapılandırmayla ilişkilendiremezsiniz. Geçici çözüm: Farklı Veri Çerçeveleri kullanmanın yanı sıra, her DataFrame'e farklı bir 'consumerName' seçeneği atayabilirsiniz.
  • Birleşimden önce tanımlanmış durum bilgisine sahip işleçler (örneğin, aggregate, deduplicate, transformWithState) desteklenmez.
  • Toplu kaynaklarla birleşim desteklenmez.

MapPartitions sınırlaması

mapPartitions Scala'da ve benzer Python API'lerinde (mapInPandas, mapInArrow) giriş bölümünün tamamının yineleyicisini alır ve giriş ile çıkış arasında rastgele eşleme ile çıkışın tamamının yineleyicisini oluşturur. Bu API'ler, çıkışın tamamını engelleyerek Akış Real-Time Modu'nda performans sorunlarına neden olabilir ve bu da gecikme süresini artırır. Bu API'lerin semantiği filigran yayılımını iyi desteklemez.

Karmaşık veri türlerini dönüştürme ile birlikte skaler UDF'leri kullanın veya filter bunun yerine benzer işlevler elde edin.

Examples

Aşağıdaki örneklerde desteklenen sorgular gösterilmektedir.

Durum bilgisi olmayan sorgular

Tüm tek aşamalı veya çok aşamalı durum bilgisi olmayan sorgular desteklenir.

Kafka kaynağı-Kafka havuzu

Bu örnekte, bir Kafka kaynağından okur ve bir Kafka havuzuna yazarsınız.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Repartition

Bu örnekte, kafka kaynağından okuma, verileri 20 bölüme yeniden bölümleme ve kafka havuzuna yazma.

Yeniden bölümleme işleminden önce Spark yapılandırmasını spark.sql.execution.sortBeforeRepartitionfalse olarak ayarlayın.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Stream-snapshot join (yalnızca yayın)

Bu örnekte Kafka'dan okur, verileri statik bir tabloyla birleştirir ve kafka havuzuna yazarsınız. Yalnızca statik tabloyu yayınlayan akış statik birleşimlerinin desteklendiğini unutmayın; bu da statik tablonun belleğe sığması gerektiği anlamına gelir.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Kinesis kaynağı-Kafka havuzu

Bu örnekte, bir Kinesis kaynağından okuyorsunuz ve Kafka havuzuna yazıyorsunuz.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

Bu örnekte, iki farklı konudan iki Kafka DataFrame'i birleştirilir ve bir Kafka havuzuna yazarsınız.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Durum bilgisi olan sorgular

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Toplama ile Birleşme

Bu örnekte, önce iki farklı konudan iki Kafka DataFrame'i bir araya getirip bir toplama yaparsınız. Sonunda Kafka havuzuna yazarsınız.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Gerçek zamanlı mod ile Yapılandırılmış Akış'taki diğer çalışma modlarının StatefulProcessor içindeki çalışma yöntemleri transformWithState arasında bir fark vardır. Bkz . TransformWithState'i gerçek zamanlı modda kullanma

TransformWithState (PySpark, Satır arabirimi)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Gerçek zamanlı mod ile Yapılandırılmış Akış'taki diğer yürütme modlarının içinde StatefulProcessorçalıştırma şekli transformWithState arasında bir fark vardır. Bkz . TransformWithState'i gerçek zamanlı modda kullanma

Sinks

foreachSink aracılığıyla Postgres'e yazma

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Bu özellik Databricks Runtime 17.1 ve sonraki sürümlerde kullanılabilir.

Görüntüleme hızı kaynağı

Bu örnekte, bir hız kaynağından okur ve akış DataFrame'i bir not defterinde görüntülersiniz.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())