Aracılığıyla paylaş


Azure Databricks'te RocksDB durum depolarını yapılandırma

Akış sorgusunu başlatmadan önce SparkSession'da aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Delta Live Tables işlem hatlarında RocksDB'yi etkinleştirebilirsiniz. Bkz . Durum bilgisi olan işleme için işlem hattı yapılandırmasını iyileştirme.

Değişiklik günlüğü denetim noktasını etkinleştirme

Databricks Runtime 13.3 LTS ve üzeri sürümlerde, Yapılandırılmış Akış iş yükleri için denetim noktası süresini ve uçtan uca gecikme süresini azaltmak için değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz. Databricks, tüm Yapılandırılmış Akış durum bilgisi sorguları için değişiklik günlüğü denetim noktası oluşturmanın etkinleştirilmesini önerir.

Geleneksel olarak RocksDB State Store denetim noktası oluşturma sırasında veri dosyalarını anlık görüntüler ve karşıya yükler. Bu maliyeti önlemek için değişiklik günlüğü denetim noktası oluşturma yalnızca son denetim noktasından bu yana değişen kayıtları dayanıklı depolamaya yazar."

Değişiklik günlüğü denetim noktası oluşturma varsayılan olarak devre dışıdır. Aşağıdaki söz dizimini kullanarak SparkSession düzeyinde değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Var olan bir akışta değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilir ve denetim noktasında depolanan durum bilgilerini koruyabilirsiniz.

Önemli

Değişiklik günlüğü denetim noktası oluşturmayı etkinleştiren sorgular yalnızca Databricks Runtime 13.3 LTS ve üzeri üzerinde çalıştırılabilir. Eski denetim noktası oluşturma davranışına dönmek için değişiklik günlüğü denetim noktası oluşturmayı devre dışı bırakabilirsiniz, ancak bu sorguları Databricks Runtime 13.3 LTS veya üzerinde çalıştırmaya devam etmeniz gerekir. Bu değişikliklerin gerçekleşmesi için işi yeniden başlatmanız gerekir.

RocksDB durum deposu ölçümleri

Her durum operatörü, durum deposunu gözlemlemek ve işin yavaşlığının hatalarını ayıklamaya yardımcı olmak için RocksDB örneğinde gerçekleştirilen durum yönetimi işlemleriyle ilgili ölçümleri toplar. Bu ölçümler, durum işlecinin çalıştığı tüm görevlerde işteki durum işleci başına toplanır (toplam). Bu ölçümler içindeki alanların StreamingQueryProgressiçindeki haritanın stateOperators bir parçasıdırcustomMetrics. Aşağıda JSON biçimindeki bir örneği StreamingQueryProgress verilmiştir (kullanılarak StreamingQueryProgress.json()elde edilir).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Ölçümlerin ayrıntılı açıklamaları aşağıdaki gibidir:

Ölçüm adı Açıklama
rocksdbCommitWriteBatchLatency Bellek içi yapıdaki (WriteBatch) hazırlanmış yazma işlemlerinin yerel RocksDB'ye uygulanması için geçen süre (milisaniye cinsinden).
rocksdbCommitFlushLatency RocksDB bellek içi değişikliklerinin yerel diske boşaltılması için geçen süre (milisaniye cinsinden).
rocksdbCommitCompactLatency Denetim noktası işlemesi sırasında sıkıştırma (isteğe bağlı) için geçen süre (milisaniye cinsinden).
rocksdbCommitPauseLatency Denetim noktası işlemesinin bir parçası olarak arka plan çalışan iş parçacıklarını (sıkıştırma vb.) durdurmak için geçen süre (milis cinsinden).
rocksdbCommitCheckpointLatency Yerel RocksDB'nin anlık görüntüsünü almak ve yerel bir dizine yazmak için geçen süre (milis cinsinden).
rocksdbCommitFileSyncLatencyMs Yerel RocksDB anlık görüntüsüyle ilgili dosyaların bir dış depolama alanıyla (denetim noktası konumu) eşitlenmesi için geçen süre (milisaniye cinsinden).
rocksdbGetLatency Temel alınan yerel RocksDB::Get çağrı başına ortalama süre (nano cinsinden).
rocksdbPutCount Temel alınan yerel RocksDB::Put çağrı başına ortalama süre (nano cinsinden).
rocksdbGetCount Yerel RocksDB::Get çağrı sayısı (WriteBatch'ten içermez Gets - yazma işlemleri için kullanılan bellek toplu işlemine).
rocksdbPutCount Yerel RocksDB::Put çağrı sayısı (yazma işlemleri için kullanılan bellek toplu işlemine WriteBatch'e dahil Puts değildir).
rocksdbTotalBytesReadByGet Yerel RocksDB::Get çağrılar aracılığıyla okunan sıkıştırılmamış bayt sayısı.
rocksdbTotalBytesWrittenByPut Yerel RocksDB::Put çağrılar aracılığıyla yazılan sıkıştırılmamış bayt sayısı.
rocksdbReadBlockCacheHitCount Yerel diskten veri okunmasını önlemek için yerel RocksDB blok önbelleğinin kaç kez kullanıldığı.
rocksdbReadBlockCacheMissCount Yerel RocksDB bloğu önbelleğinin kaç kez kaçırılması ve yerel diskten okuma verilerinin gerekli olması.
rocksdbTotalBytesReadByCompaction Yerel RocksDB sıkıştırma işlemi tarafından yerel diskten okunan bayt sayısı.
rocksdbTotalBytesWrittenByCompaction Yerel RocksDB sıkıştırma işlemi tarafından yerel diske yazılan bayt sayısı.
rocksdbTotalCompactionLatencyMs RocksDB sıkıştırmaları (hem arka plan hem de işleme sırasında başlatılan isteğe bağlı sıkıştırma) için geçen süre (milisaniye cinsinden).
rocksdbWriterStallLatencyMs Yazıcının arka plan sıkıştırması veya diske boşaltılması nedeniyle zaman (milis cinsinden) durdu.
rocksdbTotalBytesReadThroughIterator Durum bilgisi olan işlemlerden bazıları (içinde zaman aşımı işleme flatMapGroupsWithState veya pencerelenmiş toplamalarda filigranlama gibi) yineleyici aracılığıyla DB'deki verilerin tamamının okunmasını gerektirir. Yineleyici kullanılarak okunan sıkıştırılmamış verilerin toplam boyutu.