Отправка связанных сообщений по порядку с использованием последовательного сопровождения в Azure Logic Apps с помощью служебной шины Microsoft Azure

Область применения: Azure Logic Apps (потребление)

Если необходимо отправлять связанные сообщения в определенном порядке, можно использовать шаблон последовательного сопровождения в Azure Logic Apps с соединителем служебной шины Microsoft Azure. В связанных сообщениях есть свойство, определяющее связь между этими сообщениями, например идентификатор сеанса в служебной шине Microsoft Azure.

К примеру, предположим, что у вас есть 10 сообщений для сеанса с именем "Сеанс 1" и 5 сообщений для сеанса с именем "Сеанс 2", которые отправляются в одну и ту же очередь служебной шины Microsoft Azure. Можно создать приложение логики, которое обрабатывает все находящиеся в очереди сообщения из "Сеанса 1" одним выполнением триггера, а все сообщения из "Сеанса 2" — следующим выполнением.

Общий шаблон последовательного сопровождения

В этой статье описано создание приложения логики, которое реализует этот шаблон с помощью шаблона Коррелированная упорядоченная доставка с помощью сеансов служебной шины. Этот шаблон определяет рабочий процесс приложения логики, который начинается с триггера При получении сообщения в очереди (просмотр и блокировка) соединителя служебной шины Microsoft Azure, получающего сообщения из очереди служебной шины Microsoft Azure. Вот краткое описание действий, выполняемых этим приложением логики:

  • Инициализация сеанса на основе сообщения, которое триггер считывает из очереди служебной шины Microsoft Azure.

  • Считывание и обработка всех сообщений из одного сеанса в очереди во время текущего выполнения рабочего процесса.

JSON-файл этого шаблона см. на сайте GitHub: service-bus-sessions.json.

Подробнее см. в статье Шаблон последовательного сопровождения — конструктивные шаблоны облачных архитектур Azure.

Предварительные требования

Проверка доступа к пространству имен служебной шины Microsoft Azure

Если вы не уверены, есть ли у вашего приложения логики разрешения на доступ к пространству имен служебной шины Microsoft Azure, подтвердите эти разрешения.

  1. Войдите на портал Azure. Найдите и выберите пространство имен служебной шины Microsoft Azure.

  2. В меню пространства имен в разделе Параметры выберите Политики общего доступа. В разделе Утверждения убедитесь, что у вас есть разрешения на управление для этого пространства имен.

    Управление разрешениями для пространства имен служебной шины Microsoft Azure

  3. Теперь получите строку подключения для пространства имен служебной шины Microsoft Azure. Эту строку можно использовать позже при создании подключения к пространству имен из вашего приложения логики.

    1. В области Политики общего доступа в разделе Политика выберите RootManageSharedAccessKey.

    2. Теперь нажмите кнопку копирования рядом с основной строкой подключения. Сохраните строку подключения для дальнейшего использования.

      Копирование строки подключения к пространству имен служебной шины

    Совет

    Чтобы проверить, относится ли строка подключения к пространству имен служебной шины или сущности обмена сообщениями, например очереди, найдите в ней параметр EntityPath. Если в строке подключения есть такой параметр, значит она относится к определенной сущности и ее нельзя использовать в приложении логики.

Создание приложения логики

В этом разделе вы создадите приложение логики с помощью шаблона Коррелированная упорядоченная доставка с помощью сеансов служебной шины, который содержит триггер и действия для реализации этого шаблона рабочего процесса. Вы также создадите подключение к пространству имен служебной шины Microsoft Azure и укажете имя очереди служебной шины, которую вы хотите использовать.

  1. Создайте пустое приложение логики на портале Azure. На домашней странице Azure последовательно выберите Создать ресурс>Интеграция>Приложение логики.

  2. Когда откроется коллекция шаблонов, прокрутите разделы видео и общих триггеров. В разделе Шаблоны выберите шаблон Коррелированная упорядоченная доставка с помощью сеансов служебной шины.

    Выберите шаблон

  3. В открывшемся окне подтверждения выберите Да, чтобы продолжить.

  4. В конструкторе приложений логики в фигуре Служебная шина Microsoft Azure выберите Продолжить, а затем щелкните появившийся знак ( + ).

    Выберите

  5. Теперь создайте подключение к служебной шине Microsoft Azure, выбрав один из вариантов.

    • Чтобы использовать строку подключения, скопированную ранее из пространства имен служебной шины Microsoft Azure, выполните следующие действия:

      1. Выберите Ввести сведения о подключении вручную.

      2. В поле Connection Name укажите имя своего подключения. В поле Connection String ставьте свою строку подключения к пространству имен и выберите Create (Создать), например:

        Введите имя подключения и строку подключения к служебной шине Microsoft Azure

    • Чтобы выбрать пространство имен служебной шины Microsoft Azure из текущей подписки Azure, выполните следующие действия:

      1. В поле Connection Name укажите имя своего подключения. Для Service Bus Namespace выберите свое пространство имен служебной шины Microsoft Azure. Например:

        Введите имя подключения и выберите пространство имен служебной шины Microsoft Azure

      2. Когда откроется следующая панель, выберите свою политику служебной шины Microsoft Azure и щелкните Create (Создать).

        Выберите политику служебной шины Microsoft Azure и нажмите

  6. Когда все будет готово, выберите Продолжить.

    В конструкторе приложений логики теперь отображается шаблон Коррелированная упорядоченная доставка с помощью сеансов служебной шины, который содержит предварительно заполненный рабочий процесс с триггером и действиями, включая две области, реализующие обработку ошибок согласно шаблону Try-Catch.

Теперь вы можете узнать подробнее о триггере и действиях в шаблоне или перейти к предоставлению значений для шаблона приложения логики.

Сводка по шаблону

Вот общая схема рабочего процесса в шаблоне Коррелированная упорядоченная доставка с помощью сеансов служебной шины, в котором подробные сведения свернуты:

Общая схема рабочего процесса шаблона

Имя Описание
When a message is received in a queue (peek-lock) В зависимости от заданной периодичности триггер служебной шины Microsoft Azure проверяет указанную очередь служебной шины на наличие сообщений. Если в очереди есть сообщение, срабатывает триггер, который создает и запускает экземпляр рабочего процесса.

Термин просмотр и блокировка означает, что триггер отправляет запрос на извлечение сообщения из очереди. Если сообщение существует, триггер извлекает и блокирует его, чтобы предотвратить прочую обработку этого сообщения, пока не истечет период блокировки. Подробнее см. в разделе Инициализация сеанса.

Init isDone Это действие инициализации переменной создает логическую переменную со значением false и показывает, что выполняются следующие условия:

— В сеансе больше нет сообщений, доступных для считывания.
— Блокировку сеанса больше не требуется обновлять, поэтому текущий экземпляр рабочего процесса можно завершить.

Подробнее см. в разделе Инициализация сеанса.

Try Это действие области содержит действия, выполняемые для обработки сообщения. Если в области Try возникает проблема, то последующее действие областиCatch обрабатывает эту проблему. Подробнее см. в разделе Область "Try".
Catch Это действие области содержит действия, которые выполняются, если в предыдущей области Try возникает проблема. Подробнее см. в разделе Область "Catch".

Область "Try"

Далее представлена общая схема потока в действии областиTry (подробные сведения свернуты):

Рабочий процесс действия области

Имя Описание
Send initial message to topic Это действие можно заменить на любое другое действие, которое должно обрабатывать первое сообщение из сеанса в очереди. Сеанс указывается с помощью идентификатора сеанса.

Для этого шаблона действие служебной шины Microsoft Azure отправляет первое сообщение в раздел служебной шины Microsoft Azure. Подробнее см. в разделе Обработка исходного сообщения.

(параллельная ветвь) Это действие параллельной ветви создает два пути:

— Ветвь №1: продолжить обработку сообщения. Подробнее см. в разделе Ветвь №1: завершить исходное сообщение в очереди.

— Ветвь №2: отказаться от сообщения в случае неполадки и разблокировать для получения другим выполнением триггера. Подробнее см. в разделе Ветвь №2: отказаться от исходного сообщения из очереди.

Оба пути будут позже объединены в действии Закрыть сеанс в очереди и завершить его выполнение, как описано в следующей строке.

Close a session in a queue and succeed Это действие служебной шины Microsoft Azure объединяет ранее описанные ветви и закрывает сеанс в очереди после того, как происходит одно из следующих событий:

— Рабочий процесс завершает обработку доступных сообщений в очереди.
— Рабочий процесс отказывается от исходного сообщения в случае неполадки.

Подробнее см. в разделе Закрыть сеанс в очереди и завершить его выполнение.

Ветвь №1: завершить исходное сообщение в очереди

Имя Описание
Complete initial message in queue Это действие служебной шины Microsoft Azure помечает успешно полученное сообщение как завершенное и удаляет его из очереди, чтобы предотвратить повторную обработку. Подробнее см. в разделе Обработка исходного сообщения.
While there are more messages for the session in the queue Этот цикл Until будет продолжать получать сообщения, пока они есть в очереди или пока не пройдет один час. Подробнее о действиях в этом цикле см. в разделе Пока в очереди есть сообщения для сеанса.
Set isDone = true Если сообщений больше нет, это действие Задать переменную присваивает isDone значение true.
Renew session lock until cancelled Этот цикл Until гарантирует, что блокировка сеанса удерживается приложением логики, пока в очереди есть сообщения или пока не пройдет один час. Подробнее о действиях в этом цикле см. в разделе Продлять блокировку сеанса до отмены.

Ветвь #2: отказаться от исходного сообщения из очереди

Если действие, обрабатывающее первое сообщение, завершается ошибкой, действие служебной шины Microsoft Azure Отказаться от исходного сообщения из очереди разблокирует исходное сообщение, позволяя другому выполнению экземпляра рабочего процесса получить и обработать его. Подробнее см. в разделе Обработка исходного сообщения.

Область "Catch"

Если действия в области Try не выполняются, приложение логики все равно должно завершить сеанс. Действие областиCatchвыполняется, если действие области Try приводит к состоянию Failed, Skipped или TimedOut. Область возвращает сообщение об ошибке, содержащее идентификатор сеанса, в котором возникла проблема, и завершает приложение логики.

Вот общая схема потока в действии области Catch (подробные сведения свернуты):

Рабочий процесс действия области

Имя Описание
Close a session in a queue and fail Это действие служебной шины Microsoft Azure закрывает сеанс в очереди, чтобы блокировка сеанса не оставалась открытой. Подробнее см. в разделе Закрыть сеанс в очереди и завершить с ошибкой.
Find failure msg from 'Try' block Это действие Фильтр массива создает массив из входных и выходных данных всех действий в области Try на основе указанных критериев. В этом случае данное действие возвращает выходные данные действий, которые привели к состоянию Failed. Дополнительные сведения см. в разделе Найти сообщение о сбое в блоке "Try".
Select error details Это действие Выбрать создает массив, содержащий объекты JSON на основе указанных критериев. Эти объекты JSON формируются на основе значений в массиве, созданном предыдущим действием Find failure msg from 'Try' block. В этом случае данное действие возвращает массив, содержащий объект JSON, который создан на основе сведений об ошибке, возвращенных предыдущим действием. Подробнее см. в разделе Выбор сведений об ошибке.
Terminate Это действие Завершить останавливает выполнение рабочего процесса, отменяет все выполняемые действия, пропускает все оставшиеся действия и возвращает указанное состояние, идентификатор сеанса и результат ошибки из действия Select error details. Подробнее см. в разделе Завершение приложения логики.

Заполнение шаблона

Чтобы указать значения для триггера и действий в шаблоне Коррелированная упорядоченная доставка с помощью сеансов служебной шины, выполните следующие действия. Перед сохранением приложения логики необходимо указать все обязательные значения, помеченные звездочкой ( * ).

Инициализация сеанса

  • Для триггера При получении сообщения в очереди (просмотр и блокировка) укажите эти сведения, чтобы шаблон мог инициализировать сеанс с помощью свойства Session id (ИД сеанса). Например:

    Сведения о триггере служебной шины Microsoft Azure

    Примечание

    Изначально для интервала опроса устанавливается значение 3 минуты, чтобы приложение логики не выполнялось слишком часто и не приводило к излишним затратам. В идеале задайте интервал и частоту 30 секунд, чтобы приложение логики запускалось сразу же при поступлении сообщения.

    Свойство Требуется для этого сценария Значение Описание
    Имя очереди Да <queue-name> Имя ранее созданной очереди служебной шины Microsoft Azure. В этом примере используется имя "Fabrikam-Service-Bus-Queue".
    Тип очереди Да Основная страница Основная очередь служебной шины Microsoft Azure.
    Идентификатор сеанса Да Следующий доступный Этот параметр получает сеанс для каждого выполнения триггера на основе идентификатора сеанса из сообщения в очереди служебной шины Microsoft Azure. Кроме того, сеанс блокируется, чтобы другие приложения логики или клиенты не могли обрабатывать сообщения, связанные с этим сеансом. Последующие действия рабочего процесса обрабатывают все сообщения, связанные с этим сеансом, как описано далее в этой статье.

    Вот дополнительные сведения о других вариантах идентификатора сеанса:

    - None (Нет): опция по умолчанию; в результате никаких сеансов нет, и их нельзя использовать для реализации шаблона последовательного сопровождения.

    - Enter custom value (Ввести пользовательское значение): используйте этот параметр, если вы знаете, какой ИД сеанса вы хотите использовать, и хотите всегда выполнять триггер для этого идентификатора.

    Примечание. Соединитель служебной шины Microsoft Azure может одновременно сохранять в своем кэше ограниченное количество уникальных сеансов из служебной шины Microsoft Azure. Если число сеансов превышает это максимальное значение, старые сеансы удаляются из кэша. Подробнее см. в разделе Обмен сообщениями в облаке с помощью Azure Logic Apps и служебной шины Microsoft Azure.

    Интервал Да <number-of-intervals> Число единиц времени между повторениями перед проверкой наличия сообщения.
    Частота Да Second (секунда), Minute (минута), Hour (час), Day (день), Week (неделя) или Month (месяц). Единица времени для повторения, используемое при проверке наличия сообщения.

    Совет. Чтобы добавить Часовой пояс или Время запуска, выберите эти свойства в списке Добавить новый параметр.

    Подробнее о триггерах см. в разделе Служебная шина Microsoft Azure — при получении сообщения в очереди (просмотр и блокировка). Триггер выводит ServiceBusMessage (Сообщение служебной шины).

После инициализации сеанса рабочий процесс использует действие Initialize variable (Инициализировать переменную), чтобы создать логическую переменную с начальным значением false, и указывает, что выполняются следующие условия:

  • В сеансе больше нет сообщений, доступных для считывания.

  • Блокировку сеанса больше не требуется обновлять, поэтому текущий экземпляр рабочего процесса можно завершить.

Сведения о действии

Затем в блоке Try рабочий процесс выполняет действия с первым считанным сообщением.

Обработка исходного сообщения

Первое действие — это действие-заполнитель служебной шины Microsoft Azure Send initial message to topic (Отправить исходное сообщение в раздел), которое можно заменить любым другим действием для обработки первого сообщения из сеанса в очереди. Идентификатор сеанса указывает сеанс, из которого было получено сообщение.

Действие-заполнитель служебной шины Microsoft Azure отправляет первое сообщение в раздел служебной шины, указанный свойством Session ID (ИД сеанса). Таким образом все сообщения, связанные с конкретным сеансом, отправляются в один и тот же раздел. Все свойства Session Id (ИД сеанса) для последующих действий в этом шаблоне используют одно и то же значение ИД сеанса.

Сведения о действии служебной шины Microsoft Azure

  1. В действии служебной шины Microsoft Azure Complete initial message in queue (Завершить исходное сообщение в очереди) укажите имя своей очереди служебной шины и не изменяйте значения по умолчанию для всех остальные свойств в действии.

    Сведения о действии служебной шины Microsoft Azure

  2. В действии служебной шины Microsoft Azure Abandon initial message from the queue (Отказаться от исходного сообщения из очереди) укажите имя своей очереди служебной шины и не изменяйте значения по умолчанию для всех остальные свойств в действии.

    Сведения о действии служебной шины Microsoft Azure

Далее вы предоставите необходимые сведения для действий, следующих за действием Завершить исходное сообщение в очереди. Вы начнете с действий в цикле Пока в очереди есть сообщения для сеанса.

Пока в очереди есть сообщения для сеанса

Данный цикл Until выполняет эти действия, пока в очереди есть сообщения или пока не пройдет один час. Чтобы изменить максимальное время цикла, измените значение свойства Timeout (Время ожидания) цикла.

  • Получение дополнительных сообщений из очереди, пока в ней есть сообщения.

  • Проверка количества оставшихся сообщений. Если есть сообщения, их обработка продолжается. Если сообщений больше нет, рабочий процесс присваивает переменной isDone значение true, а затем завершает цикл.

Цикл Until: обработка сообщений, пока они есть в очереди

  1. В действии служебной шины Microsoft Azure Get additional messages from session (Получать дополнительные сообщения из сеанса) укажите имя очереди служебной шины Microsoft Azure. Не изменяйте значения по умолчанию всех остальных свойств в действии.

    Примечание

    По умолчанию максимальное число сообщений равно 175, но на это ограничение влияют размер сообщения и свойство "максимальный размер сообщения" в служебной шине Microsoft Azure. Подробнее см. в разделе Размер сообщения для очереди.

    Действие служебной шины

    Далее рабочий процесс разделяется на следующие параллельные ветви:

    • Если во время проверки на наличие дополнительных сообщений возникает ошибка или сбой, для переменной isDone задается значение true.

    • Условие Process messages if we got any (Обрабатывать сообщения при наличии) проверяет, не равно ли количество оставшихся сообщений нулю. Если значение равно false и есть еще сообщения, обработка продолжается. Если значение равно true и сообщений больше нет, рабочий процесс присваивает переменной isDone значение true.

    Условие

    В разделе If false цикл For each обрабатывает каждое сообщение в порядке "первым поступил — первым обслужен" (FIFO). В параметрах цикла для параметра Concurrency Control (Управление параллелизмом) задано значение 1, поэтому одновременно обрабатывается только одно сообщение.

    Цикл

  2. Для действий служебной шины Microsoft Azure Complete the message in a queue (Завершить сообщение в очереди) и Abandon the message in a queue (Отказаться от сообщения в очереди) укажите имя вашей очереди служебной шины.

    Действия служебной шины Microsoft Azure —

    После завершения цикла Пока в очереди есть сообщения для сеанса рабочий процесс задает для переменной isDone значение true.

Далее вы укажете необходимые сведения для действий в цикле Продлить блокировку сеанса до отмены.

Продлить блокировку сеанса до отмены

Этот цикл Until гарантирует, что блокировка сеанса удерживается приложением логики, пока в очереди есть сообщения или пока не пройдет один час при выполнении этих действий. Чтобы изменить максимальное время цикла, измените значение свойства Timeout (Время ожидания) цикла.

  • Применяется задержка 25 секунд или на время, меньшее времени ожидания блокировки для обрабатываемой очереди. Наименьшая продолжительность блокировки составляет 30 секунд, поэтому значения по умолчанию достаточно. Однако можно оптимизировать количество выполнений цикла, изменив соответствующие значения.

  • Проверьте, имеет ли переменная isDone значение true.

    • Если параметр isDone имеет значение true, рабочий процесс все еще обрабатывает сообщения, поэтому он обновляет блокировку сеанса в очереди и снова проверяет условие цикла.

      Вы должны указать имя вашей очереди служебной шины Microsoft Azure в действии служебной шины Renew lock on the session in a queue (Продлить блокировку сеанса в очереди).

    • Если параметр isDone имеет значение true, рабочий процесс не обновляет блокировку сеанса в очереди и выходит из цикла.

    Цикл Until —

Продлить блокировку сеанса в очереди

Это действие служебной шины Microsoft Azure продляет блокировку сеанса в очереди, пока рабочий процесс обрабатывает сообщения.

  • В действии служебной шины Renew lock on the session in a queue (Продлить блокировку сеанса в очереди) укажите имя вашей очереди служебной шины Microsoft Azure.

    Действие служебной шины Microsoft Azure —

Далее вы предоставите необходимые сведения для действия служебной шины Microsoft Azure Закрыть сеанс в очереди и завершить.

Закрыть сеанс в очереди и завершить

Это действие служебной шины Microsoft Azure закрывает сеанс в очереди после того, как рабочий процесс заканчивает обработку всех доступных сообщений в очереди либо отказывается от исходного сообщения.

  • В действии служебной шины Microsoft Azure Close a session in a queue and succeed (Закрыть сеанс в очереди и завершить) укажите имя вашей очереди служебной шины Microsoft Azure.

    Действие служебной шины Microsoft Azure —

В следующих разделах описываются действия в разделе Catch, которые обрабатывают ошибки и исключения, происходящие в рабочем процессе.

Закрыть сеанс в очереди и завершить с ошибкой

Это действие служебной шины Microsoft Azure всегда выполняется как первое действие в области Catch и закрывает сеанс в очереди.

  • В действии служебной шины Microsoft Azure Close a session in a queue and fail (Закрыть сеанс в очереди и завершить с ошибкой) укажите имя вашей очереди служебной шины Microsoft Azure.

    Действие служебной шины Microsoft Azure —

Затем рабочий процесс создает массив с входными и выходными данными из всех действий в области Try, чтобы приложение логики могло получить доступ к сведениям об ошибке или сбое.

Найти сообщение о сбое в блоке "Try"

Это действие Фильтр массива создает массив из входных и выходных данных всех действий в области Try на основе указанных критериев с помощью функции result(). В этом случае данное действие возвращает выходные данные из действий, имеющих состояние Failed, с помощью функции equals() и функции item().

Действие

Вот определение JSON для этого действия:

"Find_failure_msg_from_'Try'_block": {
   "inputs": {
      "from": "@Result('Try')",
      "where": "@equals(item()['status'], 'Failed')"
   },
   "runAfter": {
      "Close_the_session_in_the_queue_and_fail": [
         "Succeeded"
      ]
   },
   "type": "Query"
},

Затем рабочий процесс создает массив с объектом JSON, который содержит сведения об ошибке в массиве, возвращенном из действия Find failure msg from 'Try' block.

Выбор сведений об ошибке

Это действие Выбрать создает массив, содержащий объекты JSON, на основе входного массива, полученного из предыдущего действия Find failure msg from 'Try' block. В частности, это действие возвращает массив, который содержит только указанные свойства для каждого объекта в массиве. В данном случае массив содержит свойства "имя действия" и "результат ошибки".

Действие

Вот определение JSON для этого действия:

"Select_error_details": {
   "inputs": {
      "from": "@body('Find_failure_msg_from_''Try''_block')[0]['outputs']",
      "select": {
         "action": "@item()['name']",
         "errorResult": "@item()"
      }
   },
   "runAfter": {
      "Find_failure_msg_from_'Try'_block": [
         "Succeeded"
      ]
   },
   "type": "Select"
},

Далее рабочий процесс останавливает выполнение приложения логики и возвращает состояние выполнения, а также дополнительные сведения об ошибке или сбое.

Завершение выполнения приложения логики

Это действие Завершить останавливает выполнение приложения логики и возвращает Failed в качестве состояния для выполнения приложения логики вместе с идентификатором сеанса и ошибкой из действия Select error details.

Действие

Вот определение JSON для этого действия:

"Terminate": {
   "description": "This Failure Termination only runs if the Close Session upon Failure action runs - otherwise the LA will be terminated as Success",
   "inputs": {
      "runError": {
         "code": "",
         "message": "There was an error processing messages for Session ID @{triggerBody()?['SessionId']}. The following error(s) occurred: @{body('Select_error_details')['errorResult']}"
         },
         "runStatus": "Failed"
      },
      "runAfter": {
         "Select_error_details": [
            "Succeeded"
         ]
      },
      "type": "Terminate"
   }
},

Сохранение и запуск приложения логики

После заполнения шаблона можно сохранить приложение логики. На панели инструментов конструктора щелкните Сохранить.

Чтобы протестировать приложение логики, отправьте сообщения в свою очередь служебной шины Microsoft Azure.

Дальнейшие действия