Aracılığıyla paylaş


Yapılandırılmış Akış'ta gerçek zamanlı mod

Gerçek zamanlı mod, uçtan uca gecikme süresi beş milisaniyeye kadar düşük olan ultra düşük gecikme süresine sahip veri işlemeye olanak tanıyan Yapılandırılmış Akış için bir tetikleyici türüdür. Sahtekarlık algılama, gerçek zamanlı kişiselleştirme ve anlık karar alma sistemleri gibi akış verilerine anında yanıt gerektiren operasyonel iş yükleri için gerçek zamanlı modu kullanın.

Gerçek zamanlı mod Databricks Runtime 16.4 LTS ve üzerinde kullanılabilir. Adım adım kurulum yönergeleri için bkz. Gerçek zamanlı modu kullanmaya başlama. Kod örnekleri için bkz . Gerçek zamanlı mod örnekleri.

Gerçek zamanlı mod nedir?

operasyonel ve analitik iş yükleri karşılaştırması

Akış iş yükleri, analitik iş yüklerine ve operasyonel iş yüklerine geniş ölçüde ayrılabilir:

  • Analitik iş yükleri genellikle madalyon mimarisini (örneğin, verileri bronz, gümüş ve altın tablolara alma) izleyerek veri alımı ve dönüşümü kullanır.
  • operasyonel iş yükleri gerçek zamanlı verileri kullanır, iş mantığı uygular ve aşağı akış eylemlerini veya kararlarını tetikler.

İşletimsel iş yüklerine bazı örnekler şunlardır:

  • Olağan dışı konum, büyük işlem boyutu veya hızlı harcama desenleri gibi faktörlere bağlı olarak, bir dolandırıcılık puanı eşiği aşarsa kredi kartı işlemini gerçek zamanlı olarak engelleme veya işaretleme.
  • Kullanıcının beş dakikadır kot pantolonlara göz atmakta olduğunu gösteren tıklama akışı verileri, kullanıcıya sonraki 15 dakika içinde satın almaları durumunda %25 indirim sunan tanıtım amaçlı bir mesaj göndermeyi sağlar.

Genel olarak işletimsel iş yükleri, saniyenin altında uçtan uca gecikme süresi gereksinimiyle karakterize edilir. Bu, Apache Spark Yapılandırılmış Akış'ta gerçek zamanlı modla elde edilebilir.

Gerçek zamanlı mod düşük gecikme süresine nasıl ulaşır?

Gerçek zamanlı mod şu şekilde yürütme mimarisini geliştirir:

  • Sistemin, kaynakta veri kullanılabilir hale geldikçe işlediği, varsayılanı beş dakika olan uzun süreli toplu işlemleri yürütmesi.
  • Sorgunun tüm aşamalarını aynı anda zamanlama. Bu, kullanılabilir görev yuvalarının sayısının bir toplu işteki tüm aşamaların görev sayısına eşit veya daha fazla olmasını gerektirir.
  • Akış karıştırma kullanılarak üretildiği anda aşamalar arasında veri geçirme.

Toplu işlemi işlemenin sonunda ve sonraki toplu işlem başlamadan önce Yapılandırılmış Akış denetim noktaları ilerleme durumunu gösterir ve ölçümleri yayımlar. Toplu işlem süresi denetim noktası oluşturma sıklığını etkiler:

  • Daha uzun toplu işlemler: Daha seyrek denetim noktası oluşturma, bu da hatalar üzerinde daha uzun süren yeniden yürütmeler ve gecikmeli ölçüm kullanılabilirliği anlamına gelir.
  • Daha kısa toplu işlemler: Gecikme süresini etkileyebilecek daha sık denetim noktaları.

Databricks, uygun tetikleyici aralığını bulmak için gerçek zamanlı modu hedef iş yükünüzle karşılaştırmanızı önerir.

Gerçek zamanlı mod ne zaman kullanılır?

Kullanım örneğiniz için gereken gerçek zamanlı modu seçin:

  • Saniye altı gecikme süresi: İşlemleri gerçek zamanlı olarak engellemesi gereken sahtekarlık algılama sistemleri gibi verilere milisaniye içinde yanıt vermesi gereken uygulamalar.
  • operasyonel karar alma: Gerçek zamanlı teklifler, uyarılar veya bildirimler gibi gelen verilere göre anında eylemleri tetikleyen sistemler.
  • Sürekli işleme: Verilerin düzenli toplu işlemler yerine en kısa sürede işlenmesi gereken iş yükleri.

Aşağıdaki durumlarda mikro toplu iş modunu (varsayılan Yapılandırılmış Akış tetikleyicisi) kullanın:

  • Analitik işleme: Gecikme süresi gereksinimlerinin saniye veya dakika cinsinden ölçüldüğü ETL işlem hatları, veri dönüşümleri ve madalyon mimari uygulamaları.
  • Maliyet iyileştirme: Gerçek zamanlı mod ayrılmış işlem kaynakları gerektirdiği için saniye alt gecikme süresinin gerekli olmadığı iş yükleri.
  • Denetim noktası sıklığı önemlidir: Daha hızlı kurtarma için daha sık denetim noktası oluşturmadan yararlanan uygulamalar.

Gereksinimler ve yapılandırma

Gerçek zamanlı modun işlem kurulumu ve sorgu yapılandırması için belirli gereksinimleri vardır. Bu bölümde gerçek zamanlı modu kullanmak için gereken önkoşullar ve yapılandırma adımları açıklanmaktadır.

Önkoşullar

Gerçek zamanlı modu kullanmak için aşağıdaki gereksinimleri karşılamanız gerekir:

  • Databricks Runtime 16.4 LTS veya üzeri: Gerçek zamanlı mod yalnızca DBR 16.4 LTS ve sonraki sürümlerde kullanılabilir.
  • Ayrılmış işlem: Ayrılmış (eski adıyla tek kullanıcı) bir işlem kullanmanız gerekir. Standard (eski adıyla paylaşılan), Lakeflow Spark Deklaratif İşlem Hatları ve sunucusuz kümeler desteklenmez.
  • Otomatik ölçeklendirme yok: Otomatik ölçeklendirme devre dışı bırakılmalıdır.
  • Foton yok: Foton hızlandırma gerçek zamanlı modda desteklenmez.
  • Spark yapılandırması: spark.databricks.streaming.realTimeMode.enabled olarak ayarlamanız true gerekir.

İşlem yapılandırması

Bilgisayarınızı aşağıdaki ayarlarla yapılandırın:

  • Spark yapılandırmasında spark.databricks.streaming.realTimeMode.enabled'yi true olarak ayarlayın.
  • Otomatik ölçeklendirmeyi devre dışı bırakın.
  • Foton hızlandırmayı devre dışı bırakın.
  • İşlemin ayrılmış bir küme olarak yapılandırıldığından emin olun (standart değil, Lakeflow Spark Bildirimli İşlem Hatları veya sunucusuz).

gerçek zamanlı mod için işlem oluşturma ve yapılandırma hakkında adım adım yönergeler için bkz. Gerçek zamanlı modu kullanmaya başlama.

Sorgu yapılandırması

Bir sorguyu gerçek zamanlı modda çalıştırmak için gerçek zamanlı tetikleyiciyi etkinleştirmeniz gerekir. Gerçek zamanlı tetikleyiciler yalnızca güncelleştirme modunda desteklenir.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Hesaplama boyutlandırma

İşlemde yeterli görev yuvası varsa işlem kaynağı başına bir gerçek zamanlı iş çalıştırabilirsiniz.

Düşük gecikme modunda çalışmak için toplam kullanılabilir görev yuvası sayısı tüm sorgu aşamalarındaki görev sayısından büyük veya buna eşit olmalıdır.

Slot hesaplama örnekleri

İşlem hattı türü Konfigürasyon Gerekli yuvalar
Tek aşamalı durum bilgisi olmayan (Kafka kaynağı + havuz) maxPartitions = 8 8 slota
durum bilgisi olan iki aşamalı (Kafka kaynağı + karıştırma) maxPartitions = 8, karışık bölümler = 20 28 yuva (8 + 20)
Üç aşamalı (Kafka kaynağı + karıştırma + yeniden bölümleme) maxPartitions = 8, her biri 20 aşamadan oluşan iki karıştırma aşaması 48 yuva (8 + 20 + 20)

ayarlamazsanız maxPartitionsKafka konu başlığındaki bölüm sayısını kullanın.

Dikkat edilmesi gereken temel konular

Hesaplama ortamınızı yapılandırırken aşağıdakileri göz önünde bulundurun:

  • Mikro toplu iş modundan farklı olarak, gerçek zamanlı görevler verileri beklerken boşta kalabilir, bu nedenle kaynakların boşa harcanmasını önlemek için doğru boyutlandırma gereklidir.
  • Ayarlama yaparak hedef kullanım düzeyini (örneğin, 50%) hedefleyin:
    • maxPartitions (Kafka için)
    • spark.sql.shuffle.partitions (karıştırma aşaması için)
  • Databricks, her görevin ek yükü azaltmak için birden fazla Kafka bölümünü işlemesi için maxPartitions'yi ayarlamanızı önerir.
  • Çalışan başına görev yuvalarını, basit tek aşamalı işler için iş yüküyle eşleşecek şekilde ayarlayın.
  • Yoğun karıştırmalı işler için birikimlerden kaçınan en az karıştırma bölümü sayısını bulmak için denemeler yapın ve buradan ayarlayın. Bilgisayar sistemi yeterli yuvaya sahip değilse işi zamanlamaz.

Note

Databricks Runtime 16.4 LTS ve üzeri tüm gerçek zamanlı işlem hatları, gerçek zamanlı ve mikro toplu iş modları arasında sorunsuz geçişe olanak tanıyan checkpoint v2'yi kullanır.

İyileştirme teknikleri

Gerçek zamanlı modda gecikme süresini azaltmak için aşağıdaki teknikleri kullanın:

  • Zaman uyumsuz ilerleme izleme: Yazma işlemlerini uzaklığa taşır ve günlükleri zaman uyumsuz bir iş parçacığına işler ve durum bilgisi olmayan sorgular için toplu işler arası süreyi azaltır.
  • Asenkron durum denetim noktası oluşturma: Durum denetim noktasını oluşturmadan, işlem tamamlanır tamamlanmaz bir sonraki mikro toplu işlemi işlemeye başlar ve durum bilgisi olan sorgular için gecikmeyi azaltır.

Note

Her iki teknik de varsayılan olarak etkin değildir. Bunları ayrı olarak etkinleştirmeniz gerekir.

İzleme ve gözlemlenebilirlik

Gerçek zamanlı iş yükleri için sorgu performansının ölçülmesi önemlidir. Gerçek zamanlı modda geleneksel toplu iş süresi ölçümleri gerçek gecikme süresini yansıtmadığından alternatif yaklaşımlara ihtiyacınız vardır.

Uçtan uca gecikme süresi iş yüküne özgüdür ve bazen yalnızca iş mantığıyla doğru bir şekilde ölçülebilir. Örneğin, kaynak zaman damgası Kafka'da çıktı ise, gecikme süresini Kafka'nın çıkış zaman damgası ile kaynak zaman damgası arasındaki fark olarak hesaplayabilirsiniz.

Ayrıca, aşağıda açıklanan yerleşik ölçümleri ve API'leri kullanarak uçtan uca gecikme süresini tahmin edebilirsiniz.

Yerleşik ölçümler StreamingQueryProgress ile

Aşağıdaki ölçümler, sürücü günlüklerine otomatik olarak kaydedilen StreamingQueryProgress olayına dahildir. Bu öğelere StreamingQueryListener'un onQueryProgress() geri çağırma işlevi aracılığıyla da erişebilirsiniz. QueryProgressEvent.json() veya toString() fazladan gerçek zamanlı mod ölçümleri ekleyin.

  1. İşleme gecikme süresi (processingLatencyMs). Gerçek zamanlı mod sorgusunun bir kaydı okuması ile sorgunun bir sonraki aşamaya veya aşağı akışa yazması arasında geçen süre. Tek aşamalı sorgular için bu, E2E gecikme süresiyle aynı süreyi ölçer. Sistem bu ölçümü görev başına bildirir.
  2. Kaynak kuyruğa alma gecikmesi (sourceQueuingLatencyMs). Sistemin bir ileti veriyoluna kayıt yazması (örneğin, Kafka'daki günlük ekleme zamanı) ile gerçek zamanlı mod sorgusunun kaydı ilk kez okuması arasında geçen süre. Sistem bu ölçümü görev başına bildirir.
  3. E2E Gecikme Süresi (e2eLatencyMs). Sistemin kaydı bir ileti veri yolu'na yazması ile gerçek zamanlı mod sorgusunun kaydı aşağı akışa yazması arasındaki süre. Sistem, tüm görevler tarafından işlenen tüm kayıtlarda toplu iş başına bu ölçümü birleştirir.

Örneğin:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Gözlem API'siyle özel gecikme süresi ölçümü

Gözlemleme API'si, başka bir iş başlatmadan gecikme süresini ölçmeye yardımcı olur. Kaynak veri varış zamanına yakın bir kaynak zaman damganız varsa, Gözlemleme API'sini kullanarak her toplu işlemin gecikme süresini tahmin edebilirsiniz. Havuza ulaşmadan önce zaman damgasını geçirin:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Bu örnekte, girişin çıkışından önce geçerli bir zaman damgası kaydedilir ve bu zaman damgası ile kaydın kaynak zaman damgası arasındaki fark hesaplanarak gecikme süresi tahmin edilir. Sonuçlar devam eden raporlara eklenir ve dinleyicilerin kullanımına sunulur. Örnek çıktı aşağıdaki gibidir:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Özellik desteği ve sınırlamaları

Bu bölümde, uyumlu ortamlar, diller, kaynaklar, havuzlar, işleçler ve belirli özellikler için dikkat edilmesi gerekenler dahil olmak üzere gerçek zamanlı modun desteklenen özellikleri ve geçerli sınırlamaları açıklanmaktadır.

Desteklenen ortamlar, diller ve modlar

Desteklenen diller: Gerçek zamanlı mod Scala, Java ve Python'ı destekler.

Desteklenen işlem türleri:

İşlem türü Supported
Tahsis Edilmiş (eski adı: tek kullanıcı)
Standart (eski adı: paylaşılan) ✓ (yalnızca Python)
Lakeflow Spark Deklaratif Boru Hatları Klasik Desteklenmiyor
Lakeflow Spark Deklaratif İşlem Hatları Sunucusuz Desteklenmiyor
Serverless Desteklenmiyor

Desteklenen yürütme modları:

Yürütme modu Supported
Güncelleştirme modu
Append mode Desteklenmiyor
Tamamlama modu Desteklenmiyor

Kaynak ve havuz desteği

Kaynak veya havuz Kaynak olarak Havuz olarak
Apache Kafka
Event Hubs (Kafka bağlayıcısı kullanarak)
Kinesis ✓ (yalnızca EFO modu) Desteklenmiyor
AWS MSK Desteklenmiyor
Delta Desteklenmiyor Desteklenmiyor
Google Pub/Sub (Mesajlaşma Hizmeti) Desteklenmiyor Desteklenmiyor
Apache Pulsar Desteklenmiyor Desteklenmiyor
Rastgele havuzlar (kullanarak forEachWriter) Uygulanamaz

Desteklenen operatörler

Operators Supported
Durum Bilgisi Olmayan İşlemler
Selection
Projection
UDF'ler
Scala UDF ✓ (bazı sınırlamalarla)
Python Kullanıcı Tanımlı Fonksiyonu (UDF) ✓ (bazı sınırlamalarla)
Toplama
sum
count
max
min
avg
Toplama işlevleri
Pencereleme
Tumbling
Sliding
Session Desteklenmiyor
Deduplication
KopyalarıKaldır ✓ (durum sınırsız)
dropDuplicatesWithinWatermark Desteklenmiyor
Stream - Tablo Birleştirme
Yayın tablosu (küçük olmalıdır)
Stream - Akışa Katılma Desteklenmiyor
(düz)MapGroupsWithState Desteklenmiyor
transformWithState ✓ (bazı farklılıklarla)
union ✓ (bazı sınırlamalarla)
forEach
forEachBatch Desteklenmiyor
mapPartitions Desteklenmiyor (sınırlamaya bakın)

Dikkat edilmesi gereken özel noktalar

Bazı işleçler ve özellikler gerçek zamanlı modda kullanıldığında dikkat edilmesi gereken belirli noktalara veya farklılıklara sahiptir.

gerçek zamanlı modda transformWithState

Databricks, durum bilgisi olan özel uygulamalar oluşturmak için Apache Spark Yapılandırılmış Akış'ta bir API'yi destekler transformWithState. API ve kod parçacıkları hakkında daha fazla bilgi için bkz. Durum bilgisi olan özel bir uygulama oluşturma .

Ancak, API'nin gerçek zamanlı modda davranışı ile mikro toplu iş mimarisinden yararlanan geleneksel akış sorguları arasında bazı farklılıklar vardır.

  • Gerçek zamanlı mod, her satır için handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) yöntemini çağırır.
    • Yineleyici inputRows tek bir değer döndürür. Mikro toplu iş modu bunu her anahtar için bir kez çağırır ve inputRows yineleyici, mikro toplu işteki bir anahtarın tüm değerlerini döndürür.
    • Kodunuzu yazarken bu farkın farkında olmanız gerekir.
  • Olay zamanı zamanlayıcıları gerçek zamanlı modda desteklenmez.
  • Gerçek zamanlı modda, zamanlayıcıların çalışması veri gelişine bağlı olarak gecikir.
    • Süreölçer 10:00:00 olarak zamanlandıysa ancak veri gelmiyorsa zamanlayıcı hemen tetiklenmiyor.
    • Veriler 10:00:10'da ulaşırsa zamanlayıcı 10 saniyelik bir gecikmeyle tetikler.
    • Veri gelmezse ve uzun süreli çalışan toplu işlem sonlandırılmak üzereyse, toplu işlem sonlandırılmadan önce zamanlayıcı tetiklenir.

Gerçek zamanlı modda Python UDF'leri

Databricks, gerçek zamanlı modda Python kullanıcı tanımlı işlevlerin (UDF) çoğunluğunu destekler:

Kategori UDF türü Supported
Durumsuz Python skaler UDF (Kullanıcı tanımlı skaler işlevler - Python)
Durumsuz Arrow skaler UDF
Durumsuz Pandas skaler UDF (pandas kullanıcı tanımlı işlevler)
Durumsuz Ok işlevi (mapInArrow)
Durumsuz Pandas işlevi (Harita)
Duruma duyarlı gruplandırma (UDAF) transformWithState (Row yalnızca arabirim)
Durum bilgisi olan gruplandırma (UDAF) applyInPandasWithState Desteklenmiyor
Durum bilgisi olmayan gruplandırma (UDAF) apply Desteklenmiyor
Durum bilgisi olmayan gruplandırma (UDAF) applyInArrow Desteklenmiyor
Durumsuz gruplandırma (UDAF) applyInPandas Desteklenmiyor
Tablo işlevi UDTF (Python kullanıcı tanımlı tablo işlevleri (UDF)) Desteklenmiyor
Tablo işlevi UC UDF Desteklenmiyor

Python UDF'lerini gerçek zamanlı modda kullanırken dikkate alınması gereken birkaç nokta vardır:

  • Gecikme süresini en aza indirmek için Ok toplu iş boyutunu (spark.sql.execution.arrow.maxRecordsPerBatch) 1 olarak yapılandırın.
    • Fedakarlık: Bu yapılandırma, geçiş hızından ödün vererek gecikme süresini iyileştirir. Çoğu iş yükü için bu ayar önerilir.
    • Toplu iş boyutunu yalnızca giriş hacmine uyum sağlamak için daha yüksek bir aktarım hızı gerekiyorsa artırın ve gecikme süresindeki olası artışı kabul edin.
  • Pandas UDF'leri ve işlevleri, Arrow toplu iş boyutu 1 olduğunda iyi performans göstermemektedir.
    • Pandas UDF'leri veya işlevleri kullanıyorsanız, Arrow toplu iş boyutunu daha yüksek bir değere ayarlayın (örneğin, 100 veya daha fazla).
    • Bunun daha yüksek gecikme süresi anlamına geldiğini unutmayın. Databricks, mümkünse Ok UDF'sini veya işlevini kullanmanızı önerir.
  • Pandas ile ilgili performans sorunu nedeniyle transformWithState yalnızca arabirimiyle Row desteklenir.

Limitations

Kaynak sınırlamaları

Kinesis için gerçek zamanlı mod yoklama modunu desteklemez. Ayrıca sık yapılan yeniden bölümlendirmeler gecikme süresini olumsuz etkileyebilir.

Birleşim sınırlamaları

Union işlecinin bazı sınırlamaları vardır:

  • Gerçek zamanlı mod kendi kendine birleşmeyi desteklemez:
    • Kafka: Aynı kaynak veri çerçevesi nesnesini ve birleşimden türetilmiş veri çerçevelerini kullanamazsınız. Geçici çözüm: Aynı kaynaktan okunan farklı DataFrame'ler kullanın.
    • Kinesis: Aynı Kinesis kaynağından türetilen veri çerçevelerini aynı yapılandırmayla ilişkilendiremezsiniz. Geçici çözüm: Farklı DataFrame'ler kullanmanın yanı sıra, her DataFrame'e farklı bir 'consumerName' seçeneği atayabilirsiniz.
  • Gerçek zamanlı mod, Birleşimden önce tanımlanan durum bilgisi olan işleçleri (örneğin, aggregate, deduplicate, transformWithState) desteklemez.
  • Gerçek zamanlı mod, toplu kaynaklarla birleşimi desteklemez.

MapPartitions sınırlaması

mapPartitions Scala'da ve benzer Python API'lerinde (mapInPandas, mapInArrow) giriş bölümünün tamamının yineleyicisini alır ve giriş ile çıkış arasında rastgele eşleme ile çıkışın tamamının yineleyicisini oluşturur. Bu API'ler, çıkışın tamamını engelleyerek Akış Real-Time Modu'nda performans sorunlarına neden olabilir ve bu da gecikme süresini artırır. Bu API'lerin semantiği filigran yayılımını iyi desteklemez.

Karmaşık veri türlerini dönüştürme ile birlikte skaler UDF'leri kullanın veya filter bunun yerine benzer işlevler elde edin.

Sonraki Adımlar

Gerçek zamanlı modun ne olduğunu ve nasıl yapılandırabileceğinizi anladığınıza göre, gerçek zamanlı akış uygulamaları uygulamaya başlamak için şu kaynakları inceleyin: