Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks

Zarządzanie stanem opartym na bazie danych RocksDB można włączyć, ustawiając następującą konfigurację w usłudze SparkSession przed rozpoczęciem zapytania przesyłania strumieniowego.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Bazę danych RocksDB można włączyć w potokach delta Live Tables. Zobacz Włączanie magazynu stanów bazy danych RocksDB dla tabel delta live.

Włączanie tworzenia punktów kontrolnych dziennika zmian

W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć tworzenie punktów kontrolnych dziennika zmian w celu obniżenia czasu trwania punktu kontrolnego i całkowitego opóźnienia obciążeń przesyłania strumieniowego ze strukturą. Usługa Databricks zaleca włączenie punktów kontrolnych dziennika zmian dla wszystkich zapytań stanowych przesyłania strumieniowego ze strukturą.

Tradycyjnie rocksDB State Store migawki i przekazuje pliki danych podczas tworzenia punktów kontrolnych. Aby uniknąć tego kosztu, punkty kontrolne dziennika zmian zapisują tylko rekordy, które uległy zmianie od ostatniego punktu kontrolnego na trwały magazyn.

Punkty kontrolne dziennika zmian są domyślnie wyłączone. Punkty kontrolne dziennika zmian można włączyć na poziomie SparkSession przy użyciu następującej składni:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Możesz włączyć tworzenie punktów kontrolnych dziennika zmian w istniejącym strumieniu i obsługiwać informacje o stanie przechowywane w punkcie kontrolnym.

Ważne

Zapytania z włączonym tworzeniem punktów kontrolnych dziennika zmian można uruchamiać tylko w środowisku Databricks Runtime 13.3 LTS lub nowszym. Możesz wyłączyć tworzenie punktów kontrolnych dziennika zmian, aby przywrócić starsze zachowanie punktów kontrolnych, ale nadal należy uruchamiać te zapytania w środowisku Databricks Runtime 13.3 LTS lub nowszym. Aby te zmiany zostały wprowadzone, należy ponownie uruchomić zadanie.

Metryki magazynu stanów bazy danych RocksDB

Każdy operator stanu zbiera metryki związane z operacjami zarządzania stanami wykonywanymi w wystąpieniu bazy danych RocksDB, aby obserwować magazyn stanów i potencjalnie pomóc w debugowaniu spowolnienia zadań. Te metryki są agregowane (suma) na operator stanu w zadaniu we wszystkich zadaniach, w których jest uruchomiony operator stanu. Te metryki są częścią customMetrics mapy wewnątrz stateOperators pól w pliku StreamingQueryProgress. Poniżej przedstawiono przykład w formularzu StreamingQueryProgress JSON (uzyskanym przy użyciu metody StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Szczegółowe opisy metryk są następujące:

Nazwa metryki opis
rocksdbCommitWriteBatchLatency Czas (w milisach) wziął na zastosowanie przygotowanych zapisów w strukturze w pamięci (WriteBatch) do natywnej bazy danych RocksDB.
rocksdbCommitFlushLatency Czas (w milisach) trwał opróżnianie zmian w pamięci bazy danych RocksDB na dysku lokalnym.
rocksdbCommitCompactLatency Czas (w milisach) wziął na kompaktowanie (opcjonalnie) podczas zatwierdzania punktu kontrolnego.
rocksdbCommitPauseLatency Czas (w milisach) trwał zatrzymanie wątków procesu roboczego w tle (w przypadku kompaktowania itp.) w ramach zatwierdzenia punktu kontrolnego.
rocksdbCommitCheckpointLatency Czas (w milisie) wziął na utworzenie migawki natywnej bazy danych RocksDB i zapisanie go w katalogu lokalnym.
rocksdbCommitFileSyncLatencyMs Czas (w milisach) wziął na synchronizację natywnych plików powiązanych z migawką bazy danych RocksDB z zewnętrznym magazynem (lokalizacja punktu kontrolnego).
rocksdbGetLatency Średni czas (w nanos) wziął na bazowe wywołanie natywne RocksDB::Get .
rocksdbPutCount Średni czas (w nanos) wziął na bazowe wywołanie natywne RocksDB::Put .
rocksdbGetCount Liczba wywołań natywnych RocksDB::Get (nie obejmuje Gets funkcji WriteBatch — w partii pamięci używanej do przejściowych zapisów).
rocksdbPutCount Liczba wywołań natywnych RocksDB::Put (nie obejmuje Puts funkcji WriteBatch — w partii pamięci używanej do przejściowych zapisów).
rocksdbTotalBytesReadByGet Liczba nieskompresowanych bajtów odczytanych za pośrednictwem wywołań natywnych RocksDB::Get .
rocksdbTotalBytesWrittenByPut Liczba nieskompresowanych bajtów napisanych za pośrednictwem wywołań natywnych RocksDB::Put .
rocksdbReadBlockCacheHitCount Liczba przypadków użycia natywnej pamięci podręcznej blokowej Bazy danych RocksDB w celu uniknięcia odczytywania danych z dysku lokalnego.
rocksdbReadBlockCacheMissCount Liczba nieodebranych i wymaganych danych odczytu z dysku lokalnego przez natywną pamięć podręczną bloków RocksDB.
rocksdbTotalBytesReadByCompaction Liczba bajtów odczytanych z dysku lokalnego przez natywny proces kompaktowania bazy danych RocksDB.
rocksdbTotalBytesWrittenByCompaction Liczba bajtów zapisanych na dysku lokalnym przez natywny proces kompaktowania bazy danych RocksDB.
rocksdbTotalCompactionLatencyMs Czas (w milisach) wziął na kompaktowanie bazy danych RocksDB (tło i opcjonalne kompaktowanie zainicjowane podczas zatwierdzania).
rocksdbWriterStallLatencyMs Czas (w milis) zapis utknął w martwym punkcie z powodu kompaktowania tła lub opróżniania memtables na dysku.
rocksdbTotalBytesReadThroughIterator Niektóre operacje stanowe (takie jak przetwarzanie limitu czasu w flatMapGroupsWithState agregacjach okiennych lub znakowanie wodne) wymagają odczytywania całych danych w bazie danych za pośrednictwem iteratora. Całkowity rozmiar nieskompresowanych danych odczytywanych przy użyciu iteratora.