Odczytywanie informacji o stanie strumieniowania ustrukturyzowanego

Operacje ramki danych lub funkcje wartości tabeli SQL umożliwiają wykonywanie zapytań dotyczących danych i metadanych stanu przesyłania strumieniowego ze strukturą. Te funkcje umożliwiają obserwowanie informacji o stanie zapytań stanowych przesyłania strumieniowego ze strukturą, które mogą być przydatne do monitorowania i debugowania.

Aby wysyłać zapytania o dane stanu lub metadane, musisz mieć dostęp do odczytu do ścieżki punktu kontrolnego zapytania strumieniowego. Funkcje opisane w tym artykule zapewniają dostęp do danych stanu i metadanych wyłącznie do odczytu. Można tylko korzystać z semantyki odczytu wsadowego do zapytań dotyczących informacji o stanie.

Uwaga

Nie można wykonywać zapytań dotyczących informacji o stanie potoków deklaratywnych Lakeflow Spark, tabel przesyłania strumieniowego ani zmaterializowanych widoków. Nie można wykonywać zapytań dotyczących informacji o stanie przy użyciu bezserwerowych zasobów obliczeniowych lub obliczeń skonfigurowanych w trybie dostępu standardowego.

Wymagania

  • Użyj jednej z następujących konfiguracji obliczeniowych:
    • Środowisko Databricks Runtime 16.3 lub nowsze na potrzeby obliczeń skonfigurowanych ze standardowym trybem dostępu.
    • Środowisko Databricks Runtime 14.3 LTS lub nowsze na obliczeniach skonfigurowanych w trybie dedykowanej izolacji dostępu lub bez izolacji.
  • Dostęp do odczytu ścieżki punktu kontrolnego używanej przez zapytanie przesyłania strumieniowego.

Odczyt magazynu stanów w Structured Streaming

Możesz odczytać informacje o sklepie stanów dotyczące zapytań Strukturalnego Przesyłania Strumieniowego wykonywanych w dowolnej obsługiwanej wersji Databricks Runtime. Użyj następującej składni:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Opcje i schemat interfejsu API czytnika stanu

Aby uzyskać pełną listę opcji formatu statestore, patrz Magazyn stanu.

Dane wyjściowe mają następujący schemat:

Kolumna Typ opis
key Struktura (dalszy typ pochodzący z klucza stanu) Klucz rekordu operatora stanowego w punkcie kontrolnym stanu.
value Struktura (dalszy typ pochodzący z wartości stanu) Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu.
partition_id Liczba całkowita Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego.

W środowisku Databricks Runtime 16.4 LTS i nowszym, gdy readChangeFeed opcja jest ustawiona na true, dane wyjściowe mają następujący schemat:

Kolumna Typ opis
batch_id Długi Identyfikator partii, do której należy zmiana statusu.
change_type Sznurek Typ zmiany zastosowanej przez partię: update dla wstawień i aktualizacji oraz delete dla usunięć.
key Struktura (dalszy typ pochodzący z klucza stanu) Klucz rekordu operatora stanowego w punkcie kontrolnym stanu.
value Struktura (dalszy typ pochodzący z wartości stanu) Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu. null dla rekordów, w których change_type ma wartość delete.
partition_id Liczba całkowita Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego.

Zobacz read_statestore funkcji wartości tabeli.

Odczytywanie zmian stanu w Structured Streaming

Dostępne w środowisku Databricks Runtime 16.4 LTS lub nowszym. Aby sprawdzić, jak zmienia się stan między mikropartiami, zamiast wyświetlać pełny stan dla jednej mikropartii, ustaw readChangeFeed na true i określ changeStartBatchId. Opcjonalnie określ wartość changeEndBatchId. Aby uzyskać pełną listę opcji, zobacz State store.

Na przykład, aby odczytać zmiany stanu od partii 2 do najnowszej zatwierdzonej partii:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Schemat wyjściowy zawiera dodatkowe kolumny batch_id i change_type. Aby uzyskać pełny schemat, zobacz Opcje i schemat interfejsu API czytnika stanu.

Odczyt metadanych stanu Structured Streaming

Dostępne w środowisku Databricks Runtime 14.3 LTS lub nowszym. Możesz odczytać informacje o metadanych stanu dla zapytań Structured Streaming:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Zwrócone dane mają następujący schemat:

Kolumna Typ opis
operatorId Liczba całkowita Całkowity identyfikator operatora stanowego przesyłania strumieniowego.
operatorName Sznurek Nazwa stanowego operatora przesyłania strumieniowego.
stateStoreName Sznurek Nazwa magazynu stanowego operatora.
numPartitions Liczba całkowita Liczba partycji repozytorium stanu.
minBatchId Długi Minimalny identyfikator serii dostępny do zapytań o stan.
maxBatchId Długi Maksymalny identyfikator partii dostępny dla stanu wykonywania zapytań.

Uwaga

Wartości identyfikatorów partii dostarczone przez minBatchId i maxBatchId odzwierciedlają stan w momencie zapisu punktu kontrolnego. Stare partie są automatycznie czyszczone w wyniku mikroprzetwarzania partii, więc podana tutaj wartość nie jest gwarantowana, że będzie nadal dostępna.

Zobacz read_state_metadata funkcji wartości tabeli.

Przykład: wykonywanie zapytań po jednej stronie sprzężenia strumienia strumienia

Użyj następującej składni, aby wykonywać zapytania po lewej stronie łączenia strumieni:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Przykład: Wykonywanie zapytań dotyczących magazynu stanów dla strumienia z wieloma operatorami stanowymi

W tych przykładach użyto czytnika metadanych stanu do zbierania szczegółów metadanych zapytania przesyłania strumieniowego z wieloma operatorami stanowymi, a następnie używa wyników metadanych jako opcji czytnika stanu.

Czytnik metadanych stanu przyjmuje ścieżkę punktu kontrolnego jako jedyną opcję, jak w poniższym przykładzie składni:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

W poniższej tabeli przedstawiono przykładowe dane wyjściowe metadanych magazynu stanów:

operatorId Nazwa operatora stateStoreName liczbaPodziałów minBatchId maxBatchId
0 stateStoreSave domyślny 200 0 13
1 dedupeWithinWatermark domyślny 200 0 13

Aby uzyskać wyniki dla dedupeWithinWatermark operatora, wyślij zapytanie do czytnika stanu za pomocą operatorId opcji , jak w poniższym przykładzie:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);