如何:大量匯出和修改註冊
在某些情況下,您必須在通知中心裡建立或修改大量的註冊。 例如,批次計算之後的標記更新,或是移轉現有推播實作以使用通知中心的作業,都屬於此情況。
本主題將說明如何使用通知中心大量支援在通知中心上執行大量作業,或匯出所有註冊。
高階流程
批次支援的用途,是要支援牽涉到數百萬項註冊的長時間執行工作。 為了達到此規模,批次處理支援會使用Azure 儲存體來儲存作業詳細資料和輸出。 進行大量更新作業時,使用者必須在 Blob 容器中建立一個內容為註冊更新作業清單的檔案。 工作開始時,使用者會先提供輸入 Blob 的 URL,以及輸出目錄的 URL (也是在 Blob 容器中)。 在工作開始後,使用者可查詢在工作開始時提供的 URL 位置,以查看狀態。 請注意,特定工作只能執行特定類型的作業 (建立、更新或刪除)。 匯出作業會以類似的方式執行。
匯入
安裝程式
本節假設您已具備:
已佈建的通知中心。
Azure 儲存體 Blob 容器。
Azure 儲存體和Azure 服務匯流排NuGet套件的參考。
建立輸入檔案並將其儲存在 Blob 中
輸入檔案包含以 XML 序列化的註冊清單,每列一個。 下列程式碼範例會使用 Azure SDK,說明如何序列化註冊並將其上傳至 Blob 容器。
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
重要
前述程式碼會序列化記憶體中的註冊,並將整個資料流上傳至 Blob 中。 如果您上傳的檔案不只是數 MB,請參閱 Azure Blob 指引,以瞭解如何執行這些步驟;例如, 區塊 Blob。
建立 URL 權杖
在輸入檔案上傳後,您必須產生 URL,以提供給通知中心的輸入檔案和輸出目錄。 請注意,您的輸入和輸出可以使用兩種不同的 Blob 容器。
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
Permissions = SharedAccessBlobPermissions.Read
};
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
return new Uri(container.Uri + "/" + filePath + sasToken);
}
提交工作
具有輸入和輸出 URL 後,您即可開始進行批次工作。
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
除了輸入和輸出 URL 以外,此範例也會建立包含 NotificationHubJob
物件的 JobType
物件,這可能是下列其中之一:
ImportCreateRegistrations
ImportUpdateRegistrations
ImportDeleteRegistrations
呼叫完成後,通知中樞會繼續作業,您可以使用 GetNotificationHubJobAsync的呼叫來檢查其狀態。
工作完成時,您可以檢視輸出目錄中的下列檔案,以檢查結果:
/ < hub > / < jobid > /Failed.txt
/ < hub > / < jobid > /Output.txt
這些檔案會列出批次作業中的成功和失敗作業。 其檔案格式為 .cvs;在此格式中,每一列都會有原始輸入檔案的行號,和作業的輸出 (通常是已建立或已更新的註冊說明)。
匯出
註冊的匯出與匯入類似,但有以下差異:
您只需要輸出 URL。
您必須建立ExportRegistrations類型的NotificationHubJob。
完整範例程式碼
以下是將註冊匯入通知中心內的完整範例。
using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
private static string CONNECTION_STRING = "---";
private static string HUB_NAME = "---";
private static string INPUT_FILE_NAME = "CreateFile.txt";
private static string STORAGE_ACCOUNT = "---";
private static string STORAGE_PASSWORD = "---";
private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));
static void Main(string[] args)
{
var descriptions = new[]
{
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
};
//write to blob store to create an input file
var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
var container = blobClient.GetContainerReference("testjobs");
container.CreateIfNotExists();
SerializeToBlob(container, descriptions);
// TODO then create Sas
var outputContainerSasUri = GetOutputDirectoryUrl(container);
var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);
//Lets import this file
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
createTask.Wait();
var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
getJobTask.Wait();
job = getJobTask.Result;
Thread.Sleep(1000);
i--;
}
}
private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(RegistrationDescription.Serialize());
}
var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
inputBlob.UploadFromStream(stream);
}
}
static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + sasContainerToken);
}
static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
//Set the expiry time and permissions for the container.
//In this case no start time is specified, so the shared access signature becomes valid immediately.
SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
{
SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
Permissions = SharedAccessBlobPermissions.Read
};
//Generate the shared access signature on the container, setting the constraints directly on the signature.
string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
//Return the URI string for the container, including the SAS token.
return new Uri(container.Uri + "/" + filePath + sasToken);
}
static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
{
return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
}
}
}