Поделиться через


Чтение сведений о состоянии структурированной потоковой передачи

Операции DataFrame или функции табличного значения SQL можно использовать для запроса данных о состоянии и метаданных структурированной потоковой передачи. Используйте эти функции для отслеживания сведений о состоянии для запросов с отслеживанием состояния структурированной потоковой передачи, которые могут быть полезны для мониторинга и отладки.

Для запроса данных о состоянии или метаданных необходимо иметь доступ на чтение к пути контрольной точки для потокового запроса. Функции, описанные в этой статье, предоставляют доступ только для чтения к данным состояния и метаданным. Для запроса сведений о состоянии можно использовать только пакетную семантику чтения.

Примечание.

Невозможно запрашивать сведения о состоянии для декларативных конвейеров Lakeflow Spark, потоковых таблиц или материализованных представлений. Невозможно запрашивать сведения о состоянии с помощью бессерверных вычислений или вычислений, настроенных в стандартном режиме доступа.

Требования

  • Используйте одну из следующих конфигураций вычислений:
    • Databricks Runtime 16.3 и более поздних версий на вычислительных системах, настроенных в стандартном режиме доступа.
    • Databricks Runtime 14.3 LTS и выше на вычислительных системах, настроенных на режим выделенного доступа или без режима изоляции.
  • Доступ на чтение к пути контрольной точки, используемому потоковым запросом.

Чтение хранилища состояний структурированной потоковой передачи

Вы можете считывать сведения о хранилище состояний для запросов структурированной потоковой передачи, выполняемых в любой поддерживаемой среде выполнения Databricks. Используйте следующий синтаксис:

Питон

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Параметры API чтения состояний

API чтения состояний поддерживает следующие необязательные конфигурации:

Вариант Тип Значение по умолчанию Описание
batchId Длинный последний идентификатор партии Представляет целевой пакет для чтения. Чтобы запросить сведения о состоянии предыдущей версии запроса, укажите этот параметр. Пакет должен быть зафиксирован, но еще не очищен.
operatorId Длинный 0 Представляет целевой оператор, от которого следует читать. Этот параметр используется при использовании нескольких операторов с отслеживанием состояния.
storeName Строка "DEFAULT" Имя целевого хранилища состояний, из которого производится чтение. Этот параметр используется, когда оператор с отслеживанием состояния использует несколько экземпляров хранилища состояний. Для соединения потокового пара необходимо указать либо storeName, либо joinSide, но не оба одновременно.
joinSide Строка ("слева" или "справа") Представляет целевую сторону для чтения. Этот параметр используется, когда пользователи хотят считывать состояние из соединения потоков.
stateVarName Строка Отсутствует Имя переменной состояния для чтения в рамках этого запроса. Имя переменной состояния — это уникальное имя, заданное каждой переменной в init функции оператора, используемой StatefulProcessor оператором transformWithState . Этот параметр обязателен, если используется оператор transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях.
readRegisteredTimers Булев неправда Установите true, чтобы обеспечить чтение зарегистрированных таймеров, используемых в операторе transformWithState. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях.
flattenCollectionTypes Булев правда Если true, выравнивает записи, возвращаемые для переменных состояния карты и списка. Если false, записи возвращаются с помощью Spark SQL Array или Map. Этот параметр применяется только к оператору transformWithState и игнорируется для других операторов. Доступно в Databricks Runtime 16.2 и более поздних версиях.

Возвращенные данные имеют следующую схему:

Столбец Тип Описание
key Структура (дополнительный тип, производный от ключа состояния) Ключ для записи оператора с отслеживанием состояния в контрольной точке состояния.
value Структура (дополнительный тип, производный от значения состояния) Значение записи состояния оператора в контрольной точке состояния.
partition_id Целое Раздел контрольной точки состояния, который содержит запись оператора с сохранением состояния.

См. табличную функцию read_statestore.

Чтение метаданных состояния структурированной потоковой передачи

Внимание

Для записи метаданных состояния необходимо выполнить потоковые запросы в Databricks Runtime 14.2 или более поздней версии. Файлы метаданных состояния не нарушают обратную совместимость. Если вы решили выполнить потоковый запрос в Databricks Runtime 14.1 или ниже, существующие файлы метаданных состояния игнорируются и новые файлы метаданных состояния не записываются.

Вы можете считывать сведения о метаданных состояния для запросов структурированной потоковой передачи, выполняемых в Databricks Runtime 14.2 или более поздней версии. Используйте следующий синтаксис:

Питон

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Возвращенные данные имеют следующую схему:

Столбец Тип Описание
operatorId Целое Целочисленный идентификатор оператора потоковой передачи с отслеживанием состояния.
operatorName Целое Имя оператора потоковой передачи состояния.
stateStoreName Строка Имя хранилища состояний оператора.
numPartitions Целое Количество секций хранилища состояний.
minBatchId Длинный Минимальный идентификатор пакета, доступный для запроса состояния.
maxBatchId Длинный Максимальный идентификатор пакета, доступный для запроса состояния.

Примечание.

Значения идентификатора пакета, предоставленные minBatchId и maxBatchId, показывают состояние на момент записи контрольной точки. Старые пакеты автоматически очищаются с помощью микро-пакетного выполнения, поэтому указанное здесь значение не гарантируется по-прежнему доступно.

См. табличную функцию read_state_metadata.

Пример: Запрос одной стороны соединения двух потоков

Используйте следующий синтаксис, чтобы запросить левую сторону stream-stream соединения.

Питон

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Пример: Запрос к хранилищу состояний для потока с несколькими операторами состояния

В этом примере средство чтения метаданных состояния используется для сбора сведений о метаданных потокового запроса с несколькими операторами с отслеживанием состояния, а затем использует результаты метаданных в качестве параметров для средства чтения состояний.

Средство чтения метаданных состояния принимает путь контрольной точки в качестве единственного варианта, как показано в следующем примере синтаксиса:

Питон

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

В следующей таблице представлен пример выходных данных метаданных хранилища состояний:

operatorId имяОператора stateStoreName числоРазделов minBatchId maxBatchId
0 stateStoreSave по умолчанию 200 0 13 (тринадцать)
1 dedupeWithinWatermark по умолчанию 200 0 13 (тринадцать)

Чтобы получить результаты для dedupeWithinWatermark оператора, запросите средство чтения состояния, используя опцию operatorId, как показано в следующем примере:

Питон

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);