Практическое руководство. Экспорт и изменение регистраций в пакетном режиме
Существуют сценарии, в которых требуется создание или изменение большого числа регистраций в концентраторе уведомлений. Некоторые сценарии — это обновления тегов после пакетных вычислений или перенос извещающей реализации для использования концентраторов уведомлений.
В этом разделе рассказывается о том, как использовать пакетную поддержку концентраторов уведомлений для выполнения большого числа операций на концентраторе уведомлений или для экспорта всех регистраций.
Поток высокого уровня
Пакетная поддержка обеспечивает долговременные задания, связанные с миллионами регистраций. Чтобы достичь этого масштаба, поддержка пакетной обработки использует служба хранилища Azure для хранения сведений о задании и выходных данных. Для операций пакетного обновления пользователю требуется создать файл в контейнере больших двоичных объектов, содержимое которого является списком регистрации операций обновления. При запуске задания пользователь предоставляет URL-адрес для входного большого двоичного объекта вместе с URL-адресом для выходного каталога (а также для контейнера больших двоичных объектов). После запуска задания пользователь может проверить состояние, создав запрос о расположении URL-адреса, предоставленного в начале задания. Обратите внимание, что каждое задание может выполнять только операции определенного типа (создание, обновление или удаление). Операции экспорта выполняются аналогично.
Импорт
Установка
В этом разделе предполагается наличие следующих компонентов:
Провизионированный концентратор уведомлений.
Контейнер больших двоичных объектов хранилища Azur.
Ссылки на пакеты служба хранилища Azure и Служебная шина Azure NuGet.
Создание входного файла и сохранение его в контейнере больших двоичных объектов
Входной файл содержит список регистраций, сериализованных в XML-коде, по одной на строку. При использовании Azure SDK, следующий пример кода показывает, как проводится сериализация регистраций и передача их в контейнер больших двоичных объектов.
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
Важно!
Предыдущий код сериализует регистрации в памяти и затем передает весь поток в контейнер больших двоичных объектов. Если вы загрузили файл с несколькими мегабайтами, ознакомьтесь с рекомендациями по выполнению этих действий в большом двоичном объекте Azure. например, блочные BLOB-объекты.
Создание токенов URL-адресов
После передачи входного файла необходимо создать URL-адреса для входного файла и выходного каталога, чтобы предоставить их концентратору уведомления. Обратите внимание, что для входа и выхода можно использовать два разных контейнера больших двоичных объектов.
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Read
};
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + "/" + filePath + sasToken);
}
отправить задание.
При наличии URL-адресов входа и выхода можно запустить пакетное задание.
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
В дополнение к URL-адресам входа и выхода в этом примере создается NotificationHubJob
объект, которые содержитJobType
один из следующих объектов:
ImportCreateRegistrations
ImportUpdateRegistrations
ImportDeleteRegistrations
После завершения вызова концентратор уведомлений продолжает выполнение задания, состояние которого можно проверить с помощью вызова в GetNotificationHubJobAsync.
По завершении выполнения задания можно проверить результаты, просмотрев следующие файлы в выходном каталоге:
/<hub>/<jobid>/Failed.txt
/<hub>/<jobid>/Output.txt
Эти файлы содержат список успешных и неудачных операций из пакета. Файл создается в формате CVS, в котором в каждой строке имеется номер исходного входного файла и выхода операции (обычно созданное или обновленное описание регистрации).
Экспорт
Регистрация экспорта похожа на регистрацию импорта, но имеет следующие отличия:
Необходим только выходной URL-адрес.
Необходимо создать notificationHubJob типа ExportRegistrations.
Полный образец кода
Ниже приведен полный рабочий образец, который импортирует регистрации в концентратор уведомлений.
using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
private static string CONNECTION_STRING = "---";
private static string HUB_NAME = "---";
private static string INPUT_FILE_NAME = "CreateFile.txt";
private static string STORAGE_ACCOUNT = "---";
private static string STORAGE_PASSWORD = "---";
private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));
static void Main(string[] args)
{
var descriptions = new[]
{
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
};
//write to blob store to create an input file
var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
var container = blobClient.GetContainerReference("testjobs");
container.CreateIfNotExists();
SerializeToBlob(container, descriptions);
// TODO then create Sas
var outputContainerSasUri = GetOutputDirectoryUrl(container);
var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);
//Lets import this file
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
}
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + "/" + filePath + sasToken);
}
static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
{
return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
}
}
}