Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Azure Databricks, Streaming sekmesinin altındaki Spark kullanıcı arabirimi aracılığıyla Yapılandırılmış Akış uygulamaları için yerleşik izleme sağlar.
Spark kullanıcı arabiriminde Yapılandırılmış Akış sorgularını ayırt etme
Spark kullanıcı arabiriminde hangi ölçümlerin hangi akışa ait olduğunu kolayca ayırt etmek için kodunuza ekleyerek .queryName(<query-name>) akışlarınıza writeStream benzersiz bir sorgu adı sağlayın.
Yapılandırılmış Akış ölçümlerini dış hizmetlere gönderme
Akış ölçümleri, Apache Spark'ın Akış Sorgu Dinleyicisi arabirimi kullanılarak uyarı veya pano kullanım örnekleri için dış hizmetlere gönderilebilir. Databricks Runtime 11.3 LTS ve üzerinde StreamingQueryListener, Python ve Scala'da kullanılabilir.
Important
Unity Kataloğu özellikli işlem erişim modlarını kullanan iş yükleri için aşağıdaki sınırlamalar geçerlidir:
-
StreamingQueryListener, kimlik bilgilerini kullanmak veya ayrılmış erişim moduyla bilgisayarda Unity Kataloğu tarafından yönetilen nesnelerle etkileşime geçmek için Databricks Runtime 15.1 veya daha üstünü gerektirir. -
StreamingQueryListener, standart erişim modu (eski adıyla paylaşılan erişim modu) ile yapılandırılmış Scala iş yükleri için Databricks Runtime 16.1 veya üzerini gerektirir.
Note
Dinleyicilerle işlem gecikmesi, sorgu işleme hızlarını önemli ölçüde etkileyebilir. Bu dinleyicilerde işleme mantığını sınırlamanız ve verimlilik için Kafka gibi hızlı yanıt sistemlerine yazmayı tercih etmeniz tavsiye edilir.
Sorguda kaynakta kullanılabilir veri yoksa ve yeni veri bekliyorsa, akış sorgusu dinleyicisine bir onQueryIdle ileti gönderilir. İleti onQueryProgress yalnızca akış sorgusu toplu işleminin sonunda teslim edilir. Sorgu uzun süredir verileri işliyorsa, ne onQueryIdle ne de onQueryProgress olayları gönderilmeyebilir, ancak sorgu hala sağlıklıdır ve verileri işlemeye devam eder.
Aşağıdaki kod, dinleyici uygulamaya yönelik söz diziminin temel örneklerini sağlar:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Yapılandırılmış Akış'ta gözlemlenebilir ölçümleri tanımlama
Gözlemlenebilir ölçümler, sorguda (DataFrame) tanımlanabilen rastgele toplama işlevleri olarak adlandırılır. DataFrame'in çalışması bir tamamlanma noktasına ulaşır ulaşmaz (bir toplu iş sorgusunu tamamlar veya akış dönemine ulaşır), son tamamlanma noktasından bu yana işlenen verilerin ölçümlerini içeren adlandırılmış bir olay yayılır.
Spark oturumuna bir dinleyici ekleyerek bu ölçümleri gözlemleyebilirsiniz. Dinleyici, yürütme moduna bağlıdır.
Toplu işlem modu:
QueryExecutionListenerkullanın.QueryExecutionListenersorgu tamamlandığında çağrılır. Haritayı kullanarak ölçümlere erişinQueryExecution.observedMetrics.Akış veya mikro yığın işleme: kullanın
StreamingQueryListener.Akış sorgusu bir dönemi tamamladığında
StreamingQueryListenerçağrılır. Haritayı kullanarak ölçümlere erişinStreamingQueryProgress.observedMetrics. Azure Databricks akış içincontinuoustetikleyici modunu desteklemez.
Örneğin:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Unity Kataloğu, Delta Lake ve Yapılandırılmış Akış ölçümleri tablo tanımlayıcılarını eşleme
Yapılandırılmış Akış ölçümleri, bir akış sorgusu için kaynak olarak kullanılan delta tablosunun benzersiz kimliği için alanı birkaç yerde kullanır reservoirId .
reservoirId alanı, Delta tablosu tarafından Delta işlem günlüğünde depolanan benzersiz tanımlayıcıyı eşleştirir. Bu kimlik, Unity Kataloğu tarafından atanan ve Katalog Gezgini'nde görüntülenen değerle eşlenmeztableId.
Delta tablosunun tablo tanımlayıcısını gözden geçirmek için aşağıdaki söz dizimini kullanın. Bu, Unity Catalog tarafından yönetilen tablolar, Unity Catalog dış tablolar ve tüm Hive metastore Delta tablolarında çalışır:
DESCRIBE DETAIL <table-name>
Sonuçlarda görüntülenen id alanı, akış ölçümlerindeki reservoirId ile eşleşen tanımlayıcıdır.
StreamingQueryListener nesne ölçümleri
| Fields | Description |
|---|---|
id |
Yeniden başlatmalar arasında kalıcı olan benzersiz bir sorgu kimliği. |
runId |
Her başlatma/yeniden başlatma için benzersiz bir sorgu kimliği. Bkz. StreamingQuery.runId(). |
name |
Sorgunun kullanıcı tarafından belirtilen adı. Ad belirtilmezse ad null olur. |
timestamp |
Mikro toplu iş yürütme zaman damgası. |
batchId |
İşlenmekte olan mevcut veri kümesinin benzersiz kimliği. Bir hatadan sonra yeniden denemeler söz konusu olduğunda, belirli bir toplu iş kimliği birden fazla kez çalıştırılabilir. Benzer şekilde, işlenecek veri olmadığında toplu iş kimliği artırılmaz. |
batchDuration |
Bir toplu işlemi milisaniye cinsinden işleme süresi. |
numInputRows |
Bir tetikleyicide işlenen toplam kayıt sayısı (tüm kaynaklar arasında). |
inputRowsPerSecond |
Gelen verilerin toplam oranı (tüm kaynaklar arasında). |
processedRowsPerSecond |
Spark'ın verileri işleme oranı (tüm kaynaklar genelinde). |
StreamingQueryListener ayrıca, müşteri ölçümleri ve kaynak ilerleme durumu ayrıntıları için inceleyebileceğiniz nesneleri içeren aşağıdaki alanları tanımlar:
| Fields | Description |
|---|---|
durationMs |
Tür: ju.Map[String, JLong]. Bkz durationMs nesnesi. |
eventTime |
Tür: ju.Map[String, String]. Bkz. eventTime nesnesi. |
stateOperators |
Tür: Array[StateOperatorProgress]. Bkz stateOperators nesnesi. |
sources |
Tür: Array[SourceProgress]. Bakınız sources nesnesi. |
sink |
Tür: SinkProgress. Bkz. havuz nesnesi. |
observedMetrics |
Tür: ju.Map[String, Row]. DataFrame/sorgu üzerinde tanımlanabilir rastgele toplama işlevleri (örneğin df.observe). |
durationMs nesnesi
Nesne türü: ju.Map[String, JLong]
Mikro toplu işlem yürütme işleminin çeşitli aşamalarını tamamlamak için geçen süre hakkında bilgi.
| Fields | Description |
|---|---|
durationMs.addBatch |
Mikro toplu işlemi yürütmek için geçen süre. Bu, Spark'ın mikro toplu işlemi planlamak için gereken süreyi dışlar. |
durationMs.getBatch |
Kaynaktan uzaklıklarla ilgili meta verileri almak için geçen süre. |
durationMs.latestOffset |
Mikro yığın için kullanılan en son ofset. Bu ilerleme nesnesi, kaynaklardan en son öteleme değerini almak için geçen süreyi ifade eder. |
durationMs.queryPlanning |
Yürütme planını oluşturmak için geçen süre. |
durationMs.triggerExecution |
Mikro toplu işlemi planlamak ve yürütmek için gereken süre. |
durationMs.walCommit |
Yeni kullanılabilir ofsetleri taahhüt etmek için alınan süre. |
durationMs.commitBatch |
addBatch sırasında hedef birime yazılan verileri kaydetmek için geçen süre. Yalnızca işlemeyi destekleyen veri işleyicileri için mevcuttur. |
durationMs.commitOffsets |
Toplu işlemi işlem günlüğüne kaydetmek için geçen süre. |
eventTime nesnesi
Nesne türü: ju.Map[String, String]
Mikro toplu işlemde işlenen veriler içinde görülen olay zamanı değeri hakkında bilgi. Bu veriler, filigran tarafından, Yapılandırılmış Akış işinde tanımlanan durum bilgisine sahip toplamaları işlemek üzere durumu nasıl uygun hale getirilmesini belirlerken kullanılır.
| Fields | Description |
|---|---|
eventTime.avg |
Bu tetikleyicide görülen ortalama olay süresi. |
eventTime.max |
Bu tetikleyicide görülen en yüksek olay süresi. |
eventTime.min |
Bu tetikleyicide görülen en düşük olay süresi. |
eventTime.watermark |
Bu tetikleyicide kullanılan filigranın değeri. |
stateOperators nesnesi
Nesne türü: Array[StateOperatorProgress] Nesne, stateOperators Yapılandırılmış Akış işinde tanımlanan durum bilgisi olan işlemler ve bunlardan oluşturulan toplamalar hakkında bilgi içerir.
Akış durumu işleçleri hakkında daha fazla bilgi için bkz. Durum bilgisi olan akış nedir?.
| Fields | Description |
|---|---|
stateOperators.operatorName |
Ölçümlerin ilişkilendirildiği durum bilgisi olan işlecin adı( örneğin symmetricHashJoin, , dedupeveya stateStoreSave). |
stateOperators.numRowsTotal |
Durum bilgisi olan bir işlecin veya toplamanın sonucu olarak durumdaki toplam satır sayısı. |
stateOperators.numRowsUpdated |
Durum bilgisine sahip bir işleç veya toplamanın sonucu olarak güncellenen toplam satır sayısı. |
stateOperators.allUpdatesTimeMs |
Bu ölçüm şu anda Spark tarafından ölçülemiyor ve gelecekteki güncelleştirmelerde kaldırılması planlanıyor. |
stateOperators.numRowsRemoved |
Bir durum bilgisine sahip operatör veya toplama sonucunda durumdan kaldırılan toplam satır sayısı. |
stateOperators.allRemovalsTimeMs |
Bu ölçüm şu anda Spark tarafından ölçülemiyor ve gelecekteki güncelleştirmelerde kaldırılması planlanıyor. |
stateOperators.commitTimeMs |
Tüm güncelleştirmeleri işlemek (yerleştirir ve kaldırır) ve yeni bir sürüm döndürmek için geçen süre. |
stateOperators.memoryUsedBytes |
Durum deposu tarafından kullanılan bellek. |
stateOperators.numRowsDroppedByWatermark |
Durum bilgisi olan birikime dahil edilmek için çok geç olduğu kabul edilen satır sayısı. Yalnızca akış toplamaları: Toplama işlemi sonrasında çıkarılan satır sayısı (ham giriş satırları değil). Bu sayı kesin değildir, ancak geç bırakılan veriler olduğuna dair bir gösterge sağlar. |
stateOperators.numShufflePartitions |
Bu durum bilgisi olan işleç için karıştırma bölmelerinin sayısı. |
stateOperators.numStateStoreInstances |
operatörün başlatıp koruduğu gerçek durum deposu örneği. Durum bilgisi olan birçok işleç için bu, bölüm sayısıyla aynıdır. Ancak stream-stream birleşimleri bölüm başına dört durum deposu örneği başlatır. |
stateOperators.customMetrics |
Daha fazla ayrıntı için bu konudaki stateOperators.customMetrics bölümüne bakın. |
StateOperatorProgress.customMetrics nesnesi
Nesne türü: ju.Map[String, JLong]
StateOperatorProgress , customMetricsbu ölçümleri toplarken kullandığınız özelliğe özgü ölçümleri içeren bir alanına sahiptir.
| Feature | Description |
|---|---|
| RocksDB durum depolama sistemi | RocksDB durum deposu ölçümleri. |
| HDFS durum deposu | HDFS durum deposu için ölçümler. |
| Akış deduplikasyonu | Satır yinelenenlerini kaldırmaya yönelik ölçümler. |
| Veri akışı toplama | Satır toplama ölçümleri. |
| Akış birleştirme işleci | Akış birleştirme işleci için ölçümler. |
transformWithState |
İşleç için transformWithState ölçümler. |
RocksDB durum deposu özel ölçümleri
RocksDB'den toplanan bilgiler, Yapılandırılmış Akış işi için sakladığı durum bilgileriyle ilgili performans ve işlemlerine dair ölçümleri içermektedir. Daha fazla bilgi için bkz. Azure Databricks üzerinde
| Fields | Description |
|---|---|
customMetrics.rocksdbBytesCopied |
RocksDB Dosya Yöneticisi tarafından izlenen kopyalanan bayt sayısı. |
customMetrics.rocksdbCommitCheckpointLatency |
Yerel RocksDB anlık görüntüsünün alınarak yerel bir dizine yazılma süresi milisaniye cinsinden ölçülür. |
customMetrics.rocksdbCompactLatency |
Denetim noktası onayı sırasında milisaniye cinsinden sıkıştırma işlemi süresi (isteğe bağlı). |
customMetrics.rocksdbCommitCompactLatency |
Kaydetme işlemi sırasında, milisaniye cinsinden sıkıştırma süresi. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Milisaniye cinsinden süre, yerel RocksDB anlık görüntüsünün denetim noktası konumuna (dış depolamaya) senkronize edilmesi. |
customMetrics.rocksdbCommitFlushLatency |
RocksDB'nin bellek içi değişikliklerinin yerel diske boşaltılma süresi milisaniye cinsinden ölçülür. |
customMetrics.rocksdbCommitPauseLatency |
Denetim noktası taahhüdünün bir parçası olarak, sıkıştırma işlemi gibi nedenlerle arka plan çalışanı iş parçacıklarını durdurma süresi (milisaniye cinsinden). |
customMetrics.rocksdbCommitWriteBatchLatency |
Bellek içi yapıda (WriteBatch) hazırlanan yazma işlemlerinin yerel RocksDB'ye uygulanma süresi milisaniye cinsindendir. |
customMetrics.rocksdbFilesCopied |
RocksDB Dosya Yöneticisi tarafından izlendiği şekilde kopyalanan dosyaların sayısı. |
customMetrics.rocksdbFilesReused |
RocksDB Dosya Yöneticisi tarafından izlendiği şekilde yeniden kullanılan dosyaların sayısı. |
customMetrics.rocksdbGetCount |
Çağrıların sayısı get (bellek içi toplu işlem gets - hazırlama yazma işlemleri için kullanılır ve WriteBatch dahil değildir). |
customMetrics.rocksdbGetLatency |
Temel alınan yerel RocksDB::Get çağrısı için ortalama süre nanosaniye olarak. |
customMetrics.rocksdbReadBlockCacheHitCount |
RocksDB'deki blok önbelleğinden önbellek isabetlerinin sayısı. |
customMetrics.rocksdbReadBlockCacheMissCount |
RocksDB'de blok önbelleğinin eksik sayısı. |
customMetrics.rocksdbSstFileSize |
RocksDB örneğindeki tüm Statik Sıralanmış Tablo (SST) dosyalarının boyutu. |
customMetrics.rocksdbTotalBytesRead |
İşlemler tarafından get okunan sıkıştırılmamış bayt sayısı. |
customMetrics.rocksdbTotalBytesWritten |
put işlemleri tarafından yazılan sıkıştırılmamış baytların toplam sayısı. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Yineleyici kullanılarak okunan sıkıştırılmamış verilerin toplam bayt sayısı. Durum bilgisine sahip bazı işlemler (örneğin, FlatMapGroupsWithState zaman aşımı işleme ve filigranlama), verilerin bir yineleyici aracılığıyla Azure Databricks'e okunmasını gerektirir. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Sıkıştırma işleminin diskten okuduğu bayt sayısı. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Sıkıştırma işleminin diske yazdığı toplam bayt sayısı. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Arka plan sıkıştırmaları ve işleme sırasında başlatılan isteğe bağlı sıkıştırma dahil olmak üzere RocksDB sıkıştırmaları için milisaniye cinsinden süre. |
customMetrics.rocksdbTotalFlushLatencyMs |
Arka plan temizleme de dahil olmak üzere toplam temizleme süresi. Boşaltma işlemleri, MemTable dolduğunda depolama alanına boşaltılan süreçlerdir.
MemTables, verilerin RocksDB'de depolandığı ilk düzeydir. |
customMetrics.rocksdbZipFileBytesUncompressed |
Dosya Yöneticisi tarafından bildirilen sıkıştırılmamış zip dosyalarının bayt cinsinden boyutu. Dosya Yöneticisi, fiziksel SST dosya disk alanı kullanımını ve silmeyi yönetir. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
RocksDB anlık görüntüsünün kontrol noktası konumuna kaydedilen en son sürümü. "-1" değeri hiçbir anlık görüntünün kaydedilmediğini gösterir. Anlık görüntüler her durum deposu örneğine özgü olduğundan, bu ölçü belirli bir bölüm kimliği ve durum deposu adı için geçerlidir. |
customMetrics.rocksdbPutLatency |
Toplam put çağrısı gecikme süresi. |
customMetrics.rocksdbPutCount |
Put opsiyonlarının sayısı. |
customMetrics.rocksdbWriterStallLatencyMs |
Sıkıştırma veya temizleme işleminin bitmesi için yazar bekleme süresi başlar. |
customMetrics.rocksdbTotalBytesWrittenByFlush |
Flush işlemi tarafından yazılan toplam bayt sayısı |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
Sabitlenmiş bloklar için bellek kullanımı |
customMetrics.rocksdbNumInternalColFamiliesKeys |
İç sütun aileleri için dahili anahtar sayısı |
customMetrics.rocksdbNumExternalColumnFamilies |
Dış sütun ailelerinin sayısı |
customMetrics.rocksdbNumInternalColumnFamilies |
İç sütun ailelerinin sayısı |
HDFS durum deposu özel ölçümleri
HDFS durum deposu sağlayıcısı davranışları ve işlemleri hakkında toplanan bilgiler.
| Fields | Description |
|---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
Yalnızca geçerli sürümde tahmini durum boyutu. |
customMetrics.loadedMapCacheHitCount |
Sağlayıcıda önbelleğe alınan durumların önbellek isabeti sayısı. |
customMetrics.loadedMapCacheMissCount |
Sağlayıcıda önbelleğe alınmış durumların önbellek kaçırma sayısı. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
Belirli bir durum mağazası örneği için anlık görüntünün yüklenen son sürümü. |
Özelleştirilmiş yinelenenleri kaldırma ölçümleri
Yinelenenleri kaldırma davranışları ve işlemleriyle ilgili toplanan bilgiler.
| Fields | Description |
|---|---|
customMetrics.numDroppedDuplicateRows |
Çıkarılan yinelenen satırların sayısı. |
customMetrics.numRowsReadDuringEviction |
Durum çıkarma sırasında okunan durum satırlarının sayısı. |
Özel Toplama Ölçümleri
Toplama davranışları ve işlemleri hakkında toplanan bilgiler.
| Fields | Description |
|---|---|
customMetrics.numRowsReadDuringEviction |
Durum çıkarma sırasında okunan durum satırlarının sayısı. |
Özel akış birleştirme metrikleri
Akış birleştirme davranışları ve işlemleri hakkında toplanan bilgiler.
| Fields | Description |
|---|---|
customMetrics.skippedNullValueCount |
null
spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled olarak ayarlandığında atlanan true değerlerin sayısı. |
transformWithState özel ölçümleri
transformWithState hakkında toplanan bilgiler, (TWS) davranışları ve işlemlerine dairdir. hakkında transformWithStatedaha fazla ayrıntı için bkz. Durum bilgisi olan özel bir uygulama oluşturma.
| Fields | Description |
|---|---|
customMetrics.initialStateProcessingTimeMs |
Tüm ilk durumu işlemek için alınan milisaniye sayısı. |
customMetrics.numValueStateVars |
Değer durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numListStateVars |
Liste durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numMapStateVars |
Harita durum değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numDeletedStateVars |
Silinen durum değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.timerProcessingTimeMs |
Tüm süreölçerleri işlemek için alınan milisaniye sayısı. |
customMetrics.numRegisteredTimers |
Kayıtlı süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numDeletedTimers |
Silinen süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numExpiredTimers |
Süresi dolan süreölçer sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numValueStateWithTTLVars |
TTL ile değer durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numListStateWithTTLVars |
TTL ile liste durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numMapStateWithTTLVars |
TTL ile eşleme durumu değişkenlerinin sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numValuesRemovedDueToTTLExpiry |
TTL süre sonu nedeniyle kaldırılan değer sayısı. Ayrıca transformWithStateInPandas için mevcuttur. |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
TTL süre sonu nedeniyle artımlı olarak kaldırılan değer sayısı. |
sources nesnesi
Nesne türü: Array[SourceProgress]
nesnesi, sources akış veri kaynaklarına yönelik bilgiler ve ölçümler içerir.
| Fields | Description |
|---|---|
description |
Akış veri kaynağı tablosunun ayrıntılı açıklaması. |
startOffset |
Akış işinin başlatıldığı veri kaynağı tablosundaki başlangıç uzaklığı numarası. |
endOffset |
Mikro yığın tarafından işlenen son ofset. |
latestOffset |
Mikro toplu iş tarafından işlenen en son öteleme. |
numInputRows |
Bu kaynaktan işlenen giriş satırlarının sayısı. |
inputRowsPerSecond |
Bu kaynaktan işlenmek üzere verilerin gelme hızı (saniye olarak). |
processedRowsPerSecond |
Spark'ın bu kaynaktan verileri işleme hızı. |
metrics |
Tür: ju.Map[String, String]. Belirli bir veri kaynağı için özel ölçümler içerir. |
Azure Databricks aşağıdaki kaynak nesnesi uygulamasını sağlar:
Note
Formda sources.<startOffset / endOffset / latestOffset>.* (veya benzer bir şekilde) tanımlanan alanlar için, bunu belirtilen alt alanı içeren (bu 3 olası alandan biri) olarak yorumlayın.
sources.startOffset.<child-field>sources.endOffset.<child-field>sources.latestOffset.<child-field>
Delta Lake kaynakları nesnesi
Delta tablosu akış veri kaynakları için kullanılan özel ölçümlerin tanımları.
| Fields | Description |
|---|---|
sources.description |
Akış sorgusunun okuduğu kaynağın açıklaması. Örneğin: ”DeltaSource[table]”. |
sources.<startOffset / endOffset>.sourceVersion |
Bu ofsetin kodlandığı serileştirme sürümü. |
sources.<startOffset / endOffset>.reservoirId |
Okunan tablonun kimliği. Bu, sorgu yeniden başlatılırken yanlış yapılandırmayı algılamak için kullanılır. Bkz. Unity Kataloğunu, Delta Lake'i ve Yapılandırılmış Akış ölçümleri tablosu tanımlayıcılarını eşleme. |
sources.<startOffset / endOffset>.reservoirVersion |
Şu anda işlemekte olan tablonun sürümü. |
sources.<startOffset / endOffset>.index |
Bu sürümdeki AddFiles dizinindeki sıra. Bu, büyük taahhütleri birden fazla partiye ayırmak için kullanılır. Bu dizin, modificationTimestamp ve path üzerinde sıralanarak oluşturulur. |
sources.<startOffset / endOffset>.isStartingVersion |
Mevcut ofsetin, ilk verilerin işlenmesinden sonraki değişikliklerin işlenmesi yerine, yeni bir akış sorgusunun başlangıcını belirtip belirtmediğini tanımlar. Yeni bir sorgu başlatılırken, başlangıçtaki tabloda bulunan tüm veriler önce işlenir ve ardından gelen tüm yeni veriler işlenir. |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
Olay sıralaması için kaydedilen zaman. İşlenmeyi bekleyen ilk anlık görüntü verilerinin olay zamanı. Olay zaman sırası ile ilk anlık görüntü işlenirken kullanılır. |
sources.latestOffset |
Mikro toplu sorgu tarafından işlenen en son ofset. |
sources.numInputRows |
Bu kaynaktan işlenen giriş satırlarının sayısı. |
sources.inputRowsPerSecond |
Bu kaynaktan işlenmek üzere verilerin gelme hızı. |
sources.processedRowsPerSecond |
Spark'ın bu kaynaktan verileri işleme hızı. |
sources.metrics.numBytesOutstanding |
Ödenmemiş dosyaların toplam boyutu (RocksDB tarafından izlenen dosyalar). Bu, akış kaynağı olarak Delta ve Auto Loader için birikim metriğidir. |
sources.metrics.numFilesOutstanding |
İşlenecek bekleyen dosyaların sayısı. Bu, akış kaynağı olarak Delta ve Auto Loader için birikim metriğidir. |
Apache Kafka kaynakları nesnesi
Apache Kafka akış veri kaynakları için kullanılan özel ölçümlerin tanımları.
| Fields | Description |
|---|---|
sources.description |
Okunan Kafka konusunu tam olarak belirten Kafka kaynağının ayrıntılı bir açıklaması. Örneğin: ”KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”. |
sources.startOffset |
Akış işinin başlatıldığı Kafka konusu içindeki başlangıç uzaklığı numarası. |
sources.endOffset |
Mikro yığın tarafından işlenen son ofset. Bu, devam eden bir mikro toplu iş yürütmesine latestOffset eşit olabilir. |
sources.latestOffset |
Mikro yığın tarafından hesaplanan en son ofset. Mikro toplu işlem, sınırlama olduğunda tüm offsetleri işlemeyebilir, bu da endOffset ve latestOffset arasında farklılık oluşmasına neden olur. |
sources.numInputRows |
Bu kaynaktan işlenen giriş satırlarının sayısı. |
sources.inputRowsPerSecond |
Bu kaynaktan işlenmek üzere verilerin gelme hızı. |
sources.processedRowsPerSecond |
Spark'ın bu kaynaktan verileri işleme hızı. |
sources.metrics.avgOffsetsBehindLatest |
Akış sorgusunun abone olunan tüm konular arasında en son kullanılabilir uzaklığı geride bıraktığı ortalama uzaklık sayısı. |
sources.metrics.estimatedTotalBytesBehindLatest |
Sorgu işleminin abone olunan konu başlıklarından kullanmadığı tahmini bayt sayısı. |
sources.metrics.maxOffsetsBehindLatest |
Akış sorgusunun abone olunan tüm konular arasında en son kullanılabilir ofsetin arkasında kaldığı maksimum ofset sayısı. |
sources.metrics.minOffsetsBehindLatest |
Abone olunan tüm konular arasında akış sorgusunun en son kullanılabilir uzaklığı arkasında olduğu en düşük uzaklık sayısı. |
Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.
Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .
Otomatik Yükleyici kaynak ölçümleri
Otomatik Yükleyici akış veri kaynakları için kullanılan özel ölçümlerin tanımları.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
Dosyaların keşfedilme sırasına göre işlenmekte olan dosya dizisindeki mevcut konum. |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
cloudFiles kaynağının uygulama sürümü. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
En son doldurma işleminin başlangıç saati. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
En son doldurma işleminin bitiş saati. |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
Akış yeniden başlatılmadan önce akışın kullanıcı tarafından sağlanan son giriş yolu. |
sources.metrics.numFilesOutstanding |
Beklemedeki işler içindeki dosya sayısı |
sources.metrics.numBytesOutstanding |
Birikmiş işlemlerdeki dosyaların boyutu (byte) |
sources.metrics.approximateQueueSize |
İleti kuyruğunun yaklaşık boyutu. Yalnızca cloudFiles.useNotifications seçeneği etkinleştirildiğinde. |
sources.numInputRows |
Bu kaynaktan işlenen giriş satırlarının sayısı.
binaryFile Kaynak biçimi için, numInputRows dosya sayısına eşittir. |
PubSub kaynakları ölçümleri
PubSub akış veri kaynakları için kullanılan özel ölçümlerin tanımları. PubSub akış kaynaklarını izleme hakkında daha fazla bilgi için bkz. Pub/Sub akış ölçümlerini izleme.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
Bu, kayan nokta ile kodlanmış uygulama sürümüdür. |
sources.<startOffset / endOffset / latestOffset>.seqNum |
İşlenmekte olan kalıcı sıra numarası. |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
İşlenen en büyük veri alma dönemi. |
sources.metrics.numRecordsReadyToProcess |
Mevcut birikim içinde işlenebilecek kayıtların sayısı. |
sources.metrics.sizeOfRecordsReadyToProcess |
Mevcut birikmiş işler içindeki işlenmemiş verilerin bayt cinsinden toplam boyutu. |
sources.metrics.numDuplicatesSinceStreamStart |
Başlatıldığı ilk andan itibaren veri akışı tarafından işlenen mükerrer kayıtların toplam sayısı. |
Pulsar kaynak ölçümleri
Pulsar akış veri kaynakları için kullanılan özel ölçümlerin tanımları.
| Fields | Description |
|---|---|
sources.metrics.numInputRows |
Geçerli mikro toplu işlemde işlenen satır sayısı. |
sources.metrics.numInputBytes |
Geçerli mikro toplu işlemde işlenen toplam bayt sayısı. |
havuz nesnesi
Nesne türü: SinkProgress
| Fields | Description |
|---|---|
sink.description |
Lavabonun açıklaması, kullanılan belirli lavabo uygulamasını ayrıntılı olarak açıklar. |
sink.numOutputRows |
Çıkış satırlarının sayısı. Farklı havuz türleri, değerler için farklı davranışlara veya kısıtlamalara sahip olabilir. Desteklenen belirli türlere bakın |
sink.metrics |
ju.Map[String, String] lavabonun ölçümleri. |
Şu anda Azure Databricks iki özel sink nesnesi uygulaması sağlar:
| Lavabo türü | Details |
|---|---|
| Delta tablosu | Bkz . Delta havuz nesnesi. |
| Apache Kafka konusu | Bkz. Kafka havuz nesnesi. |
alanı, sink.metrics nesnenin her iki değişkeni sink için de aynı şekilde davranır.
Delta Lake havuz nesnesi
| Fields | Description |
|---|---|
sink.description |
Delta havuzu açıklaması, kullanılmakta olan delta havuzu uygulamasının ayrıntıları. Örneğin: ”DeltaSink[table]”. |
sink.numOutputRows |
Satır sayısı her zaman -1 Spark'ın Delta Lake havuzu sınıflandırması olan DSv1 havuzları için çıkış satırlarını çıkaramadığı içindir. |
Apache Kafka havuz nesnesi
| Fields | Description |
|---|---|
sink.description |
Akış sorgusunun yazmakta olduğu Kafka havuzu açıklaması, kullanılan belirli Kafka havuz uygulamasının ayrıntılarını gösterir. Örneğin: ”org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”. |
sink.numOutputRows |
Mikro toplu işlemin bir parçası olarak çıkış tablosuna veya veri havuzuna yazılan satır sayısı. Bazı durumlarda, bu değer "-1" olabilir ve genellikle "bilinmiyor" olarak yorumlanabilir. |
Examples
Kafka'dan Kafka'ya StreamingQueryListener olayı örneği
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Delta Lake'ten Delta Lake'e örnek bir StreamingQueryListener olayı
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/<user>/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Örnek Kinesis'ten Delta Lake'e StreamingQueryListener olayı
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/<run-id>/deltaTable]",
"numOutputRows" : -1
}
}
Örnek Kafka+Delta Lake ile Delta Lake arasındaki StreamingQueryListener olayı
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener olayına örnek hız kaynağı
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}