Bagikan melalui


Cara: Mengekspor dan Memodifikasi Pendaftaran secara Massal

Ada skenario saat harus membuat atau mengubah sejumlah besar pendaftaran di hub notifikasi. Beberapa skenario ini adalah pembaruan tag setelah komputasi batch, atau memigrasikan implementasi push yang ada untuk menggunakan Notification Hubs.

Topik ini menjelaskan cara menggunakan dukungan massal Notification Hubs untuk melakukan sejumlah besar operasi di hub pemberitahuan, atau untuk mengekspor semua pendaftaran.

Alur High-Level

Dukungan batch dirancang untuk mendukung pekerjaan jangka panjang yang melibatkan jutaan pendaftaran. Untuk mencapai skala ini, dukungan batching menggunakan Azure Storage untuk menyimpan detail dan output pekerjaan. Untuk operasi pembaruan massal, pengguna harus membuat file dalam kontainer blob, yang isinya adalah daftar operasi pembaruan pendaftaran. Saat memulai pekerjaan, pengguna memberikan URL ke blob input, bersama dengan URL ke direktori output (juga dalam kontainer blob). Setelah pekerjaan dimulai, pengguna dapat memeriksa status dengan mengkueri lokasi URL yang disediakan pada awal pekerjaan. Perhatikan bahwa pekerjaan tertentu hanya dapat melakukan operasi dengan jenis tertentu (membuat, memperbarui, atau menghapus). Operasi ekspor dilakukan secara analog.

Impor

Siapkan

Bagian ini mengasumsikan Anda memiliki hal berikut:

  1. Hub notifikasi yang diprovisi.

  2. Kontainer blob Azure Storage.

  3. Referensi ke paket NuGet Azure Storage dan Azure Service Bus.

Membuat file input dan menyimpannya dalam blob

File input berisi daftar pendaftaran yang dimuat berseri dalam XML, satu per baris. Menggunakan Azure SDK, contoh kode berikut menunjukkan cara membuat serial pendaftaran dan mengunggahnya ke kontainer blob.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Penting

Kode sebelumnya membuat serialisasi pendaftaran dalam memori lalu mengunggah seluruh aliran ke dalam blob. Jika Anda telah mengunggah file lebih dari sekadar beberapa megabyte, silakan lihat panduan blob Azure tentang cara melakukan langkah-langkah ini; misalnya, blob blok.

Membuat token URL

Setelah file input Anda diunggah, Anda harus membuat URL untuk disediakan ke hub pemberitahuan Anda untuk file input dan direktori output. Perhatikan bahwa Anda dapat menggunakan dua kontainer blob yang berbeda untuk input dan output.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

Mengirimkan pekerjaan

Dengan dua URL input dan output, kita sekarang dapat memulai pekerjaan batch.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Selain URL input dan output, contoh ini membuat NotificationHubJob objek yang berisi JobType objek , yang bisa menjadi salah satu dari berikut ini:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Begitu panggilan selesai, pekerjaan dilanjutkan oleh hub notifikasi, dan Anda dapat memeriksa statusnya dengan panggilan ke GetNotificationHubJobAsync.

Saat pekerjaan selesai, Anda bisa memeriksa hasilnya dengan melihat file berikut di direktori output:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

File ini berisi daftar operasi yang berhasil dan gagal dari batch Anda. Format file adalah .cvs, di mana setiap baris memiliki nomor baris file input asli, dan output operasi (biasanya deskripsi pendaftaran yang dibuat atau diperbarui).

Ekspor

Cara mengekspor pendaftaran sama dengan mengimpor, dengan perbedaan berikut:

  1. Anda hanya perlu URL output.

  2. Anda harus membuat NotificationHubJob jenis ExportRegistrations.

Kode Sampel Lengkap

Berikut ini adalah sampel kerja lengkap yang mengimpor pendaftaran ke hub pemberitahuan.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}