Использование функционала тем и подписок шины обслуживания (Service Bus Topics/Subscriptions) в Windows Azure
В этом руководстве показано, как работать с темами и подписками шины обслуживания (Service Bus Topics/Subscriptions). Примеры написаны на C# с использованием .NET API. В документе рассмотрены следующие сценарии: создание тем и подписок, создание фильтров подписок, отправка сообщений в тему, получение сообщений от подписок и удаление тем и подписок. Дополнительную информацию о темах и подписках см. в разделе «Дальнейшие действия».
- Что такое темы и подписки шины обслуживания
- Создание пространства имен службы
- Получение учетных данных по умолчанию для управления пространством имен
- Настройка приложения для использования шины обслуживания
- Как создать поставщика маркера безопасности
- Как создать тему
- Как создать подписки
- Как отправить сообщения в тему
- Как получить сообщения от подписки
- Как решить проблему со сбоями приложений и нечитаемыми сообщениями
- Как удалить темы и подписки
- Дальнейшие действия
Темы и подписки шины обслуживания поддерживают модель обмена сообщениями публикации и (или) подписки. При использовании тем и подписок компоненты распределенного приложения не взаимодействуют между собой напрямую, а обмениваются сообщениями через тему, которая играет роль посредника.
В отличие от очередей шины обслуживания, где каждое сообщение обрабатывается одним потребителем, темы и подписки взаимодействуют по принципу «один ко многим» с помощью модели публикации/подписки. Для одной темы можно зарегистрировать несколько подписок. Сообщение, отправленное в тему, становится доступно всем подпискам для одновременного анализа или обработки.
Подписка темы является своего рода виртуальной очередью, которая получает копии сообщений, отправленных в тему. При необходимости можно зарегистрировать правила фильтрации темы для каждой подписки, чтобы отфильтровывать либо ограничивать определенные сообщения, получаемые подписками через темы.
Темы и подписки шины обслуживания позволяют обрабатывать огромное количество сообщений для множества пользователей и приложений.
Чтобы использовать темы и подписки сервисной шины в Windows Azure, сначала необходимо создать пространство имен службы. Пространство имен службы представляет собой контейнер, задающий область действия ресурсов шины обслуживания в приложении.
Чтобы создать пространство имен службы, выполните следующие действия.
Войдите на портал управления Windows Azure. В левой нижней панели навигации портала управления щелкните Service Bus, Access Control & Caching. В левой верхней панели портала управления щелкните узел Service Bus, а затем нажмите кнопку New.
В диалоговом окне Create a new Service Namespace введите значение в поле Namespace, а затем — чтобы убедиться в его уникальности — нажмите кнопку Check Availability.
Проверив доступность имени Namespace, выберите страну или регион, в котором будет размещаться это пространство имен (убедитесь, что параметр Country/Region совпадает с тем, для которого вы развертываете вычислительные ресурсы), и нажмите кнопку Create Namespace.
Созданное пространство имен появится на портале управления. Его активация займет несколько секунд. Перед выполнением дальнейших действий дождитесь смены состояния на Active.
Получение учетных данных по умолчанию для управления пространством имен
Чтобы управлять пространством имен, например создавать темы или подписки, необходимо получить учетные данные для управления.
В левой панели навигации щелкните узел Service Bus, чтобы открыть список доступных пространств имен:
В списке выберите только что созданное пространство имен.
В правой панели Properties будут выведены свойства нового пространства имен.
Свойство Default Key скрыто. Нажмите кнопку View, чтобы отобразить учетные данные для безопасного доступа.
Запишите значение параметров Default Issuer и Default Key, чтобы использовать их при работе с пространством имен.
При создании приложения, использующего шину обслуживания, необходимо добавить ссылку на ее сборку и подключить соответствующие пространства имен.
В Solution Explorer Visual Studio щелкните правой кнопкой мыши References и выберите Add Reference.
На вкладке Brows перейдите в папку C:\Program Files\Microsoft SDKs\Windows Azure\.NET SDK\2012-06\ref и добавьте ссылку на Microsoft.ServiceBus.dll.
Добавьте следующий код в начало всех исходных файлов C#, где будут использоваться темы и подписки шины обслуживания.
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
Теперь можно писать код, использующий шину обслуживания.
Шина обслуживания хранит в строке подключения конечные точки и учетные данные. Строку подключения можно поместить в файл конфигурации, а не прописывать ее в коде приложения:
- При использовании облачных служб Windows Azure рекомендуется сохранять строку подключения с помощью системы настройки служб Windows Azure (и файлы).
- При использовании веб-сайтов Windows Azure или виртуальных машин Windows Azure рекомендуется сохранять строку подключения с помощью системы настройки .NET (например, файл).
В обоих случаях строку подключения можно получить посредством метода, как будет показано далее.
В проектах на основе облачных служб Windows Azure реализован уникальный механизм настройки, с помощью которого можно динамически изменять конфигурацию на портале управления Windows Azure без повторного развертывания приложения. Например, добавьте параметр Setting в файл определения службы (), как показано ниже:
<ServiceDefinition name="WindowsAzure1">
...
<WebRole name="MyRole" vmsize="Small">
<ConfigurationSettings>
<Setting name="Microsoft.ServiceBus.ConnectionString" />
</ConfigurationSettings>
</WebRole>
...
</ServiceDefinition>
Затем укажите значения в файле конфигурации службы ().
<ServiceConfiguration serviceName="WindowsAzure1">
...
<Role name="MyRole">
<ConfigurationSettings>
<Setting name="Microsoft.ServiceBus.ConnectionString"
value="Endpoint=sb://[yourServiceNamespace].servicebus.windows.net/;SharedSecretIssuer=[issuerName];SharedSecretValue=[yourDefaultKey]" />
</ConfigurationSettings>
</Role>
...
</ServiceConfiguration>
Используйте значения издателя и ключа, полученные на портале управления, как описано в предыдущем разделе.
При использовании веб-сайтов или виртуальных машин рекомендуется применять систему настройки .NET (например). При этом строка подключения будет храниться в элементе.
<configuration>
<appSettings>
<add key="Microsoft.ServiceBus.ConnectionString"
value="Endpoint=sb://[yourServiceNamespace].servicebus.windows.net/;SharedSecretIssuer=[issuerName];SharedSecretValue=[yourDefaultKey]" />
</appSettings>
</configuration>
Используйте значения издателя и ключа, полученные на портале управления, как описано в предыдущем разделе.
Управление темами и подписками шины обслуживания осуществляется с помощью класса NamespaceManager. Класс NamespaceManager содержит методы для создания, перечисления и удаления очередей.
В данном примере объект NamespaceManager создается на основе класса Windows Azure CloudConfigurationManager со строкой подключения, состоящей из базового адреса пространства имен шины обслуживания и соответствующих учетных данных с правами на управление. Строка подключения имеет формат
Endpoint=sb://<yourServiceNamespace>.servicebus.windows.net/;SharedSecretIssuer=<issuerName>;SharedSecretValue=<yourDefaultKey>
В качестве примера возьмем параметры конфигурации из предыдущего раздела.
// Create the topic if it does not exist already
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager =
NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.TopicExists("TestTopic"))
{
namespaceManager.CreateTopic("TestTopic");
}
У метода CreateTopic есть несколько перегруженных копий, позволяющих тонко настраивать свойства, например задавать время жизни по умолчанию для сообщений, отправляемых в тему. Эти параметры применяются посредством класса TopicDescription. В следующем примере мы создадим тему под названием TestTopic с максимальным размером 5 ГБ и временем жизни по умолчанию для сообщений 1 минута.
// Configure Topic Settings
TopicDescription td = new TopicDescription("TestTopic");
td.MaxSizeInMegabytes = 5120;
td.DefaultMessageTimeToLive = new TimeSpan(0, 1, 0);
// Create a new Topic with custom settings
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager =
NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.TopicExists("TestTopic"))
{
namespaceManager.CreateTopic(td);
}
Примечание. С помощью метода TopicExists в объектах NamespaceManager можно проверить, есть ли тема с таким именем в пространстве имен службы.
Как создать подписки
Подписки тем также создаются с помощью класса NamespaceManager. Подпискам присваиваются имена и дополнительные фильтры, ограничивающие набор сообщений, которые передаются в виртуальную очередь подписки.
Фильтр MatchAll по умолчанию применяется к созданной подписке, если для нее не задан особый фильтр. При использовании MatchAll все сообщения, публикуемые в теме, помещаются в виртуальную очередь подписки. В следующем примере создается подписка под названием AllMessages с фильтром MatchAll по умолчанию.
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager =
NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.SubscriptionExists("TestTopic", "AllMessages"))
{
namespaceManager.CreateSubscription("TestTopic", "AllMessages");
}
Можно также настроить фильтры, чтобы ограничить набор отправляемых в тему сообщений для определенной подписки этой темы.
Самым гибким фильтром для подписок является SqlFilter, реализующий подмножество языка SQL92. Фильтры SQL работают со свойствами сообщений, опубликованных в теме. Подробную информацию о выражениях, поддерживаемых в фильтре SQL, можно найти в документации по синтаксису SqlFilter.SqlExpression.
В следующем примере создается подписка с названием HighMessages и фильтром SqlFilter, который пропускает только сообщения со свойством MessageNumber больше 3.
// Create a "HighMessages" filtered subscription
SqlFilter highMessagesFilter =
new SqlFilter("MessageNumber > 3");
namespaceManager.CreateSubscription("TestTopic",
"HighMessages",
highMessagesFilter);
В следующем примере создается подписка с названием LowMessages и фильтром SqlFilter, который пропускает только сообщения со свойством MessageNumber меньше 3.
// Create a "LowMessages" filtered subscription
SqlFilter lowMessagesFilter =
new SqlFilter("MessageNumber <= 3");
namespaceManager.CreateSubscription("TestTopic",
"LowMessages",
lowMessagesFilter);
Теперь сообщение, отправленное в TestTopic, будет доставлено всем подписчикам AllMessages, а также некоторым подписчикам HighMessages и LowMessages (в зависимости от содержания сообщения).
Для отправки сообщения в тему шины обслуживания приложение должно создать объект MessageSender. Аналогично объектам NamespaceManager этот объект создается на основе базового URI пространства имен службы и соответствующих учетных данных (строки подключения).
В приведенном ниже коде показано, как получить объект MessageSender для созданной ранее темы TestTopic.
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
TopicClient Client =
TopicClient.CreateFromConnectionString(connectionString, "TestTopic");
Client.Send(new BrokeredMessage());
Сообщения, отправляемые в темы сервисной шины, являются экземплярами класса BrokeredMessage. Объекты BrokeredMessage имеют набор стандартных свойств (таких как Label и TimeToLive), словарь для добавления особых свойств приложения, а также различные данные приложения. Приложение может сформировать тело сообщения, передав сериализованный объект в конструктор класса BrokeredMessage, причем для сериализации объекта будет использоваться соответствующий класс DataContractSerializer. Кроме того, можно применить System.IO.Stream.
В следующем примере показано, как отправить пять тестовых сообщений в объект MessageSender темы TestTopic, взятый из предыдущего фрагмента кода. Обратите внимание, что значение свойства MessageNumber для каждого сообщения изменяется на каждом шаге цикла (это свойство определяет, какие подписки получат данное сообщение).
for (int i=0; i<5; i++)
{
// Create message, passing a string message for the body
BrokeredMessage message =
new BrokeredMessage("Test message " + i);
// Set additional custom app-specific property
message.Properties["MessageNumber"] = i;
// Send message to the topic
Client.Send(message);
}
Темы шины обслуживания поддерживают сообщения размером до 256 МБ (размер заголовка, содержащего стандартные и особые свойства приложения, может достигать 64 МБ). Количество сообщений для темы не ограничено, однако их общий объем все же лимитирован. Размер очереди задается в момент создания и не может превышать 5 ГБ.
Простейший способ получить сообщение из подписки — воспользоваться объектом MessageReceiver. Объекты MessageReceiver поддерживают два режима работы: ReceiveAndDelete и PeekLock.
В режиме ReceiveAndDelete получение является единовременной операцией: когда сервисная шина получает запрос на чтение сообщения в подписке, она помечает сообщение как принятое и возвращает его приложению. Режим ReceiveAndDelete реализует простейшую модель работы для тех сценариев, в которых приложение не обрабатывает сообщения по причине собственного сбоя. Чтобы лучше понять это, представьте, что приложение-потребитель отправляет запрос на получение и затем аварийно завершается до его обработки. Поскольку шина обслуживания пометит такое сообщение как принятое, то после перезапуска приложение пропустит его, когда начнет приемку сообщений.
В режиме PeekLock, используемом по умолчанию, процедура получения состоит из двух этапов. Это позволяет поддерживать приложения, которые не должны пропускать сообщения. Когда шина обслуживания получает запрос, она находит следующее за ним сообщение и ставит блокировку, чтобы другие потребители не получили его, и затем возвращает его приложению. После того как приложение обработает сообщение (или надежно сохранит его для дальнейшего анализа), начнется второй этап процедуры получения: будет вызвана функция Complete для принятого сообщения. Как только шина обслуживания обнаружит вызов Complete, она пометит сообщение как принятое и удалит его из подписки.
В следующем примере показано, как принимаются и обрабатываются сообщения в режиме PeekLock по умолчанию. В следующем примере создается бесконечный цикл обработки сообщений, отправляемых в подписку HighMessages. Обратите внимание, что подписка HighMessage" имеет формат «<путь к теме>/subscriptions/<имя подписки>».
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
SubscriptionClient Client =
SubscriptionClient.CreateFromConnectionString
(connectionString, "TestTopic", "HighMessages");
Client.Receive();
// Continuously process messages received from the HighMessages subscription
while (true)
{
BrokeredMessage message = Client.Receive();
if (message != null)
{
try
{
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("MessageNumber: " +
message.Properties["MessageNumber"]);
// Remove message from subscription
message.Complete();
}
catch (Exception)
{
// Indicate a problem, unlock message in subscription
message.Abandon();
}
}
}
Шина обслуживания позволяет эффективно восстанавливать работу приложения после сбоя или ошибки обработки сообщений. Если конечному приложению не удалось обработать сообщение по какой-либо причине, оно может вызвать метод Abandon для этого сообщения (вместо метода Complete). В результате шина обслуживания разблокирует сообщение в подписке и снова разрешит его получение для того же самого или другого приложения-потребителя.
Кроме того, для заблокированного сообщения в подписке предусмотрен период ожидания — если приложение не сможет обработать сообщение за время действия блокировки (например, аварийно завершится), то шина обслуживания автоматически разблокирует сообщение и снова разрешит его получение.
Если приложение аварийно завершится после обработки сообщения, но до отправки запроса Complete, то при перезапуске приложения сообщение будет доставлено повторно. Такой подход часто называют обработкой по принципу «хотя бы одно» , т. е. каждое сообщение обрабатывается по меньшей мере один раз, но в некоторых случаях оно может быть отправлено повторно. Если приложение не должно обрабатывать дубликаты, то разработчикам необходимо реализовать в нем дополнительную проверку таких ситуаций. Это можно сделать с помощью свойства MessageId сообщения, которое не изменяется при повторных доставках.
В следующем примере показано, как удалить тему под названием TestTopic из пространства имен службы HowToSample.
// Delete Topic
namespaceManager.DeleteTopic("TestTopic");
При удалении темы будут также удалены все зарегистрированные в ней подписки. Кроме того, подписки можно удалить и по отдельности. В приведенном ниже коде показано, как удалить подписку HighMessages из темы TestTopic.
namespaceManager.DeleteSubscription("TestTopic", "HighMessages");
Теперь вы приобрели базовые знания о темах и подписках шины обслуживания. Для дальнейшего изучения перейдите по ссылкам ниже.
- См. руководство MSDN «Очереди, темы и подписки».
- Документация по API SqlFilter.
- Создание приложения для отправки и получения сообщений из очереди шины обслуживания: «Руководство по обмену сообщениями через шину обслуживания на .NET».