Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Операции DataFrame или функции табличного значения SQL можно использовать для запроса данных о состоянии и метаданных структурированной потоковой передачи. Используйте эти функции для отслеживания сведений о состоянии для запросов с отслеживанием состояния структурированной потоковой передачи, которые могут быть полезны для мониторинга и отладки.
Для запроса данных о состоянии или метаданных необходимо иметь доступ на чтение к пути контрольной точки для потокового запроса. Функции, описанные в этой статье, предоставляют доступ только для чтения к данным состояния и метаданным. Для запроса сведений о состоянии можно использовать только пакетную семантику чтения.
Примечание.
Невозможно запрашивать сведения о состоянии для декларативных конвейеров Lakeflow Spark, потоковых таблиц или материализованных представлений. Невозможно запрашивать сведения о состоянии с помощью бессерверных вычислений или вычислений, настроенных в стандартном режиме доступа.
Требования
- Используйте одну из следующих конфигураций вычислений:
- Databricks Runtime 16.3 и более поздних версий на вычислительных системах, настроенных в стандартном режиме доступа.
- Databricks Runtime 14.3 LTS и выше на вычислительных системах, настроенных на режим выделенного доступа или без режима изоляции.
- Доступ на чтение к пути контрольной точки, используемому потоковым запросом.
Чтение хранилища состояний структурированной потоковой передачи
Вы можете считывать сведения о хранилище состояний для запросов структурированной потоковой передачи, выполняемых в любой поддерживаемой среде выполнения Databricks. Используйте следующий синтаксис:
Питон
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Параметры API чтения состояний
API чтения состояний поддерживает следующие необязательные конфигурации:
| Вариант | Тип | Значение по умолчанию | Описание |
|---|---|---|---|
batchId |
Длинный | последний идентификатор партии | Представляет целевой пакет для чтения. Чтобы запросить сведения о состоянии предыдущей версии запроса, укажите этот параметр. Пакет должен быть зафиксирован, но еще не очищен. |
operatorId |
Длинный | 0 | Представляет целевой оператор, от которого следует читать. Этот параметр используется при использовании нескольких операторов с отслеживанием состояния. |
storeName |
Строка | "DEFAULT" | Имя целевого хранилища состояний, из которого производится чтение. Этот параметр используется, когда оператор с отслеживанием состояния использует несколько экземпляров хранилища состояний. Для соединения потокового пара необходимо указать либо storeName, либо joinSide, но не оба одновременно. |
joinSide |
Строка ("слева" или "справа") | Представляет целевую сторону для чтения. Этот параметр используется, когда пользователи хотят считывать состояние из соединения потоков. | |
stateVarName |
Строка | Отсутствует | Имя переменной состояния для чтения в рамках этого запроса. Имя переменной состояния — это уникальное имя, заданное каждой переменной в init функции оператора, используемой StatefulProcessor оператором transformWithState . Этот параметр обязателен, если используется оператор transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
readRegisteredTimers |
Булев | неправда | Установите true, чтобы обеспечить чтение зарегистрированных таймеров, используемых в операторе transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
flattenCollectionTypes |
Булев | правда | Если true, выравнивает записи, возвращаемые для переменных состояния карты и списка. Если false, записи возвращаются с помощью Spark SQL Array или Map. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
Возвращенные данные имеют следующую схему:
| Столбец | Тип | Описание |
|---|---|---|
key |
Структура (дополнительный тип, производный от ключа состояния) | Ключ для записи оператора с отслеживанием состояния в контрольной точке состояния. |
value |
Структура (дополнительный тип, производный от значения состояния) | Значение записи состояния оператора в контрольной точке состояния. |
partition_id |
Целое | Раздел контрольной точки состояния, который содержит запись оператора с сохранением состояния. |
См. табличную функцию read_statestore.
Чтение метаданных состояния структурированной потоковой передачи
Внимание
Для записи метаданных состояния необходимо выполнить потоковые запросы в Databricks Runtime 14.2 или более поздней версии. Файлы метаданных состояния не нарушают обратную совместимость. Если вы решили выполнить потоковый запрос в Databricks Runtime 14.1 или ниже, существующие файлы метаданных состояния игнорируются и новые файлы метаданных состояния не записываются.
Вы можете считывать сведения о метаданных состояния для запросов структурированной потоковой передачи, выполняемых в Databricks Runtime 14.2 или более поздней версии. Используйте следующий синтаксис:
Питон
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Возвращенные данные имеют следующую схему:
| Столбец | Тип | Описание |
|---|---|---|
operatorId |
Целое | Целочисленный идентификатор оператора потоковой передачи с отслеживанием состояния. |
operatorName |
Целое | Имя оператора потоковой передачи состояния. |
stateStoreName |
Строка | Имя хранилища состояний оператора. |
numPartitions |
Целое | Количество секций хранилища состояний. |
minBatchId |
Длинный | Минимальный идентификатор пакета, доступный для запроса состояния. |
maxBatchId |
Длинный | Максимальный идентификатор пакета, доступный для запроса состояния. |
Примечание.
Значения идентификатора пакета, предоставленные minBatchId и maxBatchId, показывают состояние на момент записи контрольной точки. Старые пакеты автоматически очищаются с помощью микро-пакетного выполнения, поэтому указанное здесь значение не гарантируется по-прежнему доступно.
См. табличную функцию read_state_metadata.
Пример: Запрос одной стороны соединения двух потоков
Используйте следующий синтаксис, чтобы запросить левую сторону stream-stream соединения.
Питон
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Пример: Запрос к хранилищу состояний для потока с несколькими операторами состояния
В этом примере средство чтения метаданных состояния используется для сбора сведений о метаданных потокового запроса с несколькими операторами с отслеживанием состояния, а затем использует результаты метаданных в качестве параметров для средства чтения состояний.
Средство чтения метаданных состояния принимает путь контрольной точки в качестве единственного варианта, как показано в следующем примере синтаксиса:
Питон
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
В следующей таблице представлен пример выходных данных метаданных хранилища состояний:
| operatorId | имяОператора | stateStoreName | числоРазделов | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | по умолчанию | 200 | 0 | 13 (тринадцать) |
| 1 | dedupeWithinWatermark | по умолчанию | 200 | 0 | 13 (тринадцать) |
Чтобы получить результаты для dedupeWithinWatermark оператора, запросите средство чтения состояния, используя опцию operatorId, как показано в следующем примере:
Питон
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);