Практическое руководство. Экспорт и изменение регистраций в пакетном режиме

Существуют сценарии, в которых требуется создание или изменение большого числа регистраций в концентраторе уведомлений. Некоторые сценарии — это обновления тегов после пакетных вычислений или перенос извещающей реализации для использования концентраторов уведомлений.

В этом разделе рассказывается о том, как использовать пакетную поддержку концентраторов уведомлений для выполнения большого числа операций на концентраторе уведомлений или для экспорта всех регистраций.

Поток высокого уровня

Пакетная поддержка обеспечивает долговременные задания, связанные с миллионами регистраций. Чтобы достичь этого масштаба, поддержка пакетной обработки использует служба хранилища Azure для хранения сведений о задании и выходных данных. Для операций пакетного обновления пользователю требуется создать файл в контейнере больших двоичных объектов, содержимое которого является списком регистрации операций обновления. При запуске задания пользователь предоставляет URL-адрес для входного большого двоичного объекта вместе с URL-адресом для выходного каталога (а также для контейнера больших двоичных объектов). После запуска задания пользователь может проверить состояние, создав запрос о расположении URL-адреса, предоставленного в начале задания. Обратите внимание, что каждое задание может выполнять только операции определенного типа (создание, обновление или удаление). Операции экспорта выполняются аналогично.

Импорт

Установка

В этом разделе предполагается наличие следующих компонентов:

  1. Провизионированный концентратор уведомлений.

  2. Контейнер больших двоичных объектов хранилища Azur.

  3. Ссылки на пакеты служба хранилища Azure и Служебная шина Azure NuGet.

Создание входного файла и сохранение его в контейнере больших двоичных объектов

Входной файл содержит список регистраций, сериализованных в XML-коде, по одной на строку. При использовании Azure SDK, следующий пример кода показывает, как проводится сериализация регистраций и передача их в контейнер больших двоичных объектов.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Важно!

Предыдущий код сериализует регистрации в памяти и затем передает весь поток в контейнер больших двоичных объектов. Если вы загрузили файл с несколькими мегабайтами, ознакомьтесь с рекомендациями по выполнению этих действий в большом двоичном объекте Azure. например, блочные BLOB-объекты.

Создание токенов URL-адресов

После передачи входного файла необходимо создать URL-адреса для входного файла и выходного каталога, чтобы предоставить их концентратору уведомления. Обратите внимание, что для входа и выхода можно использовать два разных контейнера больших двоичных объектов.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

отправить задание.

При наличии URL-адресов входа и выхода можно запустить пакетное задание.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

В дополнение к URL-адресам входа и выхода в этом примере создается NotificationHubJobобъект, которые содержитJobType один из следующих объектов:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

После завершения вызова концентратор уведомлений продолжает выполнение задания, состояние которого можно проверить с помощью вызова в GetNotificationHubJobAsync.

По завершении выполнения задания можно проверить результаты, просмотрев следующие файлы в выходном каталоге:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Эти файлы содержат список успешных и неудачных операций из пакета. Файл создается в формате CVS, в котором в каждой строке имеется номер исходного входного файла и выхода операции (обычно созданное или обновленное описание регистрации).

Экспорт

Регистрация экспорта похожа на регистрацию импорта, но имеет следующие отличия:

  1. Необходим только выходной URL-адрес.

  2. Необходимо создать notificationHubJob типа ExportRegistrations.

Полный образец кода

Ниже приведен полный рабочий образец, который импортирует регистрации в концентратор уведомлений.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}