Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Operacje ramki danych lub funkcje wartości tabeli SQL umożliwiają wykonywanie zapytań dotyczących danych i metadanych stanu przesyłania strumieniowego ze strukturą. Te funkcje umożliwiają obserwowanie informacji o stanie zapytań stanowych przesyłania strumieniowego ze strukturą, które mogą być przydatne do monitorowania i debugowania.
Aby wysyłać zapytania o dane stanu lub metadane, musisz mieć dostęp do odczytu do ścieżki punktu kontrolnego zapytania strumieniowego. Funkcje opisane w tym artykule zapewniają dostęp do danych stanu i metadanych wyłącznie do odczytu. Można tylko korzystać z semantyki odczytu wsadowego do zapytań dotyczących informacji o stanie.
Uwaga
Nie można wykonywać zapytań dotyczących informacji o stanie potoków deklaratywnych Lakeflow Spark, tabel przesyłania strumieniowego ani zmaterializowanych widoków. Nie można wykonywać zapytań dotyczących informacji o stanie przy użyciu bezserwerowych zasobów obliczeniowych lub obliczeń skonfigurowanych w trybie dostępu standardowego.
Wymagania
- Użyj jednej z następujących konfiguracji obliczeniowych:
- Środowisko Databricks Runtime 16.3 lub nowsze na potrzeby obliczeń skonfigurowanych ze standardowym trybem dostępu.
- Środowisko Databricks Runtime 14.3 LTS lub nowsze na obliczeniach skonfigurowanych w trybie dedykowanej izolacji dostępu lub bez izolacji.
- Dostęp do odczytu ścieżki punktu kontrolnego używanej przez zapytanie przesyłania strumieniowego.
Odczyt magazynu stanów w Structured Streaming
Możesz odczytać informacje o sklepie stanów dotyczące zapytań Strukturalnego Przesyłania Strumieniowego wykonywanych w dowolnej obsługiwanej wersji Databricks Runtime. Użyj następującej składni:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Opcje i schemat interfejsu API czytnika stanu
Aby uzyskać pełną listę opcji formatu statestore, patrz Magazyn stanu.
Dane wyjściowe mają następujący schemat:
| Kolumna | Typ | opis |
|---|---|---|
key |
Struktura (dalszy typ pochodzący z klucza stanu) | Klucz rekordu operatora stanowego w punkcie kontrolnym stanu. |
value |
Struktura (dalszy typ pochodzący z wartości stanu) | Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu. |
partition_id |
Liczba całkowita | Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego. |
W środowisku Databricks Runtime 16.4 LTS i nowszym, gdy readChangeFeed opcja jest ustawiona na true, dane wyjściowe mają następujący schemat:
| Kolumna | Typ | opis |
|---|---|---|
batch_id |
Długi | Identyfikator partii, do której należy zmiana statusu. |
change_type |
Sznurek | Typ zmiany zastosowanej przez partię: update dla wstawień i aktualizacji oraz delete dla usunięć. |
key |
Struktura (dalszy typ pochodzący z klucza stanu) | Klucz rekordu operatora stanowego w punkcie kontrolnym stanu. |
value |
Struktura (dalszy typ pochodzący z wartości stanu) | Wartość rekordu operatora z zachowaniem stanu w punkcie kontrolnym stanu.
null dla rekordów, w których change_type ma wartość delete. |
partition_id |
Liczba całkowita | Partycja punktu kontrolnego stanu, która zawiera rekord operatora stanowego. |
Zobacz read_statestore funkcji wartości tabeli.
Odczytywanie zmian stanu w Structured Streaming
Dostępne w środowisku Databricks Runtime 16.4 LTS lub nowszym. Aby sprawdzić, jak zmienia się stan między mikropartiami, zamiast wyświetlać pełny stan dla jednej mikropartii, ustaw readChangeFeed na true i określ changeStartBatchId. Opcjonalnie określ wartość changeEndBatchId. Aby uzyskać pełną listę opcji, zobacz State store.
Na przykład, aby odczytać zmiany stanu od partii 2 do najnowszej zatwierdzonej partii:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Schemat wyjściowy zawiera dodatkowe kolumny batch_id i change_type. Aby uzyskać pełny schemat, zobacz Opcje i schemat interfejsu API czytnika stanu.
Odczyt metadanych stanu Structured Streaming
Dostępne w środowisku Databricks Runtime 14.3 LTS lub nowszym. Możesz odczytać informacje o metadanych stanu dla zapytań Structured Streaming:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Zwrócone dane mają następujący schemat:
| Kolumna | Typ | opis |
|---|---|---|
operatorId |
Liczba całkowita | Całkowity identyfikator operatora stanowego przesyłania strumieniowego. |
operatorName |
Sznurek | Nazwa stanowego operatora przesyłania strumieniowego. |
stateStoreName |
Sznurek | Nazwa magazynu stanowego operatora. |
numPartitions |
Liczba całkowita | Liczba partycji repozytorium stanu. |
minBatchId |
Długi | Minimalny identyfikator serii dostępny do zapytań o stan. |
maxBatchId |
Długi | Maksymalny identyfikator partii dostępny dla stanu wykonywania zapytań. |
Uwaga
Wartości identyfikatorów partii dostarczone przez minBatchId i maxBatchId odzwierciedlają stan w momencie zapisu punktu kontrolnego. Stare partie są automatycznie czyszczone w wyniku mikroprzetwarzania partii, więc podana tutaj wartość nie jest gwarantowana, że będzie nadal dostępna.
Zobacz read_state_metadata funkcji wartości tabeli.
Przykład: wykonywanie zapytań po jednej stronie sprzężenia strumienia strumienia
Użyj następującej składni, aby wykonywać zapytania po lewej stronie łączenia strumieni:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Przykład: Wykonywanie zapytań dotyczących magazynu stanów dla strumienia z wieloma operatorami stanowymi
W tych przykładach użyto czytnika metadanych stanu do zbierania szczegółów metadanych zapytania przesyłania strumieniowego z wieloma operatorami stanowymi, a następnie używa wyników metadanych jako opcji czytnika stanu.
Czytnik metadanych stanu przyjmuje ścieżkę punktu kontrolnego jako jedyną opcję, jak w poniższym przykładzie składni:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
W poniższej tabeli przedstawiono przykładowe dane wyjściowe metadanych magazynu stanów:
| operatorId | Nazwa operatora | stateStoreName | liczbaPodziałów | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | domyślny | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | domyślny | 200 | 0 | 13 |
Aby uzyskać wyniki dla dedupeWithinWatermark operatora, wyślij zapytanie do czytnika stanu za pomocą operatorId opcji , jak w poniższym przykładzie:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);