Microsoft Azure изнутри

Microsoft Azure Service Bus: шаблоны обмена сообщениями с использованием сеансов

Бруно Теркали
Рикардо Виллалобос

Исходный код можно скачать по ссылке.

Бруно Теркали и Рикардо ВиллалобосВ одной из наших предыдущих статей мы обсуждали, насколько важно применять шаблоны обмена сообщениями (messaging patterns) в облаке, чтобы разрывать жесткие связи между решениями и способствовать поддержке легко масштабируемых программных архитектур (см. «Comparing Microsoft Azure Queues and Service Bus Queues» по ссылке msdn.microsoft.com/magazine/jj159884). Очереди — один из таких шаблонов обмена сообщениями, и платформа Microsoft Azure предлагает два основных варианта реализации этого подхода: сервисы хранилища очередей и Service Bus Queues; в обоих сценариях множество потребителей принимает и обрабатывает сообщения в очереди. Это каноническая модель поддержки варьируемой рабочей нагрузки в облаке, где можно динамически добавлять или удалять получатели в зависимости от размера очереди, тем самым предлагая механизм распределения нагрузки и преодоления сбоев для серверной части (рис. 1).

Шаблон обмена сообщениями через очереди: каждое сообщение потребляется одним получателем

Рис. 1. Шаблон обмена сообщениями через очереди: каждое сообщение потребляется одним получателем

Sender 1 Отправитель 1
Sender 2 Отправитель 2
Sender 3 Отправитель 3
Single Queue Единая очередь
Receiver 1 Получатель 1
Receiver 2 Получатель 2
Receiver 3 Получатель 3

Хотя шаблон обмена сообщениями через очереди — отличное решение для простого разъединения решений, нередки ситуации, где каждому получателю требуется своя копия сообщения с возможностью отбрасывания некоторых сообщений на основе неких правил. Хороший пример сценария этого типа показан на рис. 2, иллюстрирующем проблему, с которой часто сталкиваются розничные компании при отправке информации множеству филиалов, например новых каталогов товаров или обновленного прайс-листа.

Шаблон обмена сообщениями «публикатор/подписчик»: каждое сообщение может потребляться более чем один раз
Рис. 2. Шаблон обмена сообщениями «публикатор/подписчик»: каждое сообщение может потребляться более чем один раз

Headquarters Головной офис
Topic Topic
Subscription 1 Подписка 1
Subscription 2 Подписка 2
Subscription 3 Подписка 3
Store 1 Магазин 1
Store 2 Магазин 2
Store 3 Магазин 3

В этих ситуациях лучше подходит шаблон «публикатор/подписчик» (publisher/subscriber), где получатели просто выражают заинтересованность в одной или более категориях сообщений, подключаясь к независимой подписке, которая содержит копию потока сообщений. Microsoft Azure Service Bus реализует этот шаблон на основе тем (topics) и подписок, значительно расширяя возможности управления тем, как распространяются сообщения, на основе независимых правил и фильтров. В этой статье мы поясним, как использовать эти возможности Microsoft Azure Service Bus на простом примере из реальной жизни, и будем исходить из следующих требований:

  1. информация о товарах должна приниматься по порядку на основе каталожной страницы;
  2. некоторые из хранилищ не содержат какие-то категории каталога, и товары в этих категориях следует отфильтровывать для каждого магазина;
  3. новая информация каталога не должна применяться к системе хранения, пока не прибудут все сообщения.

Все примеры кода в этой статье были созданы в Visual Studio 2012 с использованием C# в качестве языка программирования. Кроме того, вам понадобятся Microsoft Azure SDK version 1.8 для .NET-разработчиков и доступ к подписке Microsoft Azure.

Подготовка плана обмена сообщениями для проекта

Прежде чем писать код, нужно определить различные сущности (темы и подписки), которые станут частью рабочего процесса обмена сообщениями. Это делается через Microsoft Azure Portal на manage.windowsazure.com. Войдите на портал со своими удостоверениями и действуйте по следующей схеме.

  1. Щелкните значок Create New в левой нижней части Management Portal.
  2. Щелкните значок APP SERVICES, затем SERVICE BUS TOPIC и, наконец, CUSTOM CREATE (рис. 3).
  3. В первом диалоге введите название темы и выберите подходящий регион и идентификатор подписки Microsoft Azure. Если это ваше первое пространство имен в выбранном регионе, мастер предложит очередь пространства имен (namespace queue): [имя вашей сущности]-ns. Это значение можно изменить.
  4. Щелкните метку NEXT (стрелку, указывающую вправо), чтобы вставить оставшиеся свойства. Вы можете согласиться со значениями по умолчанию. Щелкните галочку для создания темы (topic).
  5. Щелкните значок Service Bus на навигационной панели слева, чтобы получить список всех пространств имен. Заметьте, что созданное пространство имен может появиться в списке с некоторым запозданием. На его создание и обновление интерфейса портала уходит несколько секунд.
  6. Выберите только что созданный Topic из списка и щелкните ACCESS KEY, который вы найдете в нижней части экрана. Запишите себе полную строку подключения для последующего использования.
  7. В верхней части экрана Microsoft Azure Portal щелкните SUBSCRIPTIONS, а затем CREATE A NEW SUBSCRIPTION. В появившемся диалоге введите название (в нашем примере мы использовали «Store1Sub») и щелкните стрелку для продолжения.
  8. На следующем экране оставьте значения по умолчанию, но обязательно пометьте параметр Enable sessions. Щелкните галочку, чтобы создать подписку. Сеансы будут использоваться подписчиками для приема сообщений по порядку.
  9. Повторите операции из пп. 7 и 8 для каждого из трех магазинов.

Создание нового Service Bus Topic через портал Microsoft Azure
Рис. 3. Создание нового Service Bus Topic через портал Microsoft Azure

Создав темы и подписки, вы можете напрямую обращаться к ним и из Visual Studio. Для этого откройте Server Explorer (View | Server Explorer) и раскройте узел Microsoft Azure Service Bus (рис. 4). Щелкните правой кнопкой мыши этот узел Microsoft Azure Service Bus и выберите Add New Connection. Укажите Namespace Name, Issuer Name (обычно «владелец») и Issuer Access Key, который вы записали себе при создании пространства имен Microsoft Azure на портале.

Создание темы и подписок Service Bus средствами Visual Studio
Рис. 4. Создание темы и подписок Service Bus средствами Visual Studio

Помните, что вы можете программным способом создавать эти сущности и управлять ими через классы в пространстве имен Microsoft.ServiceBus.Messaging, включая TopicClient и SubscriptionClient, которые используются далее в этой статье.

Создав базовую структуру рабочего процесса обмена сообщениями, мы будем имитировать трафик с помощью двух консольных приложений, созданных в Visual Studio, как показано на рис. 5. Первой консольное приложение, MSDNSender, будет отправлять каталог товаров, а второе — MSDNReceiver — принимать информацию в каждом из магазинов. Анализ кода будет дан в следующих разделах. В шаблоне «публикатор/подписчик», или сокращенно Pub/Sub, программа MSDNSender является публикатором, а MSDNReceiver — подписчиком.

Решение Visual Studio, имитирующее сценарий с каталогом товаров
Рис. 5. Решение Visual Studio, имитирующее сценарий с каталогом товаров

Отправка каталога товаров из головного офиса

Как видно из рис. 2, головной офис (публикатор) посылает сообщения в Topic. Эта логика представлена кодом в основном файле, Program.cs, который является частью проекта MSDNSender. Program.cs инкапсулирует логику и код для отправки списка товаров как индивидуальных сообщений в Topic. Давайте рассмотрим различные части, начиная с метода Main. Заметьте, что сначала мы создаем клиент для Topic:

// Создаем topicClient, используя удостоверения для Service Bus
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName);

Как только topicClient создан, публикатор может с его помощью посылать сообщения. Список товаров хранится в XML-файле ProductsCatalog.xml; в нем содержится перечень из десяти сущностей Product, которые будут преобразованы в массив объектов. Затем объекты Product отображаются на классы Catalog и Product, хранящиеся в файле Product.cs:

// Десериализуем XML-файл с объектами Product
// и сохраняем их в массиве объектов
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();

Каждый Product в массиве catalog представляет структуру, показанную на рис. 6.

Рис. 6. Представление класса для объектов Product в каталоге

public class Product
  {
    [System.Xml.Serialization.XmlElement("ProductId")]
    public string ProductId { get; set; }
    [System.Xml.Serialization.XmlElement("ProductName")]
    public string ProductName { get; set; }
    [System.Xml.Serialization.XmlElement("Category")]
    public string Category { get; set; }
    [System.Xml.Serialization.XmlElement("CatalogPage")]
    public int CatalogPage { get; set; }
    [System.Xml.Serialization.XmlElement("MSRP")]
    public double MSRP { get; set; }
    [System.Xml.Serialization.XmlElement("Store")]
    public string Store { get; set; }
  }

Внутри цикла по массиву вызов метода CreateMessage извлекает различные свойства объектов Product и присваивает их сообщению, которое потом будет отправлено. Два свойства требуют дополнительного внимания:

if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;

Сеансы чрезвычайно важны, потому что позволяют получателю определять, все ли сообщения, принадлежащие конкретной логической группе, приняты. В этом случае, устанавливая свойство SessionId сообщения, мы указываем, что получатель не должен использовать информацию каталога, пока не будут получены все сообщения с одинаковым значением catalogName. Кроме того, для последнего товара в массиве мы добавляем новое свойство, IsLastMessageInSession, которое позволит получателям определять, поступило ли последнее сообщение в сеансе, и каталог может быть полностью обработан. На рис. 7 показана выполняемая программа MSDNSender.

Выполнение проекта MSDNSender
Рис. 7. Выполнение проекта MSDNSender

Получение каталога товаров с помощью подписок в магазинах

Теперь, когда каталог и объекты-товары отправлены в Topic и скопированы в различные подписки, переключимся на проект MSDNReceiver, где сообщения принимаются и обрабатываются. Заметьте, что в методе Main в Program.cs код создает клиент для Subscription на основе информации, предоставляемой пользователем через команду Console.ReadLine. Ожидается, что пользователи будут вводить номера своих магазинов, отражая тем самым сообщения, которые им нужно принимать. Короче говоря, каждый филиал интересуют только сообщения, применимые к соответствующему магазину:

Console.WriteLine("Enter Store Number");
string storeNumber = Console.ReadLine();
Console.WriteLine("Selecting Subscription for Store...");
// Создаем клиент подписки на Topic
SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName,
  "Store" + storeNumber.Trim() + "Sub", 
  ReceiveMode.PeekLock);

Поскольку мы принимаем сообщения от подписок на основе сеансов (как пояснялось в предыдущем разделе), нам нужно запросить следующее сообщение, используя строку кода:

MessageSession sessionReceiver = subscriptionClient.
  AcceptMessageSession(TimeSpan.FromSeconds(5));

В основном это означает, что клиент будет проверять наличие в подписке любых подлежащих обработке сообщений (у которых свойство SessionId не равно null), и, если в течение пяти секунд таких сообщений не попадется, время ожидания запроса истечет, после чего приложение-получатель будет завершено. С другой стороны, если сеанс найден, будет вызван метод ReceivingSessionMessages. Прежде чем перейти к этой части кода, давайте обсудим концепцию состояния сеанса. Это состояние позволяет разработчику хранить информацию, которая может использоваться, пока принимаются сообщения, относящиеся к той же транзакции. В данном случае мы используем состояние сеанса для «запоминания» последней полученной страницы каталога, а также сообщений (товаров), прибывших вне должного порядка.

С учетом сказанного рабочий процесс в коде выглядит так.

  1. Текущее сообщение принимается в методе ReceiveSessionMessages (рис. 8), который полагается в обработке на метод ProcessMessage (рис. 9).
  2. Внутри метода ProcessMessage, если сообщение принято вне последовательности, оно автоматически откладывается, а его идентификатор сохраняется в состоянии сеанса. Иначе оно помечается как «завершенное» (complete) и удаляется из подписки. Кроме того, в сеансе сохраняется следующая ожидаемая последовательность — страница каталога.
  3. После обработки текущего принятого сообщения последующий код в ReceiveSessionMessages проверяет наличие идентификаторов отложенных сообщений в сеансе и пытается обработать их снова с учетом самой последней страницы каталога.
  4. Как только в сеансе приняты все сообщения, получатель закрывается.

Рис. 8. Метод ReceivedSessionMessages

static void ReceiveSessionMessages(MessageSession receiver)
  {
    // Считываем сообщения из подписки, пока она не опустеет
    Console.WriteLine("Reading messages from subscription {0}",
      receiver.Path);
    Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
    Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
    SequenceState sessionState = GetState(receiver);
    BrokeredMessage receivedMessage;
    while ((receivedMessage = receiver.Receive()) != null)
    {
      string sessionId = receiver.SessionId;
      ProcessMessage(receivedMessage, ref sessionState, receiver);
      while (sessionState.GetNextOutOfSequenceMessage() != -1)
      {
        // Вызываем обратно отложенные сообщения
        Console.WriteLine("Calling back for deferred message: Category {0}, 
          Message sequence {1}", receiver.SessionId,
            sessionState.GetNextSequenceId());
        receivedMessage = receiver.Receive(
          sessionState.GetNextOutOfSequenceMessage());
        ProcessMessage(receivedMessage, ref sessionState, receiver);
      }
      if (receivedMessage.Properties.ContainsKey(
        "IsLastMessageInSession"))
        break;
    }
    SetState(receiver, null);
    receiver.Close();
  }

Рис. 9. Метод ProcessMessage

static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
  {
    if (session != null)
    {
      int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
      if (sessionState.GetNextSequenceId() == messageId)
      {
        OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
        sessionState.SetNextSequenceId(messageId + 1);
        message.Complete();
        SetState(session, sessionState);
      }
      else
      {
        Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}", 
          session.SessionId, messageId);
        sessionState.AddOutOfSequenceMessage(messageId,
          message.SequenceNumber);
        message.Defer();
        SetState(session, sessionState);
      }
    }
    Thread.Sleep(receiverDelay);
  }

Учитывайте, что для этого проекта идентификаторы отложенных сообщений хранятся в состоянии сеанса и потенциально могут быть потеряны. В производственной среде мы рекомендуем для этой цели использовать какое-либо постоянное хранилище (один из вариантов — Microsoft Azure Tables). Заметьте: если сообщение содержит свойство IsLastMessageSessionInSession (установленное в процессе отправки), цикл сеанса завершается. Консольный вывод для проекта MSDNReceiver показан на рис. 10.

Выполнение проекта MSDNReceiver
Рис. 10. Выполнение проекта MSDNReceiver

Подписки Microsoft Azure Service Bus позволяют создавать специфические правила, чтобы отфильтровывать ненужные сообщения. В данном случае было бы сравнительно легко создать правило, которое разделяет товары по категории или по номеру магазина (что мы проигнорировали в этом проекте). Правила можно создавать программным способом, напрямую через портал Microsoft Azure или средствами Visual Studio.

Заключение

Microsoft Azure Service Bus предлагает потрясающе надежную и гибкую реализацию шаблона публикации и подписки. На основе тем и подписок можно решать множество других задач. Поддержка множества отправителей, широковещательно посылающих сообщения множеству получателей, в сочетании с поддержкой логического группирования и сортировки сообщений открывает перед современными разработчиками массу возможностей. Более того, способность использовать постоянный (persistent) сеанс для отслеживания состояния упрощает логическое группирование сообщений и управление их последовательностью. В мире, где распределенные среды стали нормой, понимание того, как использовать шаблоны обмена сообщениями и соответствующие инструменты, крайне важно для нынешних архитекторов ПО, имеющих дело с облаком.


Бруно Теркали (Bruno Terkaly)* — разработчик-идеолог в Microsoft. Его глубокие знания обусловлены долголетним опытом работы в различных областях, написанием кода с использованием множества платформ, языков, инфраструктур, SDK, библиотек и API. Основное внимание в своей работе он уделяет написанию кода, ведению блога и проведению презентаций по созданию облачных приложений, в частности на платформе Microsoft Azure. Не так давно им была написана серия статей по iOS и Microsoft Azure Mobile Services, доступна по ссылке bit.ly/UPXGdV.*

Рикардо Виллалобос (Ricardo Villalobos) — квалифицированный архитектор ПО более чем с 15-летним опытом проектирования и создания приложений для компаний в сфере управления цепочками поставок. Обладатель различных технических сертификатов, а также степени магистра в управлении бизнесом от Университета Далласа, работает в качестве архитектора облака в инкубационной группе Microsoft Azure CSV для Microsoft.

Выражаем благодарность за рецензирование статьи эксперту Абишеку Лалу (Abhishek Lal).