Udostępnij za pośrednictwem


Instrukcje: eksportowanie i modyfikowanie rejestracji zbiorczo

Istnieją scenariusze, w których wymagane jest utworzenie lub zmodyfikowanie dużej liczby rejestracji w centrum powiadomień. Niektóre z tych scenariuszy to aktualizacje tagów po obliczeniach wsadowych lub migrowanie istniejącej implementacji wypychania do korzystania z usługi Notification Hubs.

W tym temacie wyjaśniono, jak używać obsługi zbiorczej usługi Notification Hubs do wykonywania dużej liczby operacji w centrum powiadomień lub eksportowania wszystkich rejestracji.

High-Level Flow

Obsługa usługi Batch jest przeznaczona do obsługi długotrwałych zadań obejmujących miliony rejestracji. Aby osiągnąć tę skalę, obsługa wsadowa używa usługi Azure Storage do przechowywania szczegółów i danych wyjściowych zadania. W przypadku operacji aktualizacji zbiorczej użytkownik musi utworzyć plik w kontenerze obiektów blob, którego zawartość jest listą operacji aktualizacji rejestracji. Podczas uruchamiania zadania użytkownik udostępnia adres URL wejściowego obiektu blob wraz z adresem URL katalogu wyjściowego (również w kontenerze obiektów blob). Po rozpoczęciu zadania użytkownik może sprawdzić stan, wysyłając zapytanie o lokalizację adresu URL podaną podczas uruchamiania zadania. Należy pamiętać, że określone zadanie może wykonywać tylko operacje określonego rodzaju (tworzy, aktualizuje lub usuwa). Operacje eksportowania są wykonywane analogicznie.

Importuj

Konfigurowanie

W tej sekcji założono, że masz następujące założenia:

  1. Aprowizowane centrum powiadomień.

  2. Kontener obiektów blob platformy Azure Storage.

  3. Odwołania do pakietów usługi Azure Storage i Azure Service Bus NuGet.

Tworzenie pliku wejściowego i przechowywanie go w obiekcie blob

Plik wejściowy zawiera listę rejestracji serializacji w formacie XML, jeden na wiersz. Przy użyciu zestawu Azure SDK poniższy przykład kodu pokazuje, jak serializować rejestracje i przekazywać je do kontenera obiektów blob.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Ważne

Powyższy kod serializuje rejestracje w pamięci, a następnie przekazuje cały strumień do obiektu blob. Jeśli przekazano plik zawierający więcej niż kilka megabajtów, zapoznaj się ze wskazówkami dotyczącymi wykonywania tych kroków na platformie Azure. na przykład blokowe obiekty blob.

Tworzenie tokenów adresu URL

Po przekazaniu pliku wejściowego należy wygenerować adresy URL w celu udostępnienia centrum powiadomień zarówno dla pliku wejściowego, jak i katalogu wyjściowego. Należy pamiętać, że dla danych wejściowych i wyjściowych można użyć dwóch różnych kontenerów obiektów blob.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

Przesyłanie zadania

W przypadku dwóch adresów URL danych wejściowych i wyjściowych możemy teraz uruchomić zadanie wsadowe.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Oprócz adresów URL wejściowych i wyjściowych ten przykład tworzy NotificationHubJob obiekt zawierający JobType obiekt, który może być jednym z następujących elementów:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Po zakończeniu wywołania zadanie jest kontynuowane przez centrum powiadomień i możesz sprawdzić jego stan za pomocą wywołania getNotificationHubJobAsync.

Po zakończeniu zadania możesz sprawdzić wyniki, przeglądając następujące pliki w katalogu wyjściowym:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Te pliki zawierają listę pomyślnych i zakończonych niepowodzeniem operacji z partii. Format pliku to .cvs, w którym każdy wiersz ma numer wiersza oryginalnego pliku wejściowego i dane wyjściowe operacji (zazwyczaj utworzony lub zaktualizowany opis rejestracji).

Eksportowanie

Eksportowanie rejestracji jest podobne do importu z następującymi różnicami:

  1. Potrzebny jest tylko adres URL danych wyjściowych.

  2. Musisz utworzyć obiekt NotificationHubJob typu ExportRegistrations.

Pełny przykładowy kod

Poniżej przedstawiono pełny przykład roboczy, który importuje rejestracje do centrum powiadomień.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}