Руководство. Обновление информации о запасах с помощью портала Azure, разделов и подписок

Служба "Служебная шина Azure" — это мультитенантное облачное решение для обмена сообщениями, которое позволяет пересылать информацию между приложениями и службами. Асинхронная работа обеспечивает гибкий обмен сообщениями через брокер и обработку сообщений в порядке поступления, а также возможность публикации и подписки. Подробный обзор Служебной шины Azure см. в статье Что такое служебная шина.

В этом руководстве объясняется, как использовать подписки и темы Служебной шины Azure для обновления списка розничных товаров через каналы публикации (подписки) с помощью портала Azure и .NET. Такой сценарий, например, отлично подходит для обновления сведений об ассортименте товаров в нескольких магазинах розничной торговли. В этом сценарии каждый магазин или группа магазинов получают сообщения, позволяющие им обновить ассортимент. В этом руководстве показано, как реализовать этот сценарий с помощью подписок и фильтров. Сначала вы создадите раздел с тремя подписками, добавите в него правила и фильтры, а затем отправите и получите сообщения через разделы и подписки.

Image showing a sender, a topic with three subscriptions, and three receivers.

В этом руководстве описано следующее:

  • создание раздела служебной шины и трех подписок на этот раздел с помощью портала Azure;
  • добавление фильтров для подписок с помощью кода .NET;
  • создание сообщений с разным содержимым;
  • отправка сообщений и проверка их поступления в ожидаемые подписки;
  • получение сообщений из подписок.

Необходимые компоненты

В рамках этого руководства вам потребуются:

  • Подписка Azure. Чтобы использовать службы Azure, включая Служебную шину Azure, вам потребуется подписка. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
  • Visual Studio 2019 или более поздней версии.

Разделы и подписки служебной шины

Каждая подписка на раздел может получать копию любого сообщения. Разделы полностью совместимы с очередями служебной шины на уровнях протоколов и семантики. Разделы служебной шины поддерживают широкий набор правил отбора и условий фильтра, а также позволяют создать действия для присвоения или изменения свойств сообщения. Каждый раз, когда срабатывает правило, создается новое сообщение. Чтобы узнать больше о правилах, фильтрах и действиях, перейдите по этой ссылке.

Создание пространства имен на портале Azure

Чтобы приступить к использованию сущностей обмена сообщениями в служебной шине в Azure, сначала необходимо создать пространство имен с уникальным для Azure именем. Пространство имен предоставляет контейнер области для служебная шина ресурсов (очередей, тем и т. д.) в приложении.

Создание пространства имен службы:

  1. Войдите на портал Azure.

  2. Перейдите на страницу "Все службы".

  3. На панели навигации слева выберите "Интеграция" из списка категорий, наведите указатель мыши на служебная шина, а затем нажмите + кнопку на плитке служебная шина.

    Image showing selection of Create a resource, Integration, and then Service Bus in the menu.

  4. В теге Основные сведения на странице Создание пространства имен выполните следующие действия:

    1. Выберите подписку Azure, в которой будет создано пространство имен.

    2. Выберите существующую группу ресурсов, в которую будет включено это пространство имен, или создайте новую.

    3. Введите имя для пространства имен. В имени пространства имен должны соблюдаться следующие соглашения об именовании:

      • Это имя должно быть уникальным в пределах Azure. Система немедленно проверяет, доступно ли оно.
      • Длина имени составляет не менее 6 и не более 50 символов.
      • Имя может содержать только буквы, цифры и дефисы (-).
      • Имя должно начинаться с буквы или цифры и заканчиваться буквой или цифрой.
      • Имя не должно оканчиваться на -sb или -mgmt.
    4. Укажите расположение — регион для размещения пространства имен.

    5. Для параметра Ценовая категория выберите ценовую категорию ("Базовый", "Стандартный" или "Премиум") для пространства имен. Для работы с этим кратким руководством выберите вариант Стандартный.

      Внимание

      Чтобы использовать разделы и подписки, выберите категорию "Стандартный" или "Премиум". Разделы и подписки не поддерживаются в ценовой категории "Базовый".

      Если выбрана ценовая категория Премиум, укажите число единиц обмена сообщениями. В категории "Премиум" обеспечивается изоляция ресурсов на уровне ЦП и памяти, так что рабочая нагрузка выполняется изолированно от других. Контейнер ресурса называется единицей обмена сообщениями. Пространству имен ценовой категории "Премиум" выделяется по крайней мере одна единица обмена сообщениями. Для каждого пространства имен служебной шины Premium можно выбрать 1, 2, 4, 8 или 16 единиц обмена сообщениями. Дополнительные сведения см. в статье Уровни обмена сообщениями через служебную шину Premium и Standard.

    6. В нижней части страницы выберите Review + create (Проверить и создать).

      Image showing the Create a namespace page

    7. На странице Проверить и создать проверьте параметры и нажмите кнопку Создать.

  5. После успешного развертывания ресурса выберите "Перейти к ресурсу " на странице развертывания.

    Image showing the deployment succeeded page with the Go to resource link.

  6. Вы увидите домашнюю страницу пространства имен служебной шины.

    Image showing the home page of the Service Bus namespace created.

Получение строка подключения в пространство имен (портал Azure)

При создании нового пространства имен автоматически создается начальная политика подписанного URL-адреса (SAS), первичный и вторичный ключи, а также первичная и вторичная строки подключения, каждая из которых предоставляет полный контроль над всеми аспектами пространства имен. Сведения о том, как в дальнейшем создавать правила с ограниченными правами для постоянных отправителей и получателей, см. в статье Аутентификация и авторизация в служебной шине.

Клиент может использовать строка подключения для подключения к пространству имен служебная шина. Чтобы скопировать первичную строку подключения для пространства имен, выполните следующие действия:

  1. На странице Пространство имен служебной шины выберите Политики общего доступа в меню слева.

  2. На странице Политики общего доступа щелкните RootManageSharedAccessKey.

  3. В окне Политика: RootManageSharedAccessKey нажмите кнопку копирования рядом с полем Первичная строка подключения, чтобы скопировать строку подключения в буфер обмена для последующего использования. Вставьте на время эти значения в Блокноте или любом другом месте.

    Screenshot shows an S A S policy called RootManageSharedAccessKey, which includes keys and connection strings.

    Эту страницу можно использовать для копирования первичного и вторичного ключей, а также первичной и вторичной строки подключения.

Создание раздела с помощью портала Azure

  1. На странице Пространство имен служебной шины выберите пункт Разделы в левом меню.

  2. На панели инструментов выберите + Раздел.

  3. Введите имя раздела. Для других параметров оставьте значения по умолчанию.

  4. Нажмите кнопку создания.

    Screenshot of the Create topic page.

Создание подписок на раздел

  1. Выберите раздел, который был создан в предыдущем разделе.

    Screenshot of the Topics page with your topic selected.

  2. На странице Раздел служебной шины выберите пункт Подписки в левом меню, а затем — + Подписка на панели инструментов.

    Screenshot of the Subscriptions page with the Add subscription button selected.

  3. На странице Создать подписку выполните следующие действия:

    1. Введите S1 в качестве имени подписки.

    2. Выберите Создать, чтобы создать подписку.

      Screenshot of the Create subscription page.

  4. Повторите предыдущий шаг дважды, создав подписки с именами S2 и S3.

Создание правил фильтрации по подпискам

После подготовки пространства имен и разделов и подписок, а также строка подключения в пространство имен, вы готовы создать правила фильтрации для подписок, а затем отправлять и получать сообщения. Этот код можно изучить в папке с примером на GitHub.

Отправка и получение сообщений

Чтобы выполнить код:

  1. В командной строке или в командной строке PowerShell выполните следующую команду, которая клонирует репозиторий GitHub для Служебной шины:

    git clone https://github.com/Azure/azure-service-bus.git
    
  2. Перейдите к папке azure-service-bus\samples\DotNet\Azure.Messaging.ServiceBus\BasicSendReceiveTutorialWithFilters с примерами.

  3. Получите строка подключения, скопированные в Блокнот ранее в этом руководстве. Также здесь потребуется имя раздела, который вы создали в предыдущем разделе.

  4. В командной строке введите следующую команду:

    dotnet build
    
  5. Перейдите в папку BasicSendReceiveTutorialWithFilters\bin\Debug\netcoreapp3.1.

  6. Введите приведенную ниже команду, чтобы запустить программу. Не забудьте ввести вместо myConnectionString значение, которое вы получили ранее, а вместо myTopicName — имя созданного раздела:

    dotnet --roll-forward Major BasicSendReceiveTutorialWithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"
    
  7. Следуйте инструкциям в консоли, чтобы выбрать процедуру создания фильтра. При создании фильтров необходимо также удалить стандартные фильтры. Если вы используете PowerShell или CLI, стандартный фильтр удалять не требуется, но при выполнении операций в коде это сделать необходимо. Команды консоли 1 и 3 позволяют управлять фильтрами для ранее созданных подписок:

    • Выполните команду 1, чтобы удалить стандартные фильтры.

    • Выполните команду 2, чтобы добавить собственные фильтры.

    • Выполните 3. Пропустите этот шаг для руководства. При необходимости этот параметр удаляет собственные фильтры. Это действие не восстанавливает стандартные фильтры.

      Showing output of 2

  8. Завершив создание фильтров, вы можете отправлять сообщения. Нажмите клавишу 4 и наблюдайте, как в раздел отправляются 10 сообщений:

    Send output

  9. Нажмите клавишу 5 и наблюдайте за получением этих сообщений. Если не все 10 сообщений будут успешно получены, нажмите клавишу M для отображения меню и снова нажмите 5.

    Receive output

Очистка ресурсов

Выполните следующие действия, чтобы очистить ресурсы, которые больше не нужны.

  1. Перейдите к пространству имен на портале Azure.
  2. На странице Служебная шина пространства имен выберите Удалить на панели команд, чтобы удалить пространство имен и ресурсы (очереди, разделы и подписки) в ней.

Разбор примера кода

Этот раздел содержит дополнительные сведения о работе этого примера кода.

Получение строки подключения и раздела

Прежде всего этот код объявляет набор переменных, которые управляют выполнением остальных частей программы.

string ServiceBusConnectionString;
string TopicName;

static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
    { "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
    { "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
    { "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule. 
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
    { "S1", "" },
    { "S2", "" },
    { "S3", "SET sys.Label = 'SalesEvent'"  }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.

Строка подключения и имя раздела передаются через параметры командной строки, как показано здесь, и затем считываются в метод Main():

static void Main(string[] args)
{
    string ServiceBusConnectionString = "";
    string TopicName = "";

    for (int i = 0; i < args.Length; i++)
    {
        if (args[i] == "-ConnectionString")
        {
            Console.WriteLine($"ConnectionString: {args[i + 1]}");
            ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
        }
        else if (args[i] == "-TopicName")
        {
            Console.WriteLine($"TopicName: {args[i + 1]}");
            TopicName = args[i + 1]; // Alternatively enter your queue name here.
        }
    }

    if (ServiceBusConnectionString != "" && TopicName != "")
    {
        Program P = StartProgram(ServiceBusConnectionString, TopicName);
        P.PresentMenu().GetAwaiter().GetResult();
    }
    else
    {
        Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
        Console.ReadKey();
    }
}

Удаление стандартных фильтров

При создании подписки в Служебной шине создается стандартный фильтр для каждой подписки. Этот фильтр позволяет получать каждое сообщение, отправленное в раздел. Если вы намерены использовать пользовательские фильтры, этот стандартный фильтр можно удалить, как показано в следующем коде:

private async Task RemoveDefaultFilters()
{
    Console.WriteLine($"Starting to remove default filters.");

    try
    {
        var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
        foreach (var subscription in Subscriptions)
        {
            await client.DeleteRuleAsync(TopicName, subscription, CreateRuleOptions.DefaultRuleName);
            Console.WriteLine($"Default filter for {subscription} has been removed.");
        }

        Console.WriteLine("All default Rules have been removed.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

Создание фильтров

В следующем коде добавляются пользовательские фильтры, которые описаны в этом руководстве:

private async Task CreateCustomFilters()
{
    try
    {
        for (int i = 0; i < Subscriptions.Length; i++)
        {
            var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
            string[] filters = SubscriptionFilters[Subscriptions[i]];
            if (filters[0] != "")
            {
                int count = 0;
                foreach (var myFilter in filters)
                {
                    count++;

                    string action = SubscriptionAction[Subscriptions[i]];
                    if (action != "")
                    {
                        await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
                        {
                            Filter = new SqlRuleFilter(myFilter),
                            Action = new SqlRuleAction(action),
                            Name = $"MyRule{count}"
                        });
                    }
                    else
                    {
                        await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
                        {
                            Filter = new SqlRuleFilter(myFilter),
                            Name = $"MyRule{count}"
                        });
                    }
                }
            }

            Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
        }

        Console.WriteLine("All filters and actions have been created.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

Удаление созданных пользовательских фильтров

Если нужно удалить из подписки все фильтры, воспользуйтесь следующим примером кода:

private async Task CleanUpCustomFilters()
{
    foreach (var subscription in Subscriptions)
    {
        try
        {
            var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
            IAsyncEnumerator<RuleProperties> rules = client.GetRulesAsync(TopicName, subscription).GetAsyncEnumerator();
            while (await rules.MoveNextAsync())
            {
                await client.DeleteRuleAsync(TopicName, subscription, rules.Current.Name);
                Console.WriteLine($"Rule {rules.Current.Name} has been removed.");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
    Console.WriteLine("All default filters have been removed.\n");

    await PresentMenu();
}

Отправка сообщений

Отправка сообщений в раздел происходит так же, как и отправка сообщений в очередь. В этом примере показано, как отправлять сообщения, используя список задач и асинхронную обработку:

public async Task SendMessages()
{
    try
    {
        await using var client = new ServiceBusClient(ServiceBusConnectionString);
        var taskList = new List<Task>();
        for (int i = 0; i < Store.Length; i++)
        {
            taskList.Add(SendItems(client, Store[i]));
        }

        await Task.WhenAll(taskList);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
    Console.WriteLine("\nAll messages sent.\n");
}

private async Task SendItems(ServiceBusClient client, string store)
{
    // create the sender
    ServiceBusSender tc = client.CreateSender(TopicName);

    for (int i = 0; i < NrOfMessagesPerStore; i++)
    {
        Random r = new Random();
        Item item = new Item(r.Next(5), r.Next(5), r.Next(5));

        // Note the extension class which is serializing an deserializing messages
        ServiceBusMessage message = item.AsMessage();
        message.To = store;
        message.ApplicationProperties.Add("StoreId", store);
        message.ApplicationProperties.Add("Price", item.GetPrice().ToString());
        message.ApplicationProperties.Add("Color", item.GetColor());
        message.ApplicationProperties.Add("Category", item.GetItemCategory());

        await tc.SendMessageAsync(message);
        Console.WriteLine($"Sent item to Store {store}. Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}"); ;
    }
}

Получение сообщений

Сообщения принимаются обратно через список задач. Этот код использует стратегию пакетной обработки. Пакетную обработку можно применять как для отправки, так и для получения. Но в нашем примере реализовано только пакетное получение. Для реальных систем этот цикл обычно не прерывается, а повторяется неограниченно долго и с более длительным периодом ожидания, например в одну минуту. На протяжении этого периода поддерживается подключение к брокеру. Когда появятся новые сообщения, программа немедленно вернет их и создаст новый вызов для получения данных. Такой подход называется методом продолжительного опроса. Еще чаще применяется средство переноса полученных данных, которое реализовано в этом кратком руководстве и еще нескольких примерах в репозитории.

public async Task Receive()
{
    var taskList = new List<Task>();
    for (var i = 0; i < Subscriptions.Length; i++)
    {
        taskList.Add(this.ReceiveMessages(Subscriptions[i]));
    }

    await Task.WhenAll(taskList);
}

private async Task ReceiveMessages(string subscription)
{
    await using var client = new ServiceBusClient(ServiceBusConnectionString);
    ServiceBusReceiver receiver = client.CreateReceiver(TopicName, subscription);

    // In reality you would not break out of the loop like in this example but would keep looping. The receiver keeps the connection open
    // to the broker for the specified amount of seconds and the broker returns messages as soon as they arrive. The client then initiates
    // a new connection. So in reality you would not want to break out of the loop. 
    // Also note that the code shows how to batch receive, which you would do for performance reasons. For convenience you can also always
    // use the regular receive pump which we show in our Quick Start and in other GitHub samples.
    while (true)
    {
        try
        {
            //IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
            // Note the extension class which is serializing an deserializing messages and testing messages is null or 0.
            // If you think you did not receive all messages, just press M and receive again via the menu.
            IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);

            if (messages.Any())
            {
                foreach (ServiceBusReceivedMessage message in messages)
                {
                    lock (Console.Out)
                    {
                        Item item = message.As<Item>();
                        IReadOnlyDictionary<string, object> myApplicationProperties = message.ApplicationProperties;
                        Console.WriteLine($"StoreId={myApplicationProperties["StoreId"]}");
                        if (message.Subject != null)
                        {
                            Console.WriteLine($"Subject={message.Subject}");
                        }
                        Console.WriteLine(
                            $"Item data: Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}");
                    }

                    await receiver.CompleteMessageAsync(message);
                }
            }
            else
            {
                break;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
}

Примечание.

Вы можете управлять ресурсами служебной шины с помощью обозревателя служебной шины. Обозреватель служебной шины позволяет без труда подключаться к пространству имен служебной шины и управлять сущностями обмена сообщениями. Средство предоставляет дополнительные возможности, например функции импорта и экспорта или возможность проверять разделы, очереди, подписки, службы ретрансляции, центры уведомлений и концентраторы событий.

Следующие шаги

При работе с этим руководством вы подготовили ресурсы с помощью портала Azure, а затем отправили и получили сообщения через раздел Служебной шины и подписки на него. Вы научились выполнять следующие задачи:

  • создание раздела служебной шины и одной или нескольких подписок на этот раздел с помощью портала Azure;
  • добавление фильтров раздела в коде .NET;
  • создание двух сообщений с разным содержимым;
  • отправка сообщений и проверка их поступления в ожидаемые подписки;
  • получение сообщений из подписок.

Чтобы изучить дополнительные примеры отправки и получения сообщений, перейдите к описанию примеров для служебной шины на GitHub.

Следующее руководство содержит дополнительные сведения об использовании возможностей публикации и подписки в cлужебной шине Azure.