Триггер очереди

Завершено

Очередь сообщений — это программный компонент, который используется для обработки обмена сообщениями между процессами, потоками или приложениями. Очередь может хранить сообщение, а рабочие роли могут вовремя получить сообщение.

Очереди сообщений могут создавать в облаке события с полезной нагрузкой. Служба, например Функции Azure, может прослушивать такие сообщения и выполнять их код при публикации сообщения.

Работа с очередями сообщений

Чтобы функция Azure могла воспринять сообщение из очереди сообщений, ей необходим триггер и, возможно, привязка.

Функция Azure должна ожидать передачи данных из определенной очереди, чтобы ее код активировался при публикации нового сообщения в этой очереди. Чтобы настроить триггер, необходимо предоставить правильные учетные данные, чтобы код триггера знал, как подключиться к очереди сообщений. Необходимо создать запись в файле function.json для функции, которая ожидает передачи данных в очередь. В элементе bindings укажите следующие свойства записи:

Свойство Значение
name Имя, на которое можно ссылаться в коде
type queueTrigger
direction in
queueName Как называется очередь
connection Переменная конфигурации в local.settings.json

Пример записи можно определить следующим образом:

{
    "name": "myQueueItem",
    "type": "queueTrigger",
    "direction": "in",
    "queueName": "messages-incoming",
    "connection": "AzureWebJobsStorage"
  }

Если эта очередь находится в учетной записи хранения, значение AzureWebJobsStorage является значение строки подключения.

При использовании сообщения из очереди привязка не является необходимой. Однако если вы хотите выполнить запись в очередь, потребуется выходная привязка. Имея такую привязку, вы получаете ссылку на требующуюся очередь.

Примечание.

В настоящее время для очередей поддерживаются только выходные привязки.

Локальная разработка

Как разработчику вам требуется использовать короткие циклы обратной связи. Кроме того, необходимо обеспечить, чтобы разработка была максимально приближена к рабочей среде. Для достижения обеих этих целей можно использовать эмулятор очереди.

Эмулятор очереди позволяет симулировать реальные сообщения в очереди, на которые будет реагировать функция Azure. Для использования эмулятора выполните следующие действия:

  1. Установка эмулятора. Выполните поиск Azurite в Visual Studio Code или загрузите расширение Azurite.

  2. Чтобы использовать функции эмулятора, запустите его, нажав Azure: запуск службы очередей в палитре команд.

    При выполнении этой команды запускается прослушиватель под названием Обозреватель службы хранилища Azure, который может выбрать другое приложение. Обозреватель службы хранилища — это клиентское приложение, которое позволяет просматривать облачные ресурсы и использовать функции эмулятора.

  3. Загрузка Обозревателя службы хранилища Azure. Откройте приложение, и вы увидите следующее уведомление о том, что эмулятор работает:

    Screenshot that shows the emulator in Azure Storage Explorer.

  4. Создайте очередь в эмуляторе. Эта очередь будет использоваться как часть настройки конечной точки функции. Щелкнув правой кнопкой мыши элементу очереди, можно создать новую очередь.

  5. Чтобы убедиться, что приложение Функций использует эмулятор, необходимо правильно задать строку подключения. Откройте local.settings.json, выберите элемент AzureWebJobsStorage и присвойте ему значение "UseDevelopmentStorage=true".

    Примечание.

    Не забудьте задать это свойство по-другому при переходе в облако. Оно должно указывать на фактический ресурс в Azure в рабочей среде.

Создание функции

Теперь вы настроили локальный эмулятор, и в нем есть очередь. Вы также настроили проект таким образом, чтобы он указывал на локальный эмулятор. Теперь необходимо создать функцию для работы с триггером очереди.

Создание конечной точки функции

У вас все готово для создания функции, которая может обрабатывать входящие сообщения очереди. Создайте папку для функции и присвойте ей имя, например queueTrigger. Затем создайте файл function.json и наполните его следующим содержимым:

{
  "bindings": [{
    "name" "queueItem",
    "type": "queueTrigger",
    "direction": "in",
    "queueName" : "items",
    "connection": "AzureWebJobsStorage"
  }]
}

Значение элемента name важно, так как вы будете ссылаться на него позже в коде для анализа входящих данных из очереди. Оно должно иметь тип queueTrigger, чтобы очередь активировала его при появлении нового сообщения.

Элемент queueName уникальным образом определяет, с какой очередью вы взаимодействуете. То, что вы вводите здесь, должно совпадать с названием очереди в эмуляторе или позже с названием реальной очереди в Azure.

Элемент connection указывает на значение элемента AzureWebJobsStorage в файле local.settings.json.

Обработка сообщения очереди

Для обработки входящего сообщения очереди необходимо написать код, который может анализировать необходимое сообщение. На этом этапе можно решить, что делать дальше. Например, вы можете запустить веб-запрос, поместить это сообщение в другую очередь или отправить сообщение в базу данных.

Настройка маршрута

Для обработки входящих запросов необходим маршрут. Функции Azure будут обрабатывать запросы к очереди на корневом уровне. Запрос будет вызываться как http://localhost:<port>/queueTrigger при настройке маршрута следующим образом:

http.HandleFunc("/queueTrigger", handleQueueTrigger)

Декодирование запроса

При отправке вам сообщения очереди оно имеет следующий вид:

{
  "Data": {
    "queueItem": "your message"
  },
  "Metadata": {
    "DequeueCount": 1,
    "ExpirationTime": "2019-10-16T17:58:31+00:00",
    "Id": "800ae4b3-bdd2-4c08-badd-f08e5a34b865",
    "InsertionTime": "2019-10-09T17:58:31+00:00",
    "NextVisibleTime": "2019-10-09T18:08:32+00:00",
    "PopReceipt": "AgAAAAMAAAAAAAAAAgtnj8x+1QE=",
    "sys": {
      "MethodName": "QueueTrigger",
      "UtcNow": "2019-10-09T17:58:32.2205399Z",
      "RandGuid": "24ad4c06-24ad-4e5b-8294-3da9714877e9"
    }
  }
}

Для декодирования входящего запроса необходима вспомогательная структура, моделирующая предыдущее сообщение. Он должен выглядеть так:

type InvokeRequest {
   Data map[string]json.RawMessage
   Metadata map[string]interface{}
}

Начните писать код для принятия и декодирования такого входящего запроса:

func handleQueueTrigger(w http.ResponseWrite, r *http.Request) {
   var invokeRequest InvokeRequest
   d := json.NewDecoder(r.Body)
   d.Decode(&invokeRequest)
}

Теперь вы находитесь на этапе, когда запрос был декодирован, но необходимо проанализировать само сообщение очереди.

Анализ сообщения очереди

После декодирования запроса сообщение очереди может быть извлечено из запроса в свойство Data. Также необходимо обратиться к сообщению, используя значение свойства name, настроенное в файле function.json. Код для извлечения сообщения состоит из одной строки и выглядит примерно так:

invokeRequest.Data["queueItem"]

Так как вам нужно иметь возможность читать это сообщение в виде понятного текста, вы воспользуетесь библиотекой JSON и проанализируете его. Библиотека JSON использует метод Unmarshal(), извлекающий два параметра: анализируемое сообщение и переменную для размещения проанализированного сообщения. Поэтому ваш код должен выглядеть следующим образом:

var parsedMessage string
json.Unmarshal(invokeRequest.Data["queueItem"], &parsedMessage)

На этом этапе parsedMessage содержит сообщение. Если вы хотите вывести его на консоль, используйте следующий код:

fmt.Println(parsedMessage) // your message

Примечание.

Если сообщение является чем-то более сложным, чем строка, то структура parsedMessage должна соответствовать форме того, на что указывает queueMessage.

Активация сообщения

Чтобы протестировать приложение, можно использовать Обозреватель службы хранилища Azure. На правой панели средства нажмите кнопку Добавить сообщение, чтобы создать сообщение в очереди.

Screenshot that shows the button for adding a message on the queue.

Если у вас есть приложение Функций и на этом этапе оно запущено, оно активирует привязку, и будет вызван код.