Несколько конкурирующих потребителей обрабатывают сообщения, полученные в одном канале обмена сообщениями. С несколькими одновременными потребителями система может одновременно обрабатывать несколько сообщений для оптимизации пропускной способности, повышения масштабируемости и доступности, а также для балансировки рабочей нагрузки.
Контекст и проблема
Приложение, работающее в облаке, должно обрабатывать большое количество запросов. Вместо того чтобы обрабатывать каждый запрос синхронно, обычно приложение передает их через систему обмена сообщениями в другую службу (служба потребителя), которая обрабатывает их асинхронно. Эта стратегия помогает убедиться, что бизнес-логика в приложении не заблокирована, а запросы обрабатываются.
Количество запросов может существенно варьироваться со временем по многим причинам. Внезапное увеличение активности пользователей или агрегированных запросов в нескольких клиентах может вызвать непредсказуемую рабочую нагрузку. В пиковые часы система может потребоваться обработать много сотен запросов в секунду, в то время как в другое время число может быть очень небольшим. Кроме того, характер выполняемой работы для обработки этих запросов может сильно изменяться. С помощью одного экземпляра службы потребителей можно привести к тому, что экземпляр будет переполнен запросами. Кроме того, система обмена сообщениями может быть перегружена притоком сообщений, поступающих из приложения. Для обработки этой изменяющейся рабочей нагрузки система может выполнить несколько экземпляров службы потребителя. Однако этих потребителей необходимо скоординировать, чтобы обеспечить доставку каждого сообщения отдельному потребителю. Необходимо также сбалансировать рабочую нагрузку среди потребителей, чтобы экземпляр не вызывал проблемы с производительностью.
Решение
С помощью очереди сообщений можно реализовать коммуникационный канал между приложением и экземплярами службы потребителя. Приложение отправляет запросы в виде сообщения в очередь. Экземпляры службы потребителя получают сообщения из очереди и обрабатывают их. Такой подход позволяет одному пулу экземпляров службы потребителя обрабатывать сообщения из любого экземпляра приложения. На рисунке ниже показано использование очереди сообщений для распределения нагрузки в экземплярах службы.
Примечание.
Хотя существует несколько потребителей этих сообщений, это не так же, как шаблон подписки публикации (pub/sub). При подходе "Конкурирующие потребители" каждое сообщение передается одному потребителю для обработки, в то время как при использовании подхода Pub/Sub все потребители передают каждое сообщение.
Это решение имеет следующие преимущества:
Оно предоставляет систему с выровненной нагрузкой, которая может обрабатывать большое число запросов, отправленных экземплярами приложения. Очередь выступает в качестве буфера между экземплярами приложения и экземплярами службы потребителя. Этот буфер может помочь свести к минимуму влияние на доступность и скорость реагирования как для приложений, так и для экземпляров службы. Дополнительные сведения см . в шаблоне выравнивания нагрузки на основе очередей. Длительная обработка сообщений не предотвращает параллельную обработку других сообщений другими экземплярами службы потребителя.
Это повышает надежность. Если производитель взаимодействует с потребителем напрямую, вместо того чтобы использовать этот шаблон, но не отслеживает его, существует высокая вероятность того, что сообщения могут быть утеряны или их не удастся обработать, если произойдет сбой потребителя. В этом шаблоне сообщения не отправляются в конкретный экземпляр службы. Поврежденный экземпляр службы не будет блокировать производителя, и сообщения можно обрабатывать с помощью любого рабочего экземпляра службы.
Это не требует комплексной координации между потребителями или между производителем и экземплярами потребителя. Очередь сообщений гарантирует, что каждое сообщение будет доставлено не менее одного раза.
Оно масштабируемое. При применении автоматического масштабирования система может динамически увеличивать или уменьшать количество экземпляров службы потребителей по мере изменения объема сообщений.
Оно может улучшить устойчивость, если очередь сообщения предоставляет транзакционные операции считывания. Если экземпляр службы потребителя считывает и обрабатывает сообщение как часть транзакционной операции и происходит его сбой, этот шаблон может гарантировать, что сообщение будет возвращено в очередь, чтобы другой экземпляр службы потребителя мог выбрать его для обработки. Чтобы снизить риск непрерывного сбоя сообщения, рекомендуется использовать очереди недоставленных сообщений.
Проблемы и рекомендации
При принятии решения о реализации этого шаблона необходимо учитывать следующие моменты.
Упорядочивание сообщений. Порядок, в котором экземпляры службы потребителя получают сообщения, ненадежный и не обязательно должен совпадать с порядком создания сообщений. Разработайте систему, чтобы гарантировать идемпотентную обработку сообщений. Это поможет исключить любые зависимости в порядке обработки сообщений. Дополнительные сведения см . в блоге Idempotency Patterns on Jonathon Oliver.
Очереди служебной шины Microsoft Azure могут реализовать гарантированную доставку сообщений по методу FIFO ("первым прибыл, первым обслужен") с помощью сеансов обмена сообщениями. Дополнительные сведения см. в статье Microsoft Azure Service Bus: шаблоны обмена сообщениями с использованием сеансов.
Разработка служб для обеспечения устойчивости. Если система разработана, чтобы определять и перезапускать неработоспособные экземпляры служб, может понадобиться реализовать обработку, выполненную экземплярами службы в качестве отдельной операции, чтобы снизить воздействие одного сообщения, которое извлекается и обрабатывается более одного раза.
Определение сообщений о сбое. Неправильно сформированные сообщения или задачи, которым требуется доступ к недоступным ресурсам, могут вызвать сбой экземпляра службы. Система должна предотвратить возвращение таких сообщений в очередь и вместо этого собрать и сохранить сведения об этих сообщениях в другом месте, чтобы при необходимости их можно было анализировать.
Обработка результатов. Экземпляр службы, который обрабатывает сообщение, полностью отключен от логики приложения, которая создает сообщение. Они не могут взаимодействовать напрямую. Если экземпляр службы формирует результаты, которые должны быть переданы обратно в логику приложения, эти сведения должны храниться в доступном для них расположении. Чтобы логика приложения не получала неполные данные, система должна определять, когда обработка завершена.
Если вы используете Azure, рабочий процесс может передать результат обратно в логику приложения с помощью выделенной очереди ответов сообщения. Логика приложения должна секционировать эти результаты с исходным сообщением. Этот сценарий описан более подробно в руководстве по асинхронному обмеру сообщениями.
Масштабирование системы обмена сообщениями. В крупномасштабном решении одна очередь сообщений может быть переполненной количеством сообщений и может стать узким местом в системе. В этом случае попробуйте секционировать систему обмена сообщениями для отправки писем от конкретных поставщиков в определенные очереди. Вы также можете использовать балансировку нагрузки, чтобы распределить сообщения в нескольких очередях сообщений.
Обеспечение надежности системы обмена сообщениями. Надежная система обмена сообщениями необходима, чтобы гарантировать, что сообщения не будут потеряны, когда приложение поставит их в очередь. Эта система необходима для обеспечения доставки всех сообщений по крайней мере один раз.
Когда следует использовать этот шаблон
Используйте этот шаблон в следующих случаях:
- Рабочая нагрузка для приложения разделена на задачи, которые могут выполняться асинхронно.
- Задачи являются независимыми и могут выполняться параллельно.
- Объем работ может сильно изменяться, требуя масштабируемого решения.
- Решение должно предоставлять высокий уровень доступности и должно быть устойчивым в случае сбоя обработки задачи.
Этот шаблон может оказаться неэффективным в следующих случаях:
- Сложно разделить рабочую нагрузку приложения на дискретные задачи, или имеется высокий уровень зависимости между приложениями.
- Задачи необходимо выполнить синхронно, и логике приложения необходимо ждать завершения выполнения задачи, прежде чем продолжить.
- Задачи должны выполняться в определенной последовательности.
Некоторые системы обмена сообщениями поддерживают сеансы, которые позволяют производителю группировать сообщения вместе и обеспечить их выполнение одним и тем же потребителем. Этот механизм можно использовать с приоритетными сообщениями (если они поддерживаются), чтобы реализовать вариант упорядочивания сообщений, который последовательно доставляет сообщения от производителя к одному потребителю.
Проектирование рабочей нагрузки
Архитектор должен оценить, как шаблон конкурирующих потребителей можно использовать в проектировании рабочей нагрузки для решения целей и принципов, описанных в основных принципах платформы Azure Well-Architected Framework. Например:
Принцип | Как этот шаблон поддерживает цели основных компонентов |
---|---|
Решения по проектированию надежности помогают рабочей нагрузке стать устойчивой к сбоям и обеспечить восстановление до полнофункционального состояния после сбоя. | Этот шаблон создает избыточность в обработке очередей, рассматривая потребителей как реплики, поэтому сбой экземпляра не препятствует другим потребителям обрабатывать сообщения очереди. - ИЗБЫТОЧНОСТЬ RE:05 - Задания RE:07 Фоновые задания |
Оптимизация затрат ориентирована на поддержание и улучшение рентабельности инвестиций рабочей нагрузки. | Этот шаблон поможет оптимизировать затраты, включив масштабирование на основе глубины очереди до нуля, если очередь пуста. Кроме того, это позволяет оптимизировать затраты, позволяя ограничить максимальное количество одновременных экземпляров потребителей. - Оптимизация скорости CO:05 - Затраты на компоненты CO:07 |
Эффективность производительности помогает рабочей нагрузке эффективно соответствовать требованиям путем оптимизации масштабирования, данных, кода. | Распределение нагрузки по всем узлам потребителей увеличивает использование и динамическое масштабирование на основе глубины очереди свести к минимуму избыточность. - Pe:05 Масштабирование и секционирование - Pe:07 Code и инфраструктура |
Как и любое решение по проектированию, рассмотрите любые компромиссы по целям других столпов, которые могут быть представлены с этим шаблоном.
Пример
Azure предоставляет служебная шина очереди и триггеры очереди функций Azure, которые при объединении представляют собой прямую реализацию этого шаблона разработки облака. Функции Azure интегрироваться с Служебная шина Azure с помощью триггеров и привязок. Интеграция с служебная шина позволяет создавать функции, использующие сообщения очереди, отправленные издателями. Приложения публикации будут публиковать сообщения в очередь, а потребители, реализованные как Функции Azure, могут получать сообщения из этой очереди и обрабатывать их.
Для обеспечения устойчивости очередь служебная шина позволяет потребителю использовать PeekLock
режим при получении сообщения из очереди. Этот режим не удаляет сообщение, а просто скрывает его от других потребителей. Среда выполнения Функции Azure получает сообщение в режиме PeekLock, если функция успешно завершает его вызовы Complete on the message, или может вызвать "Отказаться", если функция завершится ошибкой, и сообщение станет видимым снова, позволяя другому потребителю получить его. Если функция выполняется в течение длительного периода, чем время ожидания PeekLock, блокировка автоматически обновляется до тех пор, пока функция запущена.
Функции Azure может масштабировать или масштабироваться на основе глубины очереди, все они действуют в качестве конкурирующих потребителей очереди. Если несколько экземпляров функций создаются, все они конкурируют путем независимого извлечения и обработки сообщений.
Дополнительные сведения об использовании очередей служебной шины Azure см. в статье Очереди, разделы и подписки служебной шины.
Сведения о активации очереди Функции Azure см. в Служебная шина Azure триггере для Функции Azure.
В следующем коде показано, как создать новое сообщение и отправить его в очередь служебная шина с помощью экземпляраServiceBusClient
.
private string serviceBusConnectionString = ...;
...
public async Task SendMessagesAsync(CancellationToken ct)
{
try
{
var msgNumber = 0;
var serviceBusClient = new ServiceBusClient(serviceBusConnectionString);
// create the sender
ServiceBusSender sender = serviceBusClient.CreateSender("myqueue");
while (!ct.IsCancellationRequested)
{
// Create a new message to send to the queue
string messageBody = $"Message {msgNumber}";
var message = new ServiceBusMessage(messageBody);
// Write the body of the message to the console
this._logger.LogInformation($"Sending message: {messageBody}");
// Send the message to the queue
await sender.SendMessageAsync(message);
this._logger.LogInformation("Message successfully sent.");
msgNumber++;
}
}
catch (Exception exception)
{
this._logger.LogException(exception.Message);
}
}
В следующем примере кода показан потребитель, написанный как функция Azure C#, который считывает метаданные сообщения и записывает сообщение служебная шина очереди. Обратите внимание, как ServiceBusTrigger
атрибут используется для привязки атрибута к очереди служебная шина.
[FunctionName("ProcessQueueMessage")]
public static void Run(
[ServiceBusTrigger("myqueue", Connection = "ServiceBusConnectionString")]
string myQueueItem,
Int32 deliveryCount,
DateTime enqueuedTimeUtc,
string messageId,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function consumed message: {myQueueItem}");
log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
log.LogInformation($"DeliveryCount={deliveryCount}");
log.LogInformation($"MessageId={messageId}");
}
Следующие шаги
Руководство по асинхронному обмену сообщениями. Очереди сообщений — это механизмы для асинхронного обмена данными. Если службе потребителя необходимо отправить ответ в приложение, может потребоваться реализовать некоторые варианты обмена сообщениями через ответы. Руководство по асинхронному обмену сообщениями предоставляет сведения о реализации обмена сообщениями через запрос или ответ с помощью очередей сообщения.
Руководство по автоматическому масштабированию. Вы можете выполнить и остановить экземпляры службы потребителя, так как длина очереди, в которой приложение публикует сообщения, отличается. Автомасштабирование может помочь поддерживать пропускную способность во время максимальной нагрузки.
Связанные ресурсы
При реализации этого шаблона следует принять во внимание следующие шаблоны и рекомендации.
Шаблон консолидации вычислительных ресурсов. Вы можете выполнить консолидацию нескольких экземпляров службы потребителя в один процесс, чтобы сократить расходы и затраты на управление. В статье Compute Resource Consolidation pattern (Шаблон консолидации вычислительных ресурсов) описаны преимущества и недостатки использования этого подхода.
Шаблон балансировки нагрузки на основе очередей. Добавление очереди сообщений может повысить устойчивость системы, позволить экземплярам службы обрабатывать разные объемы запросов из экземпляров приложения. Очередь сообщений работает как буфер, которой уравнивает нагрузку. Статья Queue-based Load Leveling pattern (Шаблон балансировки нагрузки на основе очередей) описывает этот сценарий более подробно.