Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Операции DataFrame или функции табличного значения SQL можно использовать для запроса данных о состоянии и метаданных структурированной потоковой передачи. Используйте эти функции для отслеживания сведений о состоянии для запросов с отслеживанием состояния структурированной потоковой передачи, которые могут быть полезны для мониторинга и отладки.
Для запроса данных о состоянии или метаданных необходимо иметь доступ на чтение к пути контрольной точки для потокового запроса. Функции, описанные в этой статье, предоставляют доступ только для чтения к данным состояния и метаданным. Для запроса сведений о состоянии можно использовать только пакетную семантику чтения.
Примечание.
Невозможно запрашивать сведения о состоянии для декларативных конвейеров Lakeflow Spark, потоковых таблиц или материализованных представлений. Невозможно запрашивать сведения о состоянии с помощью бессерверных вычислений или вычислений, настроенных в стандартном режиме доступа.
Требования
- Используйте одну из следующих конфигураций вычислений:
- Databricks Runtime 16.3 и более поздних версий на вычислительных системах, настроенных в стандартном режиме доступа.
- Databricks Runtime 14.3 LTS и выше на вычислительных системах, настроенных на режим выделенного доступа или без режима изоляции.
- Доступ на чтение к пути контрольной точки, используемому потоковым запросом.
Чтение хранилища состояний структурированной потоковой передачи
Вы можете считывать сведения о хранилище состояний для запросов структурированной потоковой передачи, выполняемых в любой поддерживаемой среде выполнения Databricks. Используйте следующий синтаксис:
Питон
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Параметры API чтения состояний
API чтения состояний поддерживает следующие необязательные конфигурации:
| Вариант | Тип | Значение по умолчанию | Описание |
|---|---|---|---|
batchId |
Длинный | последний идентификатор партии | Представляет целевой пакет для чтения. Чтобы запросить сведения о состоянии предыдущей версии запроса, укажите этот параметр. Пакет должен быть зафиксирован, но еще не очищен. |
operatorId |
Длинный | 0 | Представляет целевой оператор, от которого следует читать. Этот параметр используется при использовании нескольких операторов с отслеживанием состояния. |
storeName |
Строка | "DEFAULT" | Имя целевого хранилища состояний, из которого производится чтение. Этот параметр используется, когда оператор с отслеживанием состояния использует несколько экземпляров хранилища состояний. Для соединения потокового пара необходимо указать либо storeName, либо joinSide, но не оба одновременно. |
joinSide |
Строка ("слева" или "справа") | Представляет целевую сторону для чтения. Этот параметр используется, когда пользователи хотят считывать состояние из соединения потоков. | |
stateVarName |
Строка | Отсутствует | Имя переменной состояния для чтения в рамках этого запроса. Имя переменной состояния — это уникальное имя, заданное каждой переменной в init функции оператора, используемой StatefulProcessor оператором transformWithState . Этот параметр обязателен, если используется оператор transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
readRegisteredTimers |
Булев | неправда | Установите true, чтобы обеспечить чтение зарегистрированных таймеров, используемых в операторе transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
flattenCollectionTypes |
Булев | правда | Если true, выравнивает записи, возвращаемые для переменных состояния карты и списка. Если false, записи возвращаются с помощью Spark SQL Array или Map. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях. |
Возвращенные данные имеют следующую схему:
| Столбец | Тип | Описание |
|---|---|---|
key |
Структура (дополнительный тип, производный от ключа состояния) | Ключ для записи оператора с отслеживанием состояния в контрольной точке состояния. |
value |
Структура (дополнительный тип, производный от значения состояния) | Значение записи состояния оператора в контрольной точке состояния. |
partition_id |
Целое | Раздел контрольной точки состояния, который содержит запись оператора с сохранением состояния. |
См. табличную функцию read_statestore.
Чтение метаданных состояния структурированной потоковой передачи
Внимание
Для записи метаданных состояния необходимо выполнить потоковые запросы в Databricks Runtime 14.2 или более поздней версии. Файлы метаданных состояния не нарушают обратную совместимость. Если вы решили выполнить потоковый запрос в Databricks Runtime 14.1 или ниже, существующие файлы метаданных состояния игнорируются и новые файлы метаданных состояния не записываются.
Вы можете считывать сведения о метаданных состояния для запросов структурированной потоковой передачи, выполняемых в Databricks Runtime 14.2 или более поздней версии. Используйте следующий синтаксис:
Питон
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Возвращенные данные имеют следующую схему:
| Столбец | Тип | Описание |
|---|---|---|
operatorId |
Целое | Целочисленный идентификатор оператора потоковой передачи с отслеживанием состояния. |
operatorName |
Целое | Имя оператора потоковой передачи состояния. |
stateStoreName |
Строка | Имя хранилища состояний оператора. |
numPartitions |
Целое | Количество секций хранилища состояний. |
minBatchId |
Длинный | Минимальный идентификатор пакета, доступный для запроса состояния. |
maxBatchId |
Длинный | Максимальный идентификатор пакета, доступный для запроса состояния. |
Примечание.
Значения идентификатора пакета, предоставленные minBatchId и maxBatchId, показывают состояние на момент записи контрольной точки. Старые пакеты автоматически очищаются с помощью микро-пакетного выполнения, поэтому указанное здесь значение не гарантируется по-прежнему доступно.
См. табличную функцию read_state_metadata.
Пример: Запрос одной стороны соединения двух потоков
Используйте следующий синтаксис, чтобы запросить левую сторону stream-stream соединения.
Питон
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Пример: Запрос к хранилищу состояний для потока с несколькими операторами состояния
В этом примере средство чтения метаданных состояния используется для сбора сведений о метаданных потокового запроса с несколькими операторами с отслеживанием состояния, а затем использует результаты метаданных в качестве параметров для средства чтения состояний.
Средство чтения метаданных состояния принимает путь контрольной точки в качестве единственного варианта, как показано в следующем примере синтаксиса:
Питон
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
В следующей таблице представлен пример выходных данных метаданных хранилища состояний:
| operatorId | имяОператора | stateStoreName | числоРазделов | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | по умолчанию | 200 | 0 | 13 (тринадцать) |
| 1 | dedupeWithinWatermark | по умолчанию | 200 | 0 | 13 (тринадцать) |
Чтобы получить результаты для dedupeWithinWatermark оператора, запросите средство чтения состояния, используя опцию operatorId, как показано в следующем примере:
Питон
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);