Azure Databricks'te RocksDB durum depolarını yapılandırma

RocksDB, Databricks Runtime 17.3 ve üzeri sürümlerin varsayılan durum deposu sağlayıcısıdır. 17.3'ün altındaki Databricks Runtime sürümleri için akış sorgusunu başlatmadan önce SparkSession'da aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Lakeflow Spark Bildirimli İşlem Hatlarında RocksDB'yi etkinleştirebilirsiniz. Bkz. durumsal işleme için işlem hattı yapılandırmasını optimize etme.

Değişiklik günlüğü denetim noktasını etkinleştir

Databricks Runtime 13.3 LTS ve üzeri sürümlerde, Yapılandırılmış Akış iş yükleri için denetim noktası süresini ve uçtan uca gecikme süresini azaltmak için değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz. Databricks, tüm Yapılandırılmış Akış durum bilgisi sorguları için değişiklik günlüğü denetim noktası oluşturmanın etkinleştirilmesini önerir. Değişiklik günlüğü denetim noktası oluşturma, Databricks Runtime 17.3 ve üzerinde varsayılan olarak etkindir.

Geleneksel olarak, RocksDB State Store denetim noktası oluşturma sırasında veri dosyalarının anlık görüntülerini alır ve yükler. Bu maliyeti önlemek için değişiklik günlüğü denetim noktası oluşturma yalnızca son denetim noktasından bu yana değişen kayıtları dayanıklı depolamaya yazar."

Değişiklik günlüğü denetim noktası oluşturma, 17.3'ün altındaki Databricks Runtime sürümlerinde varsayılan olarak devre dışıdır. Aşağıdaki söz dizimini kullanarak SparkSession düzeyinde değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Var olan bir akışta değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilir ve denetim noktasında depolanan durum bilgilerini koruyabilirsiniz.

Önemli

Değişiklik günlüğü denetim noktası oluşturmayı etkinleştiren sorgular yalnızca Databricks Runtime 13.3 LTS ve üzeri üzerinde çalıştırılabilir. Eski denetim noktası oluşturma davranışına dönmek için değişiklik günlüğü denetim noktası oluşturmayı devre dışı bırakabilirsiniz, ancak bu sorguları Databricks Runtime 13.3 LTS veya üzerinde çalıştırmaya devam etmeniz gerekir. Bu değişikliklerin gerçekleşmesi için işi yeniden başlatmanız gerekir.

RocksDB durum deposu ölçümleri

Her durum operatörü, RocksDB örneğinde gerçekleştirilen durum yönetimi işlemleri ile ilgili ölçümleri, durum deposunu gözlemlemek ve iş yavaşlamasını gidermeye yardımcı olmak için toplar.

Databricks Runtime 16.4 LTS ve sonrasında, belirli bir durum deposu örneğinin ölçümleri bölüm kimliği ve depo adıyla etiketlenir, bu sayede ayrı kalmaları sağlanır. Diğer tüm ölçümler, durum işlecinin aktif olduğu tüm görevlerde her durum işleci için toplam olarak raporlanır.

Bu ölçümler, customMetrics içindeki stateOperators alanlarındaki StreamingQueryProgress haritasının bir parçasıdır. JSON biçimindeki StreamingQueryProgress örneği aşağıda verilmiştir (bu örnek StreamingQueryProgress.json() kullanılarak elde edilmiştir).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Ölçümlerin ayrıntılı açıklamaları aşağıdaki gibidir:

Ölçüm adı Açıklama
rocksdbTaahhütYazmaDemetiGecikmesi Bellek içi yapıdaki (WriteBatch) aşamalı yazma işlemlerinin yerel RocksDB'ye uygulanması için geçen süre (milisaniyeler cinsinden).
rocksdbCommitFlush Gecikmesi RocksDB bellek içi değişikliklerinin yerel diske boşaltılması için geçen süre (milisaniye cinsinden).
rocksdbCommitCompactGecikmesi Denetim noktası işlemi sırasında sıkıştırma (isteğe bağlı) için geçen süre (milisaniye cinsinden).
rocksdbYazmaDuraklatmaGecikmesi Denetim noktası işlemesi kapsamında arka plan çalışan iş parçacıklarını (sıkıştırma işlemleri gibi) durdurmak için geçen süre (milisaniye cinsinden).
rocksdbCommitKontrolNoktasıGecikmesi Yerel sistemdeki RocksDB veritabanının anlık görüntüsünü almak ve bunu yerel bir dizine yazmak için geçen süre (milisaniye cinsinden).
rocksdbCommitFileSyncLatencyMs (RocksDB Taahhüt Dosya Senkronizasyon Gecikme Süresi Ms) Yerel RocksDB anlık görüntüsüyle ilgili dosyaların bir dış depolama alanıyla (kontrol noktası konumu) eşitlenmesi için geçen süre (milisaniye cinsinden).
rocksdbGetGecikme Altta yatan yerel RocksDB::Get çağrı başına ortalama süre (nano cinsinden).
rocksdbPutCount Altta yatan yerel RocksDB::Put çağrı başına ortalama süre (nano cinsinden).
rocksdbGetSayısıAl Yerel RocksDB::Get çağrı sayısı (Bellekte toplu işlem olarak kullanılan WriteBatch'ten kaynaklanan Gets hariç).
rocksdbPutCount Yerel RocksDB::Put çağrı sayısı (yazmayı aşamalı hale getirmek için kullanılan bellek içi toplu işlem olan WriteBatch'e yapılan Puts çağrıları içermez).
rocksdbTotalBytesReadByGet (get işlemi ile okunan toplam bayt sayısı) Yerel RocksDB::Get çağrılar aracılığıyla okunan sıkıştırılmamış bayt sayısı.
RocksDB toplam koyulan bayt sayısı Yerel RocksDB::Put çağrılar aracılığıyla yazılan sıkıştırılmamış bayt sayısı.
rocksdbOkumaBlokÖnbellekVuruşSayısı Yerel diskten veri okunmasını önlemek için yerel RocksDB blok önbelleğinin kaç kez kullanıldığı.
rocksdbReadBlockCacheMissSayısı RocksDB yerel blok önbelleğinin kaç kez kaçırıldığı ve verilerin yerel diskten okunmasının gerektiği.
rocksdbToplamBaytOkumaYapılandırması Yerel RocksDB sıkıştırma işlemi tarafından yerel diskten okunan bayt sayısı.
rocksdbToplamBaytYazmaSıkıştırmaTarafından Yerel RocksDB sıkıştırma işlemi tarafından yerel diske yazılan bayt sayısı.
rocksdbToplamSıkıştırmaGecikmesiMs RocksDB sıkıştırmaları (hem arka plan hem de işleme sırasında başlatılan isteğe bağlı sıkıştırma) için geçen süre (milisaniye cinsinden).
rocksdbWriterStallLatencyMs Yazıcının arka plan sıkıştırması veya memtable'ların diske boşaltılması nedeniyle bekleme süresi (milisaniye cinsinden).
rocksdbBytesThroughIterator toplam okunan bayt sayısı Durum bilgisi olan işlemlerden bazıları (örneğin, flatMapGroupsWithState içindeki zaman aşımı işleme veya pencerelenmiş toplamalardaki filigranlama) yineleyici aracılığıyla DB'deki tüm verilerin okunmasını gerektirir. Yineleyici kullanılarak okunan sıkıştırılmamış verilerin toplam boyutu.