Postupy: Hromadný export a úprava registrací
Existují scénáře, ve kterých je potřeba vytvořit nebo upravit velký počet registrací v centru oznámení. Některé z těchto scénářů jsou aktualizace značek po dávkových výpočtech nebo migraci existující nabízené implementace pro použití Notification Hubs.
Toto téma vysvětluje, jak používat hromadnou podporu notification Hubs k provádění velkého počtu operací v centru oznámení nebo k exportu všech registrací.
High-Level Flow
Podpora služby Batch je navržená tak, aby podporovala dlouhotrvající úlohy zahrnující miliony registrací. K dosažení tohoto škálování využívá podpora dávkování Azure Storage k ukládání podrobností a výstupu úlohy. Pro operace hromadné aktualizace je uživatel nutný k vytvoření souboru v kontejneru objektů blob, jehož obsah je seznam operací aktualizace registrace. Při spuštění úlohy uživatel poskytne adresu URL vstupnímu objektu blob spolu s adresou URL výstupního adresáře (také v kontejneru objektů blob). Po spuštění úlohy může uživatel zkontrolovat stav dotazováním umístění adresy URL zadaného při spuštění úlohy. Všimněte si, že konkrétní úloha může provádět pouze operace určitého druhu (vytváří, aktualizuje nebo odstraní). Operace exportu se provádějí analogicky.
Import
Nastavení
V této části se předpokládá, že máte následující:
Zřízené centrum oznámení.
Kontejner objektů blob Azure Storage.
Odkazy na balíčky Azure Storage a Azure Service Bus NuGet.
Vytvoření vstupního souboru a jeho uložení do objektu blob
Vstupní soubor obsahuje seznam registrací serializovaných v JAZYCE XML, jeden na řádek. Následující příklad kódu pomocí sady Azure SDK ukazuje, jak serializovat registrace a nahrát je do kontejneru objektů blob.
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
Důležité
Předchozí kód serializuje registrace v paměti a potom nahraje celý datový proud do objektu blob. Pokud jste nahráli soubor s více než několika megabajtemi, projděte si pokyny k objektům blob v Azure, jak tyto kroky provést; Například objekty blob bloku.
Vytvoření tokenů adresy URL
Po nahrání vstupního souboru je nutné vygenerovat adresy URL, které vám poskytnou centrum oznámení pro vstupní soubor i výstupní adresář. Všimněte si, že pro vstup a výstup můžete použít dva různé kontejnery objektů blob.
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Read
};
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + "/" + filePath + sasToken);
}
Odeslání úlohy
Se dvěma vstupními a výstupními adresami URL teď můžeme spustit dávkovou úlohu.
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
Kromě vstupních a výstupních adres URL tento příklad vytvoří NotificationHubJob
objekt, který obsahuje JobType
objekt, který může být jedním z následujících:
ImportCreateRegistrations
ImportUpdateRegistrations
ImportDeleteRegistrations
Po dokončení hovoru bude úloha pokračovat centrem oznámení a můžete zkontrolovat její stav voláním GetNotificationHubJobAsync.
Při dokončení úlohy můžete výsledky zkontrolovat tak, že se podíváte na následující soubory ve výstupním adresáři:
/<hub>/<jobid>/Failed.txt
/<hub>/<jobid>/Output.txt
Tyto soubory obsahují seznam úspěšných a neúspěšných operací z vaší dávky. Formát souboru je .cvs, ve kterém každý řádek má číslo řádku původního vstupního souboru a výstup operace (obvykle vytvořený nebo aktualizovaný popis registrace).
Export
Export registrace se podobá importu s následujícími rozdíly:
Potřebujete jenom výstupní adresu URL.
Musíte vytvořit NotificationHubJob typu ExportRegistrations.
Úplný ukázkový kód
Následuje úplná funkční ukázka, která importuje registrace do centra oznámení.
using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
private static string CONNECTION_STRING = "---";
private static string HUB_NAME = "---";
private static string INPUT_FILE_NAME = "CreateFile.txt";
private static string STORAGE_ACCOUNT = "---";
private static string STORAGE_PASSWORD = "---";
private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));
static void Main(string[] args)
{
var descriptions = new[]
{
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
};
//write to blob store to create an input file
var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
var container = blobClient.GetContainerReference("testjobs");
container.CreateIfNotExists();
SerializeToBlob(container, descriptions);
// TODO then create Sas
var outputContainerSasUri = GetOutputDirectoryUrl(container);
var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);
//Lets import this file
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
}
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + "/" + filePath + sasToken);
}
static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
{
return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
}
}
}