Поделиться через


Служебная шина помещает выходные данные из Azure Stream Analytics в очередь

Очереди служебной шины доставляют сообщения конкурирующим потребителям по типу FIFO. Как правило, сообщения принимаются и обрабатываются получателями в том порядке, в котором они были добавлены в очередь. Каждое сообщение получается и обрабатывается только одним объектом-получателем сообщения.

На уровне совместимости 1.2 Azure Stream Analytics использует для записи в очереди и разделы служебной шины протокол передачи сообщений AMQP. AMQP позволяет создавать кроссплатформенные гибридные приложения, использующие протокол открытого стандарта.

Конфигурация выходных данных

В таблице ниже приведены имена и описание свойств для создания выходных данных очереди.

Имя свойства Description
Псевдоним выходных данных Понятное имя, которое используется в запросах для направления выходных данных запроса в очередь служебной шины.
Пространство имен служебной шины Контейнер для набора сущностей обмена сообщениями.
Имя очереди Имя очереди служебной шины.
Имя политики очереди При создании очереди можно также создать политики общего доступа на вкладке "Настройка очереди". Каждая политика общего доступа имеет имя, заданные разрешения и ключи доступа.
Ключ политики очереди Ключ общего доступа, используемый для аутентификации доступа к пространству имен Служебной шины.
Формат сериализации событий Формат сериализации для выходных данных. Поддерживаются форматы JSON, CSV и Avro.
Кодировка В настоящее время единственным поддерживаемым форматом кодирования файлов CSV и JSON является UTF-8.
Разделитель Применяется только для сериализации в формате CSV. Служба Stream Analytics позволяет использовать ряд распространенных разделителей для сериализации данных в формате CSV. Поддерживаются такие разделители: запятая, точка с запятой, пробел, табуляция и вертикальная черта.
Форматировать Применимо только для типа JSON. Вариант строки-разделители предусматривает форматирование выходных данных таким образом, что каждый объект JSON будет отделен новой строкой. Если выбрать вариант строки-разделители, то JSON считывает по одному объекту за раз. Все содержимое само по себе не будет допустимым форматом JSON. Вариант массив означает, что выходные данные будут отформатированы как массив объектов JSON.
Столбцы свойств Необязательно. Столбцы с разделителями-запятыми, которые необходимо присоединить как пользовательские свойства исходящего сообщения вместо полезных данных. Дополнительные сведения об этой функции см. в разделе Свойства пользовательских метаданных для выходных данных.
Столбцы системных свойств Необязательно. Пары "ключ-значение" системных свойств и соответствующих имен столбцов, которые необходимо присоединить к исходящему сообщению вместо полезных данных.

Количество разделов основано на размере и номере SKU служебной шины. Ключ раздела — это уникальное целое значение для каждого раздела.

Секционирование

Секционирование выбирается автоматически. Количество разделов основано на размере и номере SKU служебной шины. Ключ раздела — это уникальное целое значение для каждого раздела. Число модулей записи вывода совпадает с количеством секций в очереди выходных данных.

Размер выходного пакета

Максимальный размер сообщения составляет 256 КБ на сообщение для стандартного уровня и 1 МБ для уровня "Премиум". Дополнительные сведения см. в статье об ограничениях служебной шины. Для оптимизации используйте одно событие на сообщение.

Свойства пользовательских метаданных для выходных данных

Столбцы запросов можно прикреплять к исходящим сообщениям как пользовательские свойства. Эти столбцы не переходят в полезные данные. Свойства представлены в виде словаря в выходном сообщении. Ключ — это имя столбца, а значение — это значение столбца в словаре свойств. Поддерживаются все типы данных Stream Analytics, кроме записи и массива.

В следующем примере мы добавим в метаданные два поля DeviceId и DeviceStatus.

  1. Используйте следующий запрос:

    select *, DeviceId, DeviceStatus from iotHubInput
    
  2. Настройте DeviceId,DeviceStatus как столбцы свойств в выходных данных.

    Property columns

На следующем рисунке изображены ожидаемые свойства выходного сообщения, проверенные в EventHub с помощью обозревателя служебной шины.

Event custom properties

Свойства системы

Столбцы запросов можно присоединять как системные свойства к исходящим сообщениям очереди или раздела служебной шины.

Эти столбцы не попадают в полезные данные, а соответствующее системное свойство ServiceBusMessage заполняется значениями столбца запроса. Эти системные свойства поддерживаются — MessageId, ContentType, Label, PartitionKey, ReplyTo, SessionId, CorrelationId, To, ForcePersistence, TimeToLive, ScheduledEnqueueTimeUtc.

Строковые значения этих столбцов анализируются как соответствующие типы значений системных свойств, а все ошибки синтаксического анализа обрабатываются как ошибки данных. Это поле предоставляется в формате объекта JSON. Далее приведены сведения об этом формате:

  • Используйте фигурные скобки {}.
  • Записываются в виде пар "ключ — значение".
  • Ключи и значения должны быть строковыми.
  • Ключ — это имя системного свойства, а значение — имя столбца запроса.
  • Ключи и значения разделяются двоеточием.
  • Каждая пара "ключ — значение" разделяется запятой.

В этом примере показано, как использовать это свойство —

  • Запрос: select *, column1, column2 INTO queueOutput FROM iotHubInput
  • Столбцы системных свойств: { "MessageId": "column1", "PartitionKey": "column2"}

Задает MessageId в сообщениях очереди служебной шины со значениями column1, а для PartitionKey задаются значения column2.

Следующие шаги