Поделиться через


Чтение сведений о состоянии структурированной потоковой передачи

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать операции DataFrame или функции табличного значения SQL для запроса данных о состоянии структурированной потоковой передачи и метаданных. Эти функции можно использовать для наблюдения за сведениями о состоянии для запросов с отслеживанием состояния структурированной потоковой передачи, которые могут быть полезны для мониторинга и отладки.

Для запроса данных о состоянии или метаданных необходимо иметь доступ на чтение к пути контрольной точки для потокового запроса. Функции, описанные в этой статье, предоставляют доступ только для чтения к данным состояния и метаданным. Для запроса сведений о состоянии можно использовать только пакетную семантику чтения.

Примечание.

Невозможно запрашивать сведения о состоянии конвейеров Delta Live Tables, потоковых таблиц или материализованных представлений.

Чтение хранилища состояний структурированной потоковой передачи

Вы можете считывать сведения о хранилище состояний для запросов структурированной потоковой передачи, выполняемых в любой поддерживаемой среде выполнения Databricks. Используйте следующий синтаксис:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Поддерживаются следующие необязательные конфигурации:

Вариант Тип Default value Description
batchId Long последний идентификатор пакетной службы Представляет целевой пакет для чтения. Укажите этот параметр для запроса сведений о состоянии для предыдущего состояния запроса. Пакет должен быть зафиксирован, но еще не очищен.
operatorId Long 0 Представляет целевой оператор для чтения. Этот параметр используется при использовании нескольких операторов с отслеживанием состояния.
storeName Строка "DEFAULT" Представляет имя целевого хранилища состояний для чтения. Этот параметр используется, когда оператор с отслеживанием состояния использует несколько экземпляров хранилища состояний. joinSide Либо storeName или должны быть указаны для соединения потокового пара, но не оба.
joinSide Строка ("слева" или "справа") Представляет целевую сторону для чтения. Этот параметр используется, когда пользователи хотят считывать состояние из соединения stream-stream.

Возвращенные данные имеют следующую схему:

Column Type Описание
key Структура (дополнительный тип, производный от ключа состояния) Ключ для записи оператора с отслеживанием состояния в контрольной точке состояния.
value Структура (дополнительный тип, производный от значения состояния) Значение записи оператора с отслеживанием состояния в контрольной точке состояния.
partition_id Целое Раздел контрольной точки состояния, содержащей запись оператора с отслеживанием состояния.

Чтение метаданных состояния структурированной потоковой передачи

Внимание

Для записи метаданных состояния необходимо выполнить потоковые запросы в Databricks Runtime 14.2 или более поздней версии. Файлы метаданных состояния не нарушают обратную совместимость. Если вы решили выполнить потоковый запрос в Databricks Runtime 14.1 или ниже, существующие файлы метаданных состояния игнорируются и новые файлы метаданных состояния не записываются.

Вы можете считывать сведения о метаданных состояния для запросов структурированной потоковой передачи, выполняемых в Databricks Runtime 14.2 или более поздней версии. Используйте следующий синтаксис:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Возвращенные данные имеют следующую схему:

Column Type Описание
operatorId Целое Целочисленный идентификатор оператора потоковой передачи с отслеживанием состояния.
operatorName Целое Имя оператора потоковой передачи с отслеживанием состояния.
stateStoreName Строка Имя хранилища состояний оператора.
numPartitions Целое Количество секций хранилища состояний.
minBatchId Long Минимальный идентификатор пакета, доступный для запроса состояния.
maxBatchId Long Максимальный идентификатор пакета, доступный для запроса состояния.

Примечание.

Значения идентификатора пакета, предоставленные minBatchId и maxBatchId отражают состояние во время записи контрольной точки. Старые пакеты автоматически очищаются с помощью микро-пакетного выполнения, поэтому указанное здесь значение не гарантируется по-прежнему доступно.