Cómo: Exportar y modificar registros de forma masiva
Existen escenarios en los que es necesario crear o modificar gran cantidad de registros en un centro de notificaciones. Algunos de estos escenarios son actualizaciones de etiqueta posteriores a cálculos por lotes, o la migración de una implementación de inserción existente para usar Notification Hubs.
En este tema se explica cómo usar la compatibilidad del centro de notificaciones con las acciones en bloque para realizar una gran cantidad de acciones en un centro de notificaciones o cómo exportar todos los registros.
Flujo de nivel alto
El soporte técnico por lotes está diseñado para respaldar los trabajos de ejecución prolongada que implican millones de registros. Para lograr esta escala, la compatibilidad con el procesamiento por lotes usa Azure Storage para almacenar los detalles y la salida del trabajo. Para las operaciones de actualización masiva, el usuario debe crear un archivo en un contenedor de blobs, cuyo contenido sea la lista de las operaciones de actualización de registro. Al iniciar el trabajo, el usuario proporciona una dirección URL para el blob de entrada, junto con una dirección URL a un directorio de salida (también en un contenedor de blobs). Después de iniciada la tarea, el usuario puede comprobar el estado mediante una consulta con una ubicación de URL proporcionada al inicio de la tarea. Tenga en cuenta que una tarea específica solo puede realizar acciones de un tipo específico (crear, actualizar o eliminar). Las operaciones de exportación se realizan de forma análoga.
Importar
Configurar
En esta sección se da por supuesto que tiene lo siguiente:
Un centro de notificaciones aprovisionado.
Un contenedor de blobs de Azure Storage.
Referencias a los paquetes de Azure Storage y Azure Service Bus NuGet.
Crear el archivo de entrada y almacenarlo en un blob
Un archivo de entrada contiene una lista de registros que se serializan en XML, uno por fila. Con el SDK de Azure, el ejemplo de código siguiente muestra cómo serializar los registros y cargarlos en el contenedor de blobs.
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
Importante
El código anterior serializa los registros en la memoria y, a continuación, carga toda la secuencia en un blob. Si ha cargado un archivo de más de unos pocos megabytes, consulte la guía del blob de Azure sobre cómo realizar estos pasos; por ejemplo, blobs en bloques.
Crear tokens de URL
Una vez cargado el archivo de entrada, es necesario que genere las URL a fin de proporcionarlas al centro de notificaciones para el archivo de entrada y el directorio de salida. Tenga en cuenta que puede usar dos contenedores de blobs diferentes para la entrada y la salida.
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Read
};
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + "/" + filePath + sasToken);
}
Enviar el archivo
Ahora que tenemos las dos URL de entrada y salida, podemos empezar la tarea por lotes.
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
Además de las URL de entrada y salida, este ejemplo permite crear un objeto NotificationHubJob
que contiene un objeto JobType
, que puede ser uno de los siguientes:
ImportCreateRegistrations
ImportUpdateRegistrations
ImportDeleteRegistrations
Una vez finalizada la llamada, el centro de notificaciones continúa con el trabajo, y puede comprobar su estado con una llamada a GetNotificationHubJobAsync.
Al completar el trabajo, puede inspeccionar el trabajo. Para ello, consulte los archivos siguientes en el directorio de salida:
/<hub>/<jobid>/Failed.txt
/<hub>/<jobid>/Output.txt
Estos archivos contienen la lista de operaciones correctas y erróneas del lote. El formato de archivo es .cvs. Cada fila tiene el número de línea del archivo de entrada original y el resultado de la operación (normalmente, la descripción del registro creado o actualizado).
Exportación
La exportación del registro es similar a la importación, con las diferencias siguientes:
Solo necesita la dirección URL de salida.
Debe crear un NotificationHubJob de tipo ExportRegistrations.
Código de ejemplo completo
A continuación tiene un ejemplo de trabajo que permite importar registros a un centro de notificaciones.
using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
private static string CONNECTION_STRING = "---";
private static string HUB_NAME = "---";
private static string INPUT_FILE_NAME = "CreateFile.txt";
private static string STORAGE_ACCOUNT = "---";
private static string STORAGE_PASSWORD = "---";
private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));
static void Main(string[] args)
{
var descriptions = new[]
{
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
};
//write to blob store to create an input file
var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
var container = blobClient.GetContainerReference("testjobs");
container.CreateIfNotExists();
SerializeToBlob(container, descriptions);
// TODO then create Sas
var outputContainerSasUri = GetOutputDirectoryUrl(container);
var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);
//Lets import this file
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
}
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + "/" + filePath + sasToken);
}
static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
{
return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
}
}
}