Como: Exportar e Modificar Registos a granel

Existem cenários em que é necessário criar ou modificar um grande número de registos num centro de notificação. Alguns destes cenários são atualizações de etiquetas após cálculos de lotes, ou migram uma implementação de push existente para usar Os Centros de Notificação.

Este tópico explica como utilizar o suporte a granel do Notification Hubs para realizar um grande número de operações num centro de notificação ou para exportar todos os registos.

High-Level Flow

O apoio ao lote destina-se a apoiar empregos de longa duração que envolvam milhões de registos. Para alcançar esta escala, o suporte ao lote utiliza o Azure Armazenamento para armazenar detalhes e saídas de trabalho. Para operações de atualização a granel, o utilizador é obrigado a criar um ficheiro num recipiente blob, cujo conteúdo é a lista de operações de atualização de registo. Ao iniciar o trabalho, o utilizador fornece um URL à bolha de entrada, juntamente com um URL para um diretório de saída (também num recipiente de bolhas). Após o início do trabalho, o utilizador pode verificar o estado consultando uma localização URL fornecida no início do trabalho. Note que um trabalho específico só pode realizar operações de tipo específico (cria, atualizações ou eliminações). As operações de exportação são efetuadas análogamente.

Importar

Configuração

Esta secção pressupõe que tem o seguinte:

  1. Um centro de notificação provisionado.

  2. Um recipiente de bolhas Azure Armazenamento.

  3. Referências aos pacotes Azure Armazenamento e Azure Service Bus NuGet.

Crie um ficheiro de entrada e guarde-o numa bolha

Um ficheiro de entrada contém uma lista de registos serializados em XML, um por linha. Utilizando o Azure SDK, o seguinte exemplo de código mostra como serializar os registos e carregá-los para o recipiente blob.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Importante

O código anterior serializa os registos na memória e, em seguida, envia todo o fluxo para uma bolha. Se tiver carregado um ficheiro de mais do que apenas alguns megabytes, consulte a orientação da bolha de Azure sobre como executar estes passos; por exemplo, blocos de bolhas.

Criar fichas URL

Uma vez que o seu ficheiro de entrada é carregado, tem de gerar os URLs para fornecer ao seu centro de notificação tanto para o ficheiro de entrada como para o diretório de saída. Note que pode utilizar dois recipientes de bolhas diferentes para entrada e saída.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

Submeter o trabalho

Com os dois URLs de entrada e saída, podemos agora iniciar o trabalho de lote.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Além dos URLs de entrada e saída, este exemplo cria um NotificationHubJob objeto que contém um JobType objeto, que pode ser um dos seguintes:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Uma vez concluída a chamada, o trabalho é continuado pelo centro de notificação, e pode verificar o seu estado com a chamada para GetNotificationHubJobAsync.

No final do trabalho, pode verificar os resultados analisando os seguintes ficheiros no seu diretório de saída:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Estes ficheiros contêm a lista de operações bem sucedidas e falhadas do seu lote. O formato de ficheiro é .cvs, no qual cada linha tem o número de linha do ficheiro de entrada original, e a saída da operação (normalmente a descrição de registo criada ou atualizada).

Exportar

O registo de exportação é semelhante ao importador, com as seguintes diferenças:

  1. Só precisa do URL de saída.

  2. Tem de criar um NotificationHubJob do tipo ExportRegistrations.

Código de amostra completo

Segue-se uma amostra de funcionamento completa que importa os registos num centro de notificação.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}