Azure Databricks'da Yapılandırılmış Akış sorgularını izleme

Azure Databricks, Streaming sekmesinin altındaki Spark kullanıcı arabirimi aracılığıyla Yapılandırılmış Akış uygulamaları için yerleşik izleme sağlar.

Spark kullanıcı arabiriminde Yapılandırılmış Akış sorgularını ayırt etme

Spark kullanıcı arabiriminde hangi ölçümlerin hangi akışa ait olduğunu kolayca ayırt etmek için kodunuza ekleyerek .queryName(<query-name>) akışlarınıza writeStream benzersiz bir sorgu adı sağlayın.

Yapılandırılmış Akış ölçümlerini dış hizmetlere gönderme

Akış ölçümleri, Apache Spark'ın Akış Sorgu Dinleyicisi arabirimi kullanılarak uyarı veya pano kullanım örnekleri için dış hizmetlere gönderilebilir. Databricks Runtime 11.3 LTS ve üzerinde StreamingQueryListener, Python ve Scala'da kullanılabilir.

Important

Unity Kataloğu özellikli işlem erişim modlarını kullanan iş yükleri için aşağıdaki sınırlamalar geçerlidir:

  • StreamingQueryListener, kimlik bilgilerini kullanmak veya ayrılmış erişim moduyla bilgisayarda Unity Kataloğu tarafından yönetilen nesnelerle etkileşime geçmek için Databricks Runtime 15.1 veya daha üstünü gerektirir.
  • StreamingQueryListener, standart erişim modu (eski adıyla paylaşılan erişim modu) ile yapılandırılmış Scala iş yükleri için Databricks Runtime 16.1 veya üzerini gerektirir.

Note

Dinleyicilerle işlem gecikmesi, sorgu işleme hızlarını önemli ölçüde etkileyebilir. Bu dinleyicilerde işleme mantığını sınırlamanız ve verimlilik için Kafka gibi hızlı yanıt sistemlerine yazmayı tercih etmeniz tavsiye edilir.

Sorguda kaynakta kullanılabilir veri yoksa ve yeni veri bekliyorsa, akış sorgusu dinleyicisine bir onQueryIdle ileti gönderilir. İleti onQueryProgress yalnızca akış sorgusu toplu işleminin sonunda teslim edilir. Sorgu uzun süredir verileri işliyorsa, ne onQueryIdle ne de onQueryProgress olayları gönderilmeyebilir, ancak sorgu hala sağlıklıdır ve verileri işlemeye devam eder.

Aşağıdaki kod, dinleyici uygulamaya yönelik söz diziminin temel örneklerini sağlar:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Yapılandırılmış Akış'ta gözlemlenebilir ölçümleri tanımlama

Gözlemlenebilir ölçümler, sorguda (DataFrame) tanımlanabilen rastgele toplama işlevleri olarak adlandırılır. DataFrame'in çalışması bir tamamlanma noktasına ulaşır ulaşmaz (bir toplu iş sorgusunu tamamlar veya akış dönemine ulaşır), son tamamlanma noktasından bu yana işlenen verilerin ölçümlerini içeren adlandırılmış bir olay yayılır.

Spark oturumuna bir dinleyici ekleyerek bu ölçümleri gözlemleyebilirsiniz. Dinleyici, yürütme moduna bağlıdır.

  • Toplu işlem modu: QueryExecutionListener kullanın.

    QueryExecutionListener sorgu tamamlandığında çağrılır. Haritayı kullanarak ölçümlere erişin QueryExecution.observedMetrics .

  • Akış veya mikro yığın işleme: kullanın StreamingQueryListener.

    Akış sorgusu bir dönemi tamamladığında StreamingQueryListener çağrılır. Haritayı kullanarak ölçümlere erişin StreamingQueryProgress.observedMetrics . Azure Databricks akış için continuous tetikleyici modunu desteklemez.

Örneğin:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Unity Kataloğu, Delta Lake ve Yapılandırılmış Akış ölçümleri tablo tanımlayıcılarını eşleme

Yapılandırılmış Akış ölçümleri, bir akış sorgusu için kaynak olarak kullanılan delta tablosunun benzersiz kimliği için alanı birkaç yerde kullanır reservoirId .

reservoirId alanı, Delta tablosu tarafından Delta işlem günlüğünde depolanan benzersiz tanımlayıcıyı eşleştirir. Bu kimlik, Unity Kataloğu tarafından atanan ve Katalog Gezgini'nde görüntülenen değerle eşlenmeztableId.

Delta tablosunun tablo tanımlayıcısını gözden geçirmek için aşağıdaki söz dizimini kullanın. Bu, Unity Catalog tarafından yönetilen tablolar, Unity Catalog dış tablolar ve tüm Hive metastore Delta tablolarında çalışır:

DESCRIBE DETAIL <table-name>

Sonuçlarda görüntülenen id alanı, akış ölçümlerindeki reservoirId ile eşleşen tanımlayıcıdır.

StreamingQueryListener nesne ölçümleri

Fields Description
id Yeniden başlatmalar arasında kalıcı olan benzersiz bir sorgu kimliği.
runId Her başlatma/yeniden başlatma için benzersiz bir sorgu kimliği. Bkz. StreamingQuery.runId().
name Sorgunun kullanıcı tarafından belirtilen adı. Ad belirtilmezse ad null olur.
timestamp Mikro toplu iş yürütme zaman damgası.
batchId İşlenmekte olan mevcut veri kümesinin benzersiz kimliği. Bir hatadan sonra yeniden denemeler söz konusu olduğunda, belirli bir toplu iş kimliği birden fazla kez çalıştırılabilir. Benzer şekilde, işlenecek veri olmadığında toplu iş kimliği artırılmaz.
batchDuration Bir toplu işlemi milisaniye cinsinden işleme süresi.
numInputRows Bir tetikleyicide işlenen toplam kayıt sayısı (tüm kaynaklar arasında).
inputRowsPerSecond Gelen verilerin toplam oranı (tüm kaynaklar arasında).
processedRowsPerSecond Spark'ın verileri işleme oranı (tüm kaynaklar genelinde).

StreamingQueryListener ayrıca, müşteri ölçümleri ve kaynak ilerleme durumu ayrıntıları için inceleyebileceğiniz nesneleri içeren aşağıdaki alanları tanımlar:

Fields Description
durationMs Tür: ju.Map[String, JLong]. Bkz durationMs nesnesi.
eventTime Tür: ju.Map[String, String]. Bkz. eventTime nesnesi.
stateOperators Tür: Array[StateOperatorProgress]. Bkz stateOperators nesnesi.
sources Tür: Array[SourceProgress]. Bakınız sources nesnesi.
sink Tür: SinkProgress. Bkz. havuz nesnesi.
observedMetrics Tür: ju.Map[String, Row]. DataFrame/sorgu üzerinde tanımlanabilir rastgele toplama işlevleri (örneğin df.observe).

durationMs nesnesi

Nesne türü: ju.Map[String, JLong]

Mikro toplu işlem yürütme işleminin çeşitli aşamalarını tamamlamak için geçen süre hakkında bilgi.

Fields Description
durationMs.addBatch Mikro toplu işlemi yürütmek için geçen süre. Bu, Spark'ın mikro toplu işlemi planlamak için gereken süreyi dışlar.
durationMs.getBatch Kaynaktan uzaklıklarla ilgili meta verileri almak için geçen süre.
durationMs.latestOffset Mikro yığın için kullanılan en son ofset. Bu ilerleme nesnesi, kaynaklardan en son öteleme değerini almak için geçen süreyi ifade eder.
durationMs.queryPlanning Yürütme planını oluşturmak için geçen süre.
durationMs.triggerExecution Mikro toplu işlemi planlamak ve yürütmek için gereken süre.
durationMs.walCommit Yeni kullanılabilir ofsetleri taahhüt etmek için alınan süre.
durationMs.commitBatch addBatch sırasında hedef birime yazılan verileri kaydetmek için geçen süre. Yalnızca işlemeyi destekleyen veri işleyicileri için mevcuttur.
durationMs.commitOffsets Toplu işlemi işlem günlüğüne kaydetmek için geçen süre.

eventTime nesnesi

Nesne türü: ju.Map[String, String]

Mikro toplu işlemde işlenen veriler içinde görülen olay zamanı değeri hakkında bilgi. Bu veriler, filigran tarafından, Yapılandırılmış Akış işinde tanımlanan durum bilgisine sahip toplamaları işlemek üzere durumu nasıl uygun hale getirilmesini belirlerken kullanılır.

Fields Description
eventTime.avg Bu tetikleyicide görülen ortalama olay süresi.
eventTime.max Bu tetikleyicide görülen en yüksek olay süresi.
eventTime.min Bu tetikleyicide görülen en düşük olay süresi.
eventTime.watermark Bu tetikleyicide kullanılan filigranın değeri.

stateOperators nesnesi

Nesne türü: Array[StateOperatorProgress] Nesne, stateOperators Yapılandırılmış Akış işinde tanımlanan durum bilgisi olan işlemler ve bunlardan oluşturulan toplamalar hakkında bilgi içerir.

Akış durumu işleçleri hakkında daha fazla bilgi için bkz. Durum bilgisi olan akış nedir?.

Fields Description
stateOperators.operatorName Ölçümlerin ilişkilendirildiği durum bilgisi olan işlecin adı( örneğin symmetricHashJoin, , dedupeveya stateStoreSave).
stateOperators.numRowsTotal Durum bilgisi olan bir işlecin veya toplamanın sonucu olarak durumdaki toplam satır sayısı.
stateOperators.numRowsUpdated Durum bilgisine sahip bir işleç veya toplamanın sonucu olarak güncellenen toplam satır sayısı.
stateOperators.allUpdatesTimeMs Bu ölçüm şu anda Spark tarafından ölçülemiyor ve gelecekteki güncelleştirmelerde kaldırılması planlanıyor.
stateOperators.numRowsRemoved Bir durum bilgisine sahip operatör veya toplama sonucunda durumdan kaldırılan toplam satır sayısı.
stateOperators.allRemovalsTimeMs Bu ölçüm şu anda Spark tarafından ölçülemiyor ve gelecekteki güncelleştirmelerde kaldırılması planlanıyor.
stateOperators.commitTimeMs Tüm güncelleştirmeleri işlemek (yerleştirir ve kaldırır) ve yeni bir sürüm döndürmek için geçen süre.
stateOperators.memoryUsedBytes Durum deposu tarafından kullanılan bellek.
stateOperators.numRowsDroppedByWatermark Durum bilgisi olan birikime dahil edilmek için çok geç olduğu kabul edilen satır sayısı. Yalnızca akış toplamaları: Toplama işlemi sonrasında çıkarılan satır sayısı (ham giriş satırları değil). Bu sayı kesin değildir, ancak geç bırakılan veriler olduğuna dair bir gösterge sağlar.
stateOperators.numShufflePartitions Bu durum bilgisi olan işleç için karıştırma bölmelerinin sayısı.
stateOperators.numStateStoreInstances operatörün başlatıp koruduğu gerçek durum deposu örneği. Durum bilgisi olan birçok işleç için bu, bölüm sayısıyla aynıdır. Ancak stream-stream birleşimleri bölüm başına dört durum deposu örneği başlatır.
stateOperators.customMetrics Daha fazla ayrıntı için bu konudaki stateOperators.customMetrics bölümüne bakın.

StateOperatorProgress.customMetrics nesnesi

Nesne türü: ju.Map[String, JLong]

StateOperatorProgress , customMetricsbu ölçümleri toplarken kullandığınız özelliğe özgü ölçümleri içeren bir alanına sahiptir.

Feature Description
RocksDB durum depolama sistemi RocksDB durum deposu ölçümleri.
HDFS durum deposu HDFS durum deposu için ölçümler.
Akış deduplikasyonu Satır yinelenenlerini kaldırmaya yönelik ölçümler.
Veri akışı toplama Satır toplama ölçümleri.
Akış birleştirme işleci Akış birleştirme işleci için ölçümler.
transformWithState İşleç için transformWithState ölçümler.

RocksDB durum deposu özel ölçümleri

RocksDB'den toplanan bilgiler, Yapılandırılmış Akış işi için sakladığı durum bilgileriyle ilgili performans ve işlemlerine dair ölçümleri içermektedir. Daha fazla bilgi için bkz. Azure Databricks üzerinde RocksDB durum depolarını yapılandırma.

Fields Description
customMetrics.rocksdbBytesCopied RocksDB Dosya Yöneticisi tarafından izlenen kopyalanan bayt sayısı.
customMetrics.rocksdbCommitCheckpointLatency Yerel RocksDB anlık görüntüsünün alınarak yerel bir dizine yazılma süresi milisaniye cinsinden ölçülür.
customMetrics.rocksdbCompactLatency Denetim noktası onayı sırasında milisaniye cinsinden sıkıştırma işlemi süresi (isteğe bağlı).
customMetrics.rocksdbCommitCompactLatency Kaydetme işlemi sırasında, milisaniye cinsinden sıkıştırma süresi.
customMetrics.rocksdbCommitFileSyncLatencyMs Milisaniye cinsinden süre, yerel RocksDB anlık görüntüsünün denetim noktası konumuna (dış depolamaya) senkronize edilmesi.
customMetrics.rocksdbCommitFlushLatency RocksDB'nin bellek içi değişikliklerinin yerel diske boşaltılma süresi milisaniye cinsinden ölçülür.
customMetrics.rocksdbCommitPauseLatency Denetim noktası taahhüdünün bir parçası olarak, sıkıştırma işlemi gibi nedenlerle arka plan çalışanı iş parçacıklarını durdurma süresi (milisaniye cinsinden).
customMetrics.rocksdbCommitWriteBatchLatency Bellek içi yapıda (WriteBatch) hazırlanan yazma işlemlerinin yerel RocksDB'ye uygulanma süresi milisaniye cinsindendir.
customMetrics.rocksdbFilesCopied RocksDB Dosya Yöneticisi tarafından izlendiği şekilde kopyalanan dosyaların sayısı.
customMetrics.rocksdbFilesReused RocksDB Dosya Yöneticisi tarafından izlendiği şekilde yeniden kullanılan dosyaların sayısı.
customMetrics.rocksdbGetCount Çağrıların sayısı get (bellek içi toplu işlem gets - hazırlama yazma işlemleri için kullanılır ve WriteBatch dahil değildir).
customMetrics.rocksdbGetLatency Temel alınan yerel RocksDB::Get çağrısı için ortalama süre nanosaniye olarak.
customMetrics.rocksdbReadBlockCacheHitCount RocksDB'deki blok önbelleğinden önbellek isabetlerinin sayısı.
customMetrics.rocksdbReadBlockCacheMissCount RocksDB'de blok önbelleğinin eksik sayısı.
customMetrics.rocksdbSstFileSize RocksDB örneğindeki tüm Statik Sıralanmış Tablo (SST) dosyalarının boyutu.
customMetrics.rocksdbTotalBytesRead İşlemler tarafından get okunan sıkıştırılmamış bayt sayısı.
customMetrics.rocksdbTotalBytesWritten put işlemleri tarafından yazılan sıkıştırılmamış baytların toplam sayısı.
customMetrics.rocksdbTotalBytesReadThroughIterator Yineleyici kullanılarak okunan sıkıştırılmamış verilerin toplam bayt sayısı. Durum bilgisine sahip bazı işlemler (örneğin, FlatMapGroupsWithState zaman aşımı işleme ve filigranlama), verilerin bir yineleyici aracılığıyla Azure Databricks'e okunmasını gerektirir.
customMetrics.rocksdbTotalBytesReadByCompaction Sıkıştırma işleminin diskten okuduğu bayt sayısı.
customMetrics.rocksdbTotalBytesWrittenByCompaction Sıkıştırma işleminin diske yazdığı toplam bayt sayısı.
customMetrics.rocksdbTotalCompactionLatencyMs Arka plan sıkıştırmaları ve işleme sırasında başlatılan isteğe bağlı sıkıştırma dahil olmak üzere RocksDB sıkıştırmaları için milisaniye cinsinden süre.
customMetrics.rocksdbTotalFlushLatencyMs Arka plan temizleme de dahil olmak üzere toplam temizleme süresi. Boşaltma işlemleri, MemTable dolduğunda depolama alanına boşaltılan süreçlerdir. MemTables, verilerin RocksDB'de depolandığı ilk düzeydir.
customMetrics.rocksdbZipFileBytesUncompressed Dosya Yöneticisi tarafından bildirilen sıkıştırılmamış zip dosyalarının bayt cinsinden boyutu. Dosya Yöneticisi, fiziksel SST dosya disk alanı kullanımını ve silmeyi yönetir.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> RocksDB anlık görüntüsünün kontrol noktası konumuna kaydedilen en son sürümü. "-1" değeri hiçbir anlık görüntünün kaydedilmediğini gösterir. Anlık görüntüler her durum deposu örneğine özgü olduğundan, bu ölçü belirli bir bölüm kimliği ve durum deposu adı için geçerlidir.
customMetrics.rocksdbPutLatency Toplam put çağrısı gecikme süresi.
customMetrics.rocksdbPutCount Put opsiyonlarının sayısı.
customMetrics.rocksdbWriterStallLatencyMs Sıkıştırma veya temizleme işleminin bitmesi için yazar bekleme süresi başlar.
customMetrics.rocksdbTotalBytesWrittenByFlush Flush işlemi tarafından yazılan toplam bayt sayısı
customMetrics.rocksdbPinnedBlocksMemoryUsage Sabitlenmiş bloklar için bellek kullanımı
customMetrics.rocksdbNumInternalColFamiliesKeys İç sütun aileleri için dahili anahtar sayısı
customMetrics.rocksdbNumExternalColumnFamilies Dış sütun ailelerinin sayısı
customMetrics.rocksdbNumInternalColumnFamilies İç sütun ailelerinin sayısı

HDFS durum deposu özel ölçümleri

HDFS durum deposu sağlayıcısı davranışları ve işlemleri hakkında toplanan bilgiler.

Fields Description
customMetrics.stateOnCurrentVersionSizeBytes Yalnızca geçerli sürümde tahmini durum boyutu.
customMetrics.loadedMapCacheHitCount Sağlayıcıda önbelleğe alınan durumların önbellek isabeti sayısı.
customMetrics.loadedMapCacheMissCount Sağlayıcıda önbelleğe alınmış durumların önbellek kaçırma sayısı.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> Belirli bir durum mağazası örneği için anlık görüntünün yüklenen son sürümü.

Özelleştirilmiş yinelenenleri kaldırma ölçümleri

Yinelenenleri kaldırma davranışları ve işlemleriyle ilgili toplanan bilgiler.

Fields Description
customMetrics.numDroppedDuplicateRows Çıkarılan yinelenen satırların sayısı.
customMetrics.numRowsReadDuringEviction Durum çıkarma sırasında okunan durum satırlarının sayısı.

Özel Toplama Ölçümleri

Toplama davranışları ve işlemleri hakkında toplanan bilgiler.

Fields Description
customMetrics.numRowsReadDuringEviction Durum çıkarma sırasında okunan durum satırlarının sayısı.

Özel akış birleştirme metrikleri

Akış birleştirme davranışları ve işlemleri hakkında toplanan bilgiler.

Fields Description
customMetrics.skippedNullValueCount null spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled olarak ayarlandığında atlanan true değerlerin sayısı.

transformWithState özel ölçümleri

transformWithState hakkında toplanan bilgiler, (TWS) davranışları ve işlemlerine dairdir. hakkında transformWithStatedaha fazla ayrıntı için bkz. Durum bilgisi olan özel bir uygulama oluşturma.

Fields Description
customMetrics.initialStateProcessingTimeMs Tüm ilk durumu işlemek için alınan milisaniye sayısı.
customMetrics.numValueStateVars Değer durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numListStateVars Liste durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numMapStateVars Harita durum değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numDeletedStateVars Silinen durum değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.timerProcessingTimeMs Tüm süreölçerleri işlemek için alınan milisaniye sayısı.
customMetrics.numRegisteredTimers Kayıtlı süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numDeletedTimers Silinen süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numExpiredTimers Süresi dolan süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numValueStateWithTTLVars TTL ile değer durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numListStateWithTTLVars TTL ile liste durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numMapStateWithTTLVars TTL ile eşleme durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numValuesRemovedDueToTTLExpiry TTL süre sonu nedeniyle kaldırılan değer sayısı. Ayrıca transformWithStateInPandas için mevcuttur.
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry TTL süre sonu nedeniyle artımlı olarak kaldırılan değer sayısı.

sources nesnesi

Nesne türü: Array[SourceProgress]

nesnesi, sources akış veri kaynaklarına yönelik bilgiler ve ölçümler içerir.

Fields Description
description Akış veri kaynağı tablosunun ayrıntılı açıklaması.
startOffset Akış işinin başlatıldığı veri kaynağı tablosundaki başlangıç uzaklığı numarası.
endOffset Mikro yığın tarafından işlenen son ofset.
latestOffset Mikro toplu iş tarafından işlenen en son öteleme.
numInputRows Bu kaynaktan işlenen giriş satırlarının sayısı.
inputRowsPerSecond Bu kaynaktan işlenmek üzere verilerin gelme hızı (saniye olarak).
processedRowsPerSecond Spark'ın bu kaynaktan verileri işleme hızı.
metrics Tür: ju.Map[String, String]. Belirli bir veri kaynağı için özel ölçümler içerir.

Azure Databricks aşağıdaki kaynak nesnesi uygulamasını sağlar:

Note

Formda sources.<startOffset / endOffset / latestOffset>.* (veya benzer bir şekilde) tanımlanan alanlar için, bunu belirtilen alt alanı içeren (bu 3 olası alandan biri) olarak yorumlayın.

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake kaynakları nesnesi

Delta tablosu akış veri kaynakları için kullanılan özel ölçümlerin tanımları.

Fields Description
sources.description Akış sorgusunun okuduğu kaynağın açıklaması. Örneğin: ”DeltaSource[table]”.
sources.<startOffset / endOffset>.sourceVersion Bu ofsetin kodlandığı serileştirme sürümü.
sources.<startOffset / endOffset>.reservoirId Okunan tablonun kimliği. Bu, sorgu yeniden başlatılırken yanlış yapılandırmayı algılamak için kullanılır. Bkz. Unity Kataloğunu, Delta Lake'i ve Yapılandırılmış Akış ölçümleri tablosu tanımlayıcılarını eşleme.
sources.<startOffset / endOffset>.reservoirVersion Şu anda işlemekte olan tablonun sürümü.
sources.<startOffset / endOffset>.index Bu sürümdeki AddFiles dizinindeki sıra. Bu, büyük taahhütleri birden fazla partiye ayırmak için kullanılır. Bu dizin, modificationTimestamp ve path üzerinde sıralanarak oluşturulur.
sources.<startOffset / endOffset>.isStartingVersion Mevcut ofsetin, ilk verilerin işlenmesinden sonraki değişikliklerin işlenmesi yerine, yeni bir akış sorgusunun başlangıcını belirtip belirtmediğini tanımlar. Yeni bir sorgu başlatılırken, başlangıçtaki tabloda bulunan tüm veriler önce işlenir ve ardından gelen tüm yeni veriler işlenir.
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis Olay sıralaması için kaydedilen zaman. İşlenmeyi bekleyen ilk anlık görüntü verilerinin olay zamanı. Olay zaman sırası ile ilk anlık görüntü işlenirken kullanılır.
sources.latestOffset Mikro toplu sorgu tarafından işlenen en son ofset.
sources.numInputRows Bu kaynaktan işlenen giriş satırlarının sayısı.
sources.inputRowsPerSecond Bu kaynaktan işlenmek üzere verilerin gelme hızı.
sources.processedRowsPerSecond Spark'ın bu kaynaktan verileri işleme hızı.
sources.metrics.numBytesOutstanding Ödenmemiş dosyaların toplam boyutu (RocksDB tarafından izlenen dosyalar). Bu, akış kaynağı olarak Delta ve Auto Loader için birikim metriğidir.
sources.metrics.numFilesOutstanding İşlenecek bekleyen dosyaların sayısı. Bu, akış kaynağı olarak Delta ve Auto Loader için birikim metriğidir.

Apache Kafka kaynakları nesnesi

Apache Kafka akış veri kaynakları için kullanılan özel ölçümlerin tanımları.

Fields Description
sources.description Okunan Kafka konusunu tam olarak belirten Kafka kaynağının ayrıntılı bir açıklaması. Örneğin: ”KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset Akış işinin başlatıldığı Kafka konusu içindeki başlangıç uzaklığı numarası.
sources.endOffset Mikro yığın tarafından işlenen son ofset. Bu, devam eden bir mikro toplu iş yürütmesine latestOffset eşit olabilir.
sources.latestOffset Mikro yığın tarafından hesaplanan en son ofset. Mikro toplu işlem, sınırlama olduğunda tüm offsetleri işlemeyebilir, bu da endOffset ve latestOffset arasında farklılık oluşmasına neden olur.
sources.numInputRows Bu kaynaktan işlenen giriş satırlarının sayısı.
sources.inputRowsPerSecond Bu kaynaktan işlenmek üzere verilerin gelme hızı.
sources.processedRowsPerSecond Spark'ın bu kaynaktan verileri işleme hızı.
sources.metrics.avgOffsetsBehindLatest Akış sorgusunun abone olunan tüm konular arasında en son kullanılabilir uzaklığı geride bıraktığı ortalama uzaklık sayısı.
sources.metrics.estimatedTotalBytesBehindLatest Sorgu işleminin abone olunan konu başlıklarından kullanmadığı tahmini bayt sayısı.
sources.metrics.maxOffsetsBehindLatest Akış sorgusunun abone olunan tüm konular arasında en son kullanılabilir ofsetin arkasında kaldığı maksimum ofset sayısı.
sources.metrics.minOffsetsBehindLatest Abone olunan tüm konular arasında akış sorgusunun en son kullanılabilir uzaklığı arkasında olduğu en düşük uzaklık sayısı.

Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.

Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .

Otomatik Yükleyici kaynak ölçümleri

Otomatik Yükleyici akış veri kaynakları için kullanılan özel ölçümlerin tanımları.

Fields Description
sources.<startOffset / endOffset / latestOffset>.seqNum Dosyaların keşfedilme sırasına göre işlenmekte olan dosya dizisindeki mevcut konum.
sources.<startOffset / endOffset / latestOffset>.sourceVersion cloudFiles kaynağının uygulama sürümü.
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs En son doldurma işleminin başlangıç saati.
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs En son doldurma işleminin bitiş saati.
sources.<startOffset / endOffset / latestOffset>.lastInputPath Akış yeniden başlatılmadan önce akışın kullanıcı tarafından sağlanan son giriş yolu.
sources.metrics.numFilesOutstanding Beklemedeki işler içindeki dosya sayısı
sources.metrics.numBytesOutstanding Birikmiş işlemlerdeki dosyaların boyutu (byte)
sources.metrics.approximateQueueSize İleti kuyruğunun yaklaşık boyutu. Yalnızca cloudFiles.useNotifications seçeneği etkinleştirildiğinde.
sources.numInputRows Bu kaynaktan işlenen giriş satırlarının sayısı. binaryFile Kaynak biçimi için, numInputRows dosya sayısına eşittir.

PubSub kaynakları ölçümleri

PubSub akış veri kaynakları için kullanılan özel ölçümlerin tanımları. PubSub akış kaynaklarını izleme hakkında daha fazla bilgi için bkz. Pub/Sub akış ölçümlerini izleme.

Fields Description
sources.<startOffset / endOffset / latestOffset>.sourceVersion Bu, kayan nokta ile kodlanmış uygulama sürümüdür.
sources.<startOffset / endOffset / latestOffset>.seqNum İşlenmekte olan kalıcı sıra numarası.
sources.<startOffset / endOffset / latestOffset>.fetchEpoch İşlenen en büyük veri alma dönemi.
sources.metrics.numRecordsReadyToProcess Mevcut birikim içinde işlenebilecek kayıtların sayısı.
sources.metrics.sizeOfRecordsReadyToProcess Mevcut birikmiş işler içindeki işlenmemiş verilerin bayt cinsinden toplam boyutu.
sources.metrics.numDuplicatesSinceStreamStart Başlatıldığı ilk andan itibaren veri akışı tarafından işlenen mükerrer kayıtların toplam sayısı.

Pulsar kaynak ölçümleri

Pulsar akış veri kaynakları için kullanılan özel ölçümlerin tanımları.

Fields Description
sources.metrics.numInputRows Geçerli mikro toplu işlemde işlenen satır sayısı.
sources.metrics.numInputBytes Geçerli mikro toplu işlemde işlenen toplam bayt sayısı.

havuz nesnesi

Nesne türü: SinkProgress

Fields Description
sink.description Lavabonun açıklaması, kullanılan belirli lavabo uygulamasını ayrıntılı olarak açıklar.
sink.numOutputRows Çıkış satırlarının sayısı. Farklı havuz türleri, değerler için farklı davranışlara veya kısıtlamalara sahip olabilir. Desteklenen belirli türlere bakın
sink.metrics ju.Map[String, String] lavabonun ölçümleri.

Şu anda Azure Databricks iki özel sink nesnesi uygulaması sağlar:

Lavabo türü Details
Delta tablosu Bkz . Delta havuz nesnesi.
Apache Kafka konusu Bkz. Kafka havuz nesnesi.

alanı, sink.metrics nesnenin her iki değişkeni sink için de aynı şekilde davranır.

Delta Lake havuz nesnesi

Fields Description
sink.description Delta havuzu açıklaması, kullanılmakta olan delta havuzu uygulamasının ayrıntıları. Örneğin: ”DeltaSink[table]”.
sink.numOutputRows Satır sayısı her zaman -1 Spark'ın Delta Lake havuzu sınıflandırması olan DSv1 havuzları için çıkış satırlarını çıkaramadığı içindir.

Apache Kafka havuz nesnesi

Fields Description
sink.description Akış sorgusunun yazmakta olduğu Kafka havuzu açıklaması, kullanılan belirli Kafka havuz uygulamasının ayrıntılarını gösterir. Örneğin: ”org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Mikro toplu işlemin bir parçası olarak çıkış tablosuna veya veri havuzuna yazılan satır sayısı. Bazı durumlarda, bu değer "-1" olabilir ve genellikle "bilinmiyor" olarak yorumlanabilir.

Examples

Kafka'dan Kafka'ya StreamingQueryListener olayı örneği

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake'ten Delta Lake'e örnek bir StreamingQueryListener olayı

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/<user>/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Örnek Kinesis'ten Delta Lake'e StreamingQueryListener olayı

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/<run-id>/deltaTable]",
    "numOutputRows" : -1
  }
}

Örnek Kafka+Delta Lake ile Delta Lake arasındaki StreamingQueryListener olayı

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener olayına örnek hız kaynağı

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}