Передача, блокировка и согласование сообщений

Одной из основных возможностей брокера обмена сообщениями, в том числе служебной шины, является прием сообщений в очередь или раздел и их хранение для последующего извлечения. Отправка — это термин, который широко применяется для передачи сообщения в брокер обмена сообщениями. Получение — это термин, который широко применяется для передачи сообщения в получающий клиент.

Когда клиент отправляет сообщение, обычно он хочет знать, правильно ли сообщение передается и принимается брокером или возникает ли какая-то ошибка. Это подтверждение или неподтверждение передачи позволяет согласовать представление клиента и брокера о состоянии передачи сообщения, поэтому оно называется согласованием.

Аналогичным образом, когда брокер передает сообщение клиенту, брокер и клиент хотят установить представление о том, успешно ли обработано сообщение и поэтому может быть удалено, или происходит сбой доставки или обработки сообщения, поэтому сообщение может быть доставлено снова.

Согласование операций отправки

При использовании любого поддерживаемого клиента API служебной шины операции отправки в служебную шину всегда согласовываются явным образом. Это значит, что операция API ожидает результата приема из служебной шины, и только после его получения выполняет операцию отправки.

Если служебная шина отклоняет сообщение, то данные отклонения содержат индикатор ошибки и текст с идентификатором трассировки (tracking-id). Эти данные также содержат сведения о том, можно ли повторить операцию с какой-либо вероятностью успеха. В клиенте эти сведения преобразуются в исключение, передаваемое вызывающей стороне операции отправки. Если сообщение принято, операция автоматически завершается.

Протокол расширенной очереди сообщений (AMQP) — это единственный протокол, поддерживаемый для клиентов .NET Standard, Java, JavaScript, Python и Go. Для клиентов платформа .NET Framework можно использовать протокол служебная шина обмена сообщениями (SBMP) или AMQP. При использовании протокола AMQP передачи сообщений и расчетов конвейерируются и асинхронны. Рекомендуется использовать API для асинхронной модели программирования.

30 сентября 2026 г. мы удалим библиотеки пакета SDK Служебная шина Azure WindowsAzure.ServiceBus, Microsoft.Azure.ServiceBus и com.microsoft.azure.servicebus, которые не соответствуют рекомендациям по пакету SDK Azure. Мы также завершим поддержку протокола SBMP, поэтому вы больше не сможете использовать этот протокол после 30 сентября 2026 года. Перейдите в последние библиотеки пакета SDK Azure, которые предлагают критически важные обновления системы безопасности и улучшенные возможности до этой даты.

Хотя старые библиотеки по-прежнему могут использоваться после 30 сентября 2026 года, они больше не будут получать официальную поддержку и обновления от Майкрософт. Дополнительные сведения см. в объявлении о выходе на пенсию в службу поддержки.

Отправитель может быстро передать несколько сообщений по сети в течение короткого промежутка времени без ожидания подтверждения каждого сообщения, что имело бы место в случае с протоколом SBMP или HTTP 1.1. Эти асинхронные операции отправки считаются выполненными, когда соответствующие сообщения принимаются и сохраняются в секционированных сущностях или когда совмещаются операции отправки в разные сущности. Выполнение также может завершаться не в первоначальном порядке отправки.

Стратегия обработки результата операций отправки может мгновенно и значительно влиять на производительность вашего приложения. Примеры в этом разделе написаны на C# и применяются к фьючерсам Java, Java Mono, обещаниям JavaScript и эквивалентным концепциям на других языках.

Если приложение создает пакет сообщений, показанных здесь в простом цикле, и должно ожидать завершения каждой операции отправки перед отправкой следующего сообщения, то результаты синхронного и асинхронного API будут схожи, так как они смогут отправить все 10 сообщений только после 10 последовательных выполнений кругового пути для согласования.

При предполагаемом расстоянии от 70-миллисекундного протокола управления передачей (TCP) от локального сайта до служебная шина и предоставления всего 10 мс для служебная шина принимать и хранить каждое сообщение, следующий цикл занимает не менее 8 секунд, не подсчитывая время передачи полезных данных или потенциальные эффекты перегрузки маршрута:

for (int i = 0; i < 10; i++)
{
    // creating the message omitted for brevity
    await sender.SendMessageAsync(message);
}

Если приложение друг за другом запускает 10 последовательных операций асинхронной отправки и ожидает их завершения по отдельности, то время кругового пути этих 10 операций отправки будет перекрываться. 10 сообщений передаются подряд, потенциально даже в общих кадрах TCP, и общее время передачи во многом зависит от времени, необходимого сети для передачи этих сообщений в брокер.

При действии тех же предположений, что и для предыдущего цикла, общее совмещенное время выполнения приведенного ниже цикла может составить меньше секунды:

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(sender.SendMessageAsync(message));
}
await Task.WhenAll(tasks);

Важно отметить, что все асинхронные модели программирования используют разновидности скрытых рабочих очередей в памяти, в которых хранятся ожидающие операции. Когда API отправки возвращается, задача отправки помещается в очередь в этой рабочей очереди, но жест протокола начинается только после его выполнения. Для кода, который, как правило, толкает всплески сообщений и где надежность является проблемой, следует принять во внимание, что не слишком много сообщений помещаются в полет одновременно, так как все отправленные сообщения занимают память, пока они не будут помещены в провод.

Семафоры, как показано в следующем фрагменте кода на C#, — это объекты синхронизации, при необходимости обеспечивающие подобное регулирование на уровне приложения. Такое использование семафора гарантирует, что в состоянии "на лету" одновременно могут находиться не более 10 сообщений. Один из 10 доступных семафоров блокируется перед отправкой, и блокировка снимается только после завершения отправки. 11-й проход по циклу ожидает по крайней мере один из предыдущих операций отправки, а затем делает его блокировку доступной:

var semaphore = new SemaphoreSlim(10);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    await semaphore.WaitAsync();

    tasks.Add(sender.SendMessageAsync(message).ContinueWith((t)=>semaphore.Release()));
}
await Task.WhenAll(tasks);

Приложения никогда не должны инициировать операцию асинхронной передачи по принципу "отправить и забыть", то есть без получения результата операции. В противном случае это может привести к тому, что внутренняя невидимая очередь задач займет всю память, и приложение не сможет обнаруживать ошибки отправки.

for (int i = 0; i < 10; i++)
{
    sender.SendMessageAsync(message); // DON’T DO THIS
}

С низкоуровневым клиентом AMQP служебная шина также принимает "предустановленные" передачи. Предустановленная передача — это операция пожара и забыть, для которой результат, в любом случае, не сообщается клиенту, и сообщение считается урегулированным при отправке. Отсутствие обратной связи для клиента также означает отсутствие действенных данных для диагностики. Поэтому этот режим не соответствует требованиям для получения помощи от службы поддержки Azure.

Согласование операций получения

Для операций получения клиенты API служебной шины используют два разных явных режима: ReceiveAndDelete и PeekLock.

Получить и удалить

В режиме ReceiveAndDelete брокер считает, что все сообщения, отправляемые им в получающий клиент, являются согласованными после отправки. Это означает, что сообщение считается потребляемым, как только брокер помещает его в провод. Если происходит сбой передачи сообщения, оно утрачивается.

Преимуществом этого режима является то, что получателю не требуется предпринимать дальнейшие действия с сообщением и его работа не замедляется из-за ожидания результата согласования. Если данные, содержащиеся в отдельных сообщениях, имеют низкую ценность и (или) важны только в течение очень короткого времени, этот режим является неплохим выбором.

Блокировка для просмотра

В режиме PeekLock брокеру указывается, что принимающему клиенту необходимо явно согласовывать полученные сообщения. Сообщение становится доступным для обработкой получателем. На этот период на сообщение накладывается монопольная блокировка в службе, чтобы его не могли просмотреть другие конкурирующие получатели. Длительность блокировки изначально определяется на уровне очереди или подписки и может быть расширена клиентом, владельцем блокировки, с помощью операции RenewMessageLockAsync . Подробнее об продлении блокировок см. в разделе Продление блокировок этой статьи.

Когда сообщение заблокировано, другие клиенты, получающие сообщения из той же очереди или подписки, могут накладывать свои блокировки и получать следующие доступные сообщения, которые не заблокированы. Когда блокировка сообщения явно освобождается или когда срок действия блокировки истекает, сообщение помещается в передней части порядка извлечения для повторного получения.

Если сообщение несколько раз освобождается получателями или они позволяют истечь сроку его блокировки заданное число раз (maxDeliveryCount), то сообщение автоматически удаляется из очереди или подписки и помещается в связанную очередь недоставленных сообщений.

Принимающий клиент инициирует урегулирование полученного сообщения с положительным подтверждением при вызове API Complete API для сообщения. Это указывает брокеру, что сообщение успешно обработано, и оно удаляется из очереди или подписки. Брокер отвечает на намерение согласования получателя, сообщая, возможно ли выполнить это согласование.

Если получающий клиент не может обработать сообщение, но хочет, чтобы сообщение было повторно удалено, оно может явно попросить освободить и разблокировать сообщение мгновенно, вызвав API "Отказ " для сообщения, или он не может ничего сделать и позволить блокировке истекать.

Если получающий клиент не сможет обработать сообщение и знает, что повторное удаление сообщения и повторная попытка операции не поможет, он может отклонить сообщение, которое перемещает его в очередь недоставленных писем, вызвав API DeadLetter в сообщении, что также позволяет задать пользовательское свойство, включая код причины, который можно получить с сообщением из очереди недоставленных букв.

Примечание.

Подзапуск недоставленной буквы существует для очереди или подписки раздела только в том случае, если для очереди или подписки включена функция недоставленной буквы.

Особым случаем согласования является откладывание, которое рассматривается в отдельной статье.

Операции CompleteDeadLetterили RenewLock операции могут завершиться сбоем из-за проблем с сетью, если срок действия удерживаемой блокировки истек или существуют другие условия, которые препятствуют урегулированию. В одном из последний случаев служба отправляет неподтверждение передачи, которое отображается в клиентах API как исключение. Если причиной является нарушение сетевого подключения, то блокировка снимается, так как служебная шина не поддерживает восстановление существующих ссылок AMQP через другое подключение.

Если Complete происходит сбой, который обычно происходит в самом конце обработки сообщений и в некоторых случаях после нескольких минут обработки, принимающее приложение может решить, следует ли сохранить состояние работы и игнорировать то же сообщение, когда оно доставлено во второй раз, или вывести результат работы и повторить попытку при повторном выполнении сообщения.

Типичный механизм определения доставки повторяющихся сообщений подразумевает проверку идентификатора сообщения, который может и должен быть задан отправителем в виде уникального значения, возможно, в соответствии с идентификатором из исходного процесса. Планировщик заданий скорее всего присвоит идентификатору сообщения идентификатор задания, которое он пытается назначить рабочему процессу, и рабочий процесс пропустит второй экземпляр назначенного задания, если это задание уже выполнено.

Внимание

Важно отметить, что блокировка, полученная в сообщении PeekLock или SessionLock, является нестабильной и может быть потеряна в следующих условиях.

  • Обновление службы
  • Обновление ОС
  • Изменение свойств сущности (очереди, раздела, подписки) при удержании блокировки.

При потере блокировки Служебная шина Azure создаст messageLockLostException или SessionLockLostException, которое будет отображаться в клиентском приложении. В этом случае на клиенте должна автоматически запуститься логика повтора по умолчанию и повторить операцию.

Продление блокировок

Значение по умолчанию для длительности блокировки составляет 1 минуту. Можно задать другую длительность блокировки на уровне очереди или подписки. Клиент, владеющий блокировкой, может продлить блокировку сообщения с помощью методов объекта-получателя. Вместо этого можно использовать функцию автоматического продления блокировки, в которой можно указать период действия продления блокировки.

Рекомендуется задать длительность блокировки более высокой, чем обычное время обработки, поэтому вам не нужно продлевать блокировку. Максимальное значение составляет 5 минут, поэтому необходимо продлить блокировку, если вы хотите его продлить. Наличие длительности блокировки, чем требуется, также имеет некоторые последствия. Например, когда клиент перестанет работать, сообщение станет доступным только после прохождения длительности блокировки.

Следующие шаги