通过构建Storage Explorer 应用学习Windows Azure Storage APIs
通过构建Storage Explorer 应用学习Windows Azure Storage APIs
Windows Azure 平台提供不同的存储服务来存储瞬态和持久态的数据:
- 无结构二进制和文本数据Unstructured binary and text data
- 二进制和文本消息Binary and text messages
- 结构化数据Structured data
为了支持这些类型的数据, Windows Azure 提供三种不同的存储服务: Blobs,Queues,和 Tables。
存储类型 |
URL schema |
最大大小 |
推荐 用途 |
https://[StorageAccount].blob.core.windows.net/[ContainerName]/[BlobName] |
Block Blob:最大200GBPage Blob:最大1TB |
存储大尺寸二进制和文本文件 |
https://[StorageAccount].queue.core.windows.net/[QueueName] |
8 KB |
在本地和云端进行可靠的,持久态的消息通信。小尺寸二进制和文本消息 |
https://[StorageAccount].table.core.windows.net/[TableName]?$filter=[Query] |
数据表可以无限大。实体最大1 MB. |
由多个属性组成的可查询的结构化实体 |
本地和云端各种大型客户端应用都可以通过RESTful 接口无缝的访问存储服务。可以使用微软,第三方或者开发技术来实现使用存储服务的应用:
- .NET Framework: 本地或 云端.NET 应用通过Windows Azure SDK中的Windows Azure Storage Client Library来访问存储服务。
- Java: Windows Azure SDK for Java 使得Java程序也能便捷的访问Windows Azure存储服务。该SDK 包含在Windows Azure Tools for Eclipse 项目中。另外, AppFabric SDK for Java 使得Java项目也能使用Windows Azure AppFabric 中的 service bus 和 access control服务。
- PHP: Windows Azure SDK for PHP 使得 PHP 能够访问 Windows Azure 存储服务。
- "About the Storage Services API" on MSDN.
- "Using the Windows Azure Storage Services" on MSDN.
- "Understanding Data Storage Offerings on the Windows Azure Platform" on TechNet.
Blob Service
Blob 服务提供二进制文件和文本文件的存储。通过Blob的 REST API 能够访问2种资源: Containers 和 Blobs。容器可以看做包含多个文件的文件夹,而blob便是属于某个容器的文件。有如下2种blob:
- Block Blobs: 该类型用于流式访问。
- Page Blobs: 该类型用于随机读写操作,能够向blob中写入一部分字节。
容器和blob支持用户定义的元数据,在请求操作的头部填入相应的名值对。使用 REST API,开发者可以创建和文件系统类似的层次性命名。Blob 名可以加入配置路径分隔符。举例,MyGroup/MyBlob1 和 MyGroup/MyBlob2 意味着一个虚拟的组织层级。 Blob的枚举操作支持虚拟层级的遍历。所以,你可以放回某个组下面的所有blob。例如,您可以返回MyGroup下的所有blob。
Block Blobs 可以通过2种方法创建。 不超过64MB 的Block blobs 可以通过调用Put Blob 操作进行上传。大于64 MB的 Block blobs 必须分块上传,每块不超过4MB。当所有的分块成功上传之后,通过调用Put Block List操作进行合并,成为单个连续的blob。Block blob目前最大支持200GB。
Page blobs 可以由调用Put Blob操作来创建和初始化,支持最大尺寸。通过调用Put Page 操作,向page blob写入内容。Page blob 目前最大支持 1 TB。
Blobs 支持条件更新,更多请参见:
- "Understanding Block Blobs and Page Blobs" on MSDN.
- "Blob Service Concepts" on MSDN.
- "Blob Service API" on MSDN.
- "Windows Azure Storage Client Library" on MSDN.
Queue Service
Queue 服务提供本地和云端应用,或同一Windows Azure应用的不同role之间可靠的持久态消息通信。通过REST API 可以访问2种实体: Queues 和 Messages. Queues 支持用户定义的元数据,在请求操作的头部填入相应的名值对。每个存储账户可以拥有无限的消息队列。每个消息队列可以拥有无数的消息。单条消息最大8 KB。当从队列读取消息之后,消耗者应该处理该消息,并随后删除它。消息被读取之后,在特定间隔内,对于其他消耗者是不可见的。当间隔到期,而消息又没有被删除,其他消耗者便能访问该消息。更多请见:
- "Queue Service Concepts" on MSDN.
- "Queue Service API" on MSDN.
- "Windows Azure Storage Client Library" on MSDN.
Table Service
Table 服务提供结构化存储。Table 服务支持与 WCF Data Services REST API 兼容的REST API 。开发者可以使用.NET Client Library for WCF Data Services 来访问 Table 服务。一个存储账号内,开发者可以创建多个表。每个表由唯一的名称和存储账户标示。 Tables 将数据存储为实体。一个实体便是由命名属性和相关的值组成的集合。Tables 被划分在不同存储节点内,以便能够支持负载平衡。每个表的第一个属性: partition key,指明了该实体如何被划分。 第二个属性 : row key,能够确定在某个分区内的实体。Partition key 和row key 结合可以作为实体的主键。任何情况下,都应该把tables当做.NET Dictionary 对象,而不是关系型数据库内的表。实际上,每张table都是相互独立的, Table 服务并不支持多张表的join操作。如果你需要完全的云端关系型数据库,可以试试SQL Azure。另一不同是,table并不像传统关系型数据库那样必须拥有schema。换言之,同一个table内的实体不必暴露相同的属性。更多请见:
- "Table Service Concepts" on MSDN.
- "Table Service API" on MSDN.
- "Windows Azure Storage Client Library" on MSDN.
- "Understanding the Table Service Data Model" on MSDN.
- "Querying Tables and Entities" on MSDN.
- "Client Applications of ADO.NET Data Services" on MSDN.
在冗长而又必须的简介之后,我们试着写一些代码。在调研期间,我发现了几个相当不错的工具来管理Windows Azure 存储服务的数据。其中,Neudesic 编写的 Azure Storage Explorer 最为出色。既然要学习某个技术,最好便是能用其解决一些问题。我打算创建一个Windows Forms 应用来处理 to handle Windows Azure Storage Services: Blobs, Queues 和 Tables。下图是应用的架构:
创建一个 CloudStorageHelper类库,封装和拓展了 Windows Azure Storage Client Library提供的功能。实际上,互联网上有许多sample代码,例如 Windows Azure Code Samples 。
CloudStorageAccount 和 配置
Windows Azure Storage Client Library 包含了 CloudStorageAccount 类,用于检索配置文件中的账户信息和验证凭证。该类的 FromConfigurationSetting 静态方法会用特定的配置设置创建一个新的 CloudStorageAccount 实例。该方法仅当在SetConfigurationSettingPublisher 方法被调用以配置全局的configuration setting publisher 之后才能被调用。以下是我的应用的配置:
<?xml version="1.0"?> <configuration> <configSections> <section name="storageAccounts" type="System.Configuration.DictionarySectionHandler, System, Version=, Culture=neutral, PublicKeyToken=b77a5c561934e089"/> </configSections> <system.diagnostics> <trace autoflush="true" indentsize="4"> <listeners> <clear/> <add name="LogTraceListener" type="Microsoft.AppFabric.CAT.WindowsAzure.Samples.StorageServicesSmartClient.LogTraceListener, StorageServicesSmartClient" initializeData="" /> </listeners> </trace> </system.diagnostics> <storageAccounts> <add key="LocalStorageAccount" value="UseDevelopmentStorage=true"/> <add key="PaoloStorageAccount" value="DefaultEndpointsProtocol=https;AccountName=paolosalvatori;AccountKey=..."/> </storageAccounts> <appSettings> <add key="debug" value="true"/> <add key="retryCount" value="10"/> <add key="retryInterval" value="1"/> <add key="timeout" value="600"/> <add key="pageSize" value="50"/> <add key="parallelOperationThreadCount" value="10"/> </appSettings> <system.net> <connectionManagement> <add address="*" maxconnection="50"/> </connectionManagement> </system.net> <startup> <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/> </startup> </configuration> |
配置文件包含storageAccounts ,该段用于定义存储账户信息。 LocalStorageAccount 项包含开发用存储的连接字符串,开发用存储账户都是一样的,所以只要指明UseDevelopmentStorage=true便可。而第二个元素, PaoloStorageAccount包含了存储账户的连接字符串。 连接字符串如下格式:DefaultEndpointsProtocol=[http|https];AccountName=myAccountName;AccountKey=myAccountKey
- "How to Configure Connection Strings" on MSDN.
应用启动阶段, MainForm 构建函数内创建了一个CloudStorageHelper 实例,之后检索配置文件内的账户信息。
public partial class MainForm : Form { ... public MainForm() { InitializeComponent(); mainForm = this; this.cloudStorageHelper = new CloudStorageHelper(); GetStorageAccountsFromConfiguration(); } ... private void GetStorageAccountsFromConfiguration() { try { if (cloudStorageHelper == null) { return; } Hashtable hashtable = ConfigurationManager.GetSection(StorageAccounts) as Hashtable; if (hashtable == null || hashtable.Count == 0) { WriteToLog(StorageAccountsNotConfigured); } cloudStorageHelper.StorageAccounts = new Dictionary<string, string>(); IDictionaryEnumerator e = hashtable.GetEnumerator(); while (e.MoveNext()) { if (e.Key is string && e.Value is string) { cloudStorageHelper.StorageAccounts.Add(e.Key as string, e.Value as string); } } cboStorageAccount.Items.Add(SelectAStorageAccount); if (cloudStorageHelper.StorageAccounts != null) { cboStorageAccount.Items.AddRange(cloudStorageHelper.StorageAccounts.Keys.ToArray()); } cboStorageAccount.SelectedIndex = 0; } catch (Exception ex) { HandleException(ex); } } } |
选择之后,程序会调用 CreateCloudStorageAccount 方法,依次创建CloudStorageAccount 实例。
public class CloudStorageHelper { ... /// <summary> /// Initializes the CloudStorageAccount object using the information /// contained in the storageAccounts section in the configuration file. /// </summary> public void CreateCloudStorageAccount() { if (storageAccounts == null) { throw new ApplicationException(StorageAccountsDictionaryCannotBeNull); } if (string.IsNullOrEmpty(storageAccount)) { throw new ApplicationException(CurrentStorageAccountCannotBeNull); } if (storageAccounts.ContainsKey(storageAccount)) { string storageAccountNameAndSharedKey = storageAccounts[storageAccount] as string; if (!string.IsNullOrEmpty(storageAccountNameAndSharedKey)) { CloudStorageAccount.SetConfigurationSettingPublisher((configurationSetting, configurationSettingPublisher) => { configurationSettingPublisher(storageAccounts[configurationSetting]); }); cloudStorageAccount = CloudStorageAccount.FromConfigurationSetting(storageAccount); } else { Trace.WriteLineIf(traceEnabled, StorageAccountsNotConfigured); } } } ... } |
当选择了一个存储账户之后,你可以选择窗口下方的按钮,或上方的菜单来选择管理哪类数据 (Blobs, Queues, Tables) 。
当选择 Blob 时,应用会检索账户下的容器列表,并在左侧面板创建TreeView。选择某个容器,右侧面板内的ListView 会显示出该容器内的所有Block (以白色文档图标表示) 和 Page Blobs (以黄色文档图标表示) 。使用Container Commands 内的 New 或者Delete 按钮,或是菜单对应的选项来创建或删除容器。
- 点击 New 按钮,或者是菜单项
- 输入新容器名
- 指定容器访问权限 (Public 或 Private)
如下是 CloudStorageHelper 内列出,创建和删除容器的代码。
public class CloudStorageHelper { ... #region Container Methods /// <summary> /// Lists the available containers on the current storage account. /// </summary> /// <returns>The list of containers on the current storage account.</returns> public List<CloudBlobContainer> ListContainers() { if (cloudBlobClient == null) { if (cloudStorageAccount == null) { CreateCloudStorageAccount(); } if (cloudStorageAccount != null) { cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient(); if (string.Compare(cloudBlobClient.Credentials.AccountName, "devstoreaccount1", true) == 0) { cloudBlobClient.ParallelOperationThreadCount = 1; } else { cloudBlobClient.ParallelOperationThreadCount = parallelOperationThreadCount; } cloudBlobClient.Timeout = TimeSpan.FromSeconds(timeout); cloudBlobClient.RetryPolicy = RetryPolicies.RetryExponential(retryCount, TimeSpan.FromSeconds(retryInterval)); } } if (cloudBlobClient != null) { return new List<CloudBlobContainer>(cloudBlobClient.ListContainers()); } else { return null; } }
/// <summary> /// Creates a container on the current storage account. /// </summary> /// <param name="name">The container name.</param> /// <param name="accessibility">The container accessibility.</param> /// <returns>True if the container is successfully created, false otherwise.</returns> public bool CreateContainer(string name, Accessibility accessibility) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(ContainerNameCannotBeNull); } CloudBlobContainer container = cloudBlobClient.GetContainerReference(name.ToLower()); bool ok = container.CreateIfNotExist(); if (ok) { BlobContainerPermissions blobContainerPermissions = new BlobContainerPermissions(); blobContainerPermissions.PublicAccess = accessibility == Accessibility.Public ? BlobContainerPublicAccessType.Container : BlobContainerPublicAccessType.Off; container.Metadata.Add(new NameValueCollection() { { CreatedBy, Environment.UserDomainName } }); container.SetPermissions(blobContainerPermissions); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, ContainerCreatedFormat, name)); } return ok; }
/// <summary> /// Deletes a container on the current storage account. /// </summary> /// <param name="name">The container name.</param> public void DeleteContainer(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(ContainerNameCannotBeNull); } CloudBlobContainer container = cloudBlobClient.GetContainerReference(name); container.Delete(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, ContainerDeletedFormat, name)); } #endregion ... } |
通过Blob Commands 组内的按钮或者相应的菜单项来管理blob。
如上图,点击 New 按钮,应用会让您选择需要上传的多个文件。
Upload Files 对话框允许您选择Blob 类型 (Block 或 Page) ,以及执行模式 (Parallel 或 Sequential)。当您选择 Parallel 模式,应用会启动独立并发的上传 Task 。相反,当选择 Sequential 模式,应用会顺序上传文件。当你点击 OK 按钮,应用便会开始上传,并显示进度。
你也可以多选blob,然后按Delete 按钮来删除。或者按View 按钮来下载 blobs 。后者同样会出现Download Files 对话框,选择执行模式, Parallel 或 Sequential。
下表包含了CloudStorageHelper 内用于列出,上传,下载和删除block 和 page blobs的函数。 例如 UploadBlob 方法,其参数如下:
- BlobFileInfo 对象列表,表示每个文件的属性 (文件名, 容器名, Blob类型。)
- 上传执行模式: Parallel 或Sequential。
- 显示上传进度的事件处理程序。
- 上传完成时执行的Action 。
当选择 Sequential模式, UploadBlob 方法使用TaskFactory 来创建启动 Task。遍历blobFileInfoList ,为每个文件启动一个新的,独立的 Task 。 相反,当选择Parallel模式时, UploadBlob 同时为每个文件创建独立的Task 。 为了处理上传和下载Page Blobs,我高度定制了如下文章中的代码:
我的 UploadFile 方法创建FileStream 来读取需上传文件的内容, 并封装进 ProgressStream 对象。并将progressChangedHandler 事件处理程序赋值给ProgressStream 的ProgressChanged事件。注意, UploadFile 方法使用CloudBlockBlob 类的 UploadFromStream 方法CloudBlockBlob 类提供了多个上传文件的方法:
- PutBlock: 用于上传block,用于将来的block合并。block 不超过4 MB。上传若干block之后,通过调用PutBlockList 来创建或更新服务器边的blob。每个block在blob内有唯一的ID。
- UploadByteArray: 该方法上传字节数组至 一个 block blob。该方法继承自CloudBlob 类。
- UploadFile: 上传文件至 block blob。该方法继承自CloudBlob 类。
- UploadFromStream: 上传任意数据流至block blob。该方法继承自CloudBlob 类。
- UploadText: 上传文本字符串至block blob。该方法继承自CloudBlob 类。
当blob是Page类型时,UploadFile 使用CloudPageBlob 的WritePages 方法。Page Blobs 包含一系列的page,最小的page大小为512字节,最大4MB(必须为512字节的倍数) 。同样, CloudPageBlob 类包含多个上传方法:
- WritePages: 上传字节数组
- UploadByteArray: 上传字节数组。该方法继承自CloudBlob 类。
- UploadFile: 上传文件。该方法继承自CloudBlob 类。
- UploadFromStream: 上传任意数据流 。该方法继承自CloudBlob 类。
- UploadText: 上传文本字符串。该方法继承自CloudBlob 类。
DownloadBlob 方法实现于UploadFile 类似。DownloadFile 方法使用CloudBlockBlob 的 DownloadToStream 方法下载block blob。对于 Page blob用 BlobStream 对象来读取。更多请见Windows Azure Storage Client Library 。
public class CloudStorageHelper { ... #region Blob Methods /// <summary> /// Lists the blobs in the specified container. /// </summary> /// <param name="name">The container name.</param> /// <returns>The blobs contained in the specified container.</returns> public ResultSegment<IListBlobItem> ListBlobs(string name) { container = null;
if (string.IsNullOrEmpty(name)) { throw new ArgumentException(ContainerNameCannotBeNull); } container = cloudBlobClient.GetContainerReference(name); return container.ListBlobsSegmented(pageSize, continuationToken, blobRequestOptions); }
/// <summary> /// Returns a blob identified by its absolute URI. /// </summary> /// <param name="absoluteUri">The blob absolute URI.</param> /// <returns>The blob identified by its absolute URI.</returns> public CloudBlob GetBlob(string absoluteUri) { if (container == null) { throw new ArgumentException(CurrentContainerCannotBeNull); } CloudBlob cloudBlob = container.GetBlobReference(absoluteUri); cloudBlob.FetchAttributes(blobRequestOptions); return cloudBlob; }
/// <summary> /// Uploads a list of blobs to the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to upload.</param> /// <param name="executionOrder">The execution order: sequential or parallel.</param> public void UploadBlob(List<BlobFileInfo> blobFileInfoList, ExecutionOrder executionOrder) { UploadBlob(blobFileInfoList, executionOrder, null, null); }
/// <summary> /// Uploads a list of blobs to the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to upload.</param> /// <param name="executionOrder">The execution order: sequential or parallel.</param> /// <param name="progressChangedHandler">An event handler to invoke as the upload progresses.</param> /// <param name="action">An action to execute when the last upload has been completed.</param> public void UploadBlob(List<BlobFileInfo> blobFileInfoList, ExecutionOrder executionOrder, EventHandler<ProgressChangedEventArgs> progressChangedHandler, Action action) { try { taskCount = blobFileInfoList.Count; Task task = null;
if (blobFileInfoList == null || blobFileInfoList.Count == 0) { throw new ArgumentException(BlobFileInfoListCannotBeNull); }
if (executionOrder == ExecutionOrder.Sequential) { Task.Factory.StartNew(() => { try { for (int i = 0; i < blobFileInfoList.Count; i++) { task = Task.Factory.StartNew((p) => UploadFile(p), new Parameters(blobFileInfoList[i], progressChangedHandler, action)); Task.WaitAll(task); } } catch (AggregateException ex) { ex.Handle((e) => { HandleException(e); return true; }); } catch (Exception ex) { HandleException(ex); } }); } else { try { for (int i = 0; i < blobFileInfoList.Count; i++) { FileInfo fileInfo = new FileInfo(blobFileInfoList[i].FileName); blobFileInfoList[i].Length = fileInfo.Length; Task.Factory.StartNew((p) => UploadFile(p), new Parameters(blobFileInfoList[i], progressChangedHandler, action)); } } catch (AggregateException ex) { ex.Handle((e) => { HandleException(e); return true; }); } catch (Exception ex) { HandleException(ex); } } } catch (Exception ex) { HandleException(ex); } }
/// <summary> /// Uploads a file described by the argument. /// </summary> /// <param name="info">A BlobFileInfo object describing the file to upload.</param> /// <param name="progressChangedHandler">An event handler to invoke as the download progresses.</param> /// <param name="action">An action to execute when the last upload has been completed.</param> public void UploadFile(BlobFileInfo info, EventHandler<ProgressChangedEventArgs> progressChangedHandler, Action action) { try { CloudBlobContainer container = cloudBlobClient.GetContainerReference(info.ContainerName); CloudBlob cloudBlob = container.GetBlobReference(Path.GetFileName(info.FileName)); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentUICulture, BlobUploadingFormat, info.FileName)); switch (info.BlobType) { case BlobType.Block: CloudBlockBlob cloudBlockBlob = cloudBlob.ToBlockBlob; using (FileStream fileStream = new FileStream(info.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) { using (ProgressStream progressStream = new ProgressStream(info.Index, fileStream)) { progressStream.ProgressChanged += new EventHandler<ProgressChangedEventArgs>(progressChangedHandler); cloudBlockBlob.UploadFromStream(progressStream, new BlobRequestOptions() { Timeout = cloudBlobClient.Timeout }); } } break; case BlobType.Page: long totalUploaded = 0; long vhdOffset = 0; int offsetToTransfer = -1; CloudPageBlob cloudPageBlob = cloudBlob.ToPageBlob; long blobSize = RoundUpToPageBlobSize(info.Length); cloudPageBlob.Create(blobSize, new BlobRequestOptions { Timeout = cloudBlobClient.Timeout }); using (FileStream fileStream = new FileStream(info.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) { using (ProgressStream progressStream = new ProgressStream(info.Index, fileStream)) { progressStream.ProgressChanged += new EventHandler<ProgressChangedEventArgs>(progressChangedHandler); BinaryReader reader = new BinaryReader(progressStream);
while (vhdOffset < info.Length) { byte[] range = reader.ReadBytes(FourMegabytesAsBytes);
int offsetInRange = 0;
// Make sure end is page size aligned if ((range.Length % PageBlobPageSize) > 0) { int grow = (int)(PageBlobPageSize - (range.Length % PageBlobPageSize)); Array.Resize(ref range, range.Length + grow); }
// Upload groups of contiguous non-zero page blob pages. while (offsetInRange <= range.Length) { if ((offsetInRange == range.Length) || IsAllZero(range, offsetInRange, PageBlobPageSize)) { if (offsetToTransfer != -1) { // Transfer up to this point int sizeToTransfer = offsetInRange - offsetToTransfer; MemoryStream memoryStream = new MemoryStream(range, offsetToTransfer, sizeToTransfer, false, false); cloudPageBlob.WritePages(memoryStream, vhdOffset + offsetToTransfer); totalUploaded += sizeToTransfer; offsetToTransfer = -1; } } else { if (offsetToTransfer == -1) { offsetToTransfer = offsetInRange; } } offsetInRange += PageBlobPageSize; } vhdOffset += range.Length; } } } break; } Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentUICulture, BlobUploadedFormat, info.FileName)); } finally { lock (this) { taskCount--; if (taskCount == 0 && action != null) { action(); } } } }
/// <summary> /// Downloads a list of blobs from the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to upload.</param> /// <param name="executionOrder">The execution order: sequential or parallel.</param> public void DownloadBlob(List<BlobFileInfo> blobFileInfoList, ExecutionOrder executionOrder) { DownloadBlob(blobFileInfoList, executionOrder); }
/// <summary> /// Downloads a list of blobs from the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to upload.</param> /// <param name="executionOrder">The execution order: sequential or parallel.</param> /// <param name="progressChangedHandler">An event handler to invoke as the download progresses.</param> /// <param name="action">An action to execute when the last upload has been completed.</param> public void DownloadBlob(List<BlobFileInfo> blobFileInfoList, ExecutionOrder executionOrder, EventHandler<ProgressChangedEventArgs> progressChangedHandler, Action action) { try { taskCount = blobFileInfoList.Count; Task task = null;
if (blobFileInfoList == null || blobFileInfoList.Count == 0) { throw new ArgumentException(BlobFileInfoListCannotBeNull); }
if (executionOrder == ExecutionOrder.Sequential) { Task.Factory.StartNew(() => { try { for (int i = 0; i < blobFileInfoList.Count; i++) { task = Task.Factory.StartNew((p) => DownloadFile(p), new Parameters(blobFileInfoList[i], progressChangedHandler, action)); Task.WaitAll(task); } } catch (AggregateException ex) { ex.Handle((e) => { HandleException(e); return true; }); } catch (Exception ex) { HandleException(ex); } }); } else { try { for (int i = 0; i < blobFileInfoList.Count; i++) { long length = blobFileInfoList[i].CloudBlob.Properties.Length; blobFileInfoList[i].Length = length; Task.Factory.StartNew((p) => DownloadFile(p), new Parameters(blobFileInfoList[i], progressChangedHandler, action)); } } catch (AggregateException ex) { ex.Handle((e) => { HandleException(e); return true; }); } catch (Exception ex) { HandleException(ex); } } } catch (AggregateException ex) { ex.Handle((e) => { HandleException(e); return true; }); } }
/// <summary> /// Uploads a file described by the argument. /// </summary> /// <param name="info">A BlobFileInfo object describing the file to upload.</param> /// <param name="progressChangedHandler">An event handler to invoke as the download progresses.</param> /// <param name="action">An action to execute when the last upload has been completed.</param> public void DownloadFile(BlobFileInfo info, EventHandler<ProgressChangedEventArgs> progressChangedHandler, Action action) { try { CloudBlob cloudBlob = info.CloudBlob; string filePath = Path.Combine(info.LocalFolder, info.FileName); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentUICulture, BlobDownloadingFormat, info.FileName)); switch (info.BlobType) { case BlobType.Block: CloudBlockBlob cloudBlockBlob = cloudBlob.ToBlockBlob; using (FileStream fileStream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None)) { using (ProgressStream progressStream = new ProgressStream(info.Index, (int)info.Length, fileStream)) { progressStream.ProgressChanged += new EventHandler<ProgressChangedEventArgs>(progressChangedHandler); cloudBlockBlob.DownloadToStream(progressStream, new BlobRequestOptions() { Timeout = cloudBlobClient.Timeout }); } } break; case BlobType.Page: CloudPageBlob cloudPageBlob = cloudBlob.ToPageBlob; using (FileStream fileStream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None)) { fileStream.SetLength(info.Length); using (ProgressStream progressStream = new ProgressStream(info.Index, (int)info.Length, fileStream)) { progressStream.ProgressChanged += new EventHandler<ProgressChangedEventArgs>(progressChangedHandler); IEnumerable<PageRange> pageRanges = cloudPageBlob.GetPageRanges(); BlobStream blobStream = cloudPageBlob.OpenRead();
foreach (PageRange range in pageRanges) { int rangeSize = (int)(range.EndOffset + 1 - range.StartOffset);
for (int subOffset = 0; subOffset < rangeSize; subOffset += FourMegabytesAsBytes) { int subRangeSize = Math.Min(rangeSize - subOffset, FourMegabytesAsBytes); blobStream.Seek(range.StartOffset + subOffset, SeekOrigin.Begin); progressStream.Seek(range.StartOffset + subOffset, SeekOrigin.Begin);
byte[] buffer = new byte[subRangeSize];
blobStream.Read(buffer, 0, subRangeSize); progressStream.Write(buffer, 0, subRangeSize); } } } } break; } Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentUICulture, BlobDownloadedFormat, info.FileName)); } finally { lock (this) { taskCount--; if (taskCount == 0 && action != null) { action(); } } } }
/// <summary> /// Delets a a blob from the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to delete.</param> public void DeleteBlob(BlobFileInfo info) { if (info != null) { CloudBlob cloudBlob = new CloudBlob(info.AbsoluteUri as string, cloudBlobClient); cloudBlob.DeleteIfExists(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, BlobDeletedFormat, info.FileName)); } }
/// <summary> /// Delets a list of blobs from the current storage account. /// </summary> /// <param name="blobFileInfoList">The list of blobs to delete.</param> public void DeleteBlob(List<BlobFileInfo> blobFileInfoList) { if (blobFileInfoList != null) { for (int i = 0; i < blobFileInfoList.Count; i++) { try { CloudBlob cloudBlob = new CloudBlob(blobFileInfoList[i].AbsoluteUri as string, cloudBlobClient); cloudBlob.DeleteIfExists(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, BlobDeletedFormat, blobFileInfoList[i].FileName)); } catch (Exception ex) { HandleException(ex); } } } } #endregion ... } |
为了控制并行度,你可以设置CloudBlobClient的 ParallelOperationThreadCount 属性。一般而言,设置为逻辑处理器的数量。
当选择Queue 时,应用会检索账户下所有的队列,并在左侧显示TreeView。选中某个队列后,右侧会显示该队列所有的信息。
以下包含了CloudStorageHelper 内列出,创建和删除队列的代码。
public class CloudStorageHelper { ... #region Queue Methods /// <summary> /// Lists the available queues on the current storage account. /// </summary> /// <returns>The list of queues on the current storage account.</returns> public List<CloudQueue> ListQueues() { if (cloudQueueClient == null) { if (cloudStorageAccount == null) { CreateCloudStorageAccount(); } if (cloudStorageAccount != null) { cloudQueueClient = cloudStorageAccount.CreateCloudQueueClient(); cloudQueueClient.Timeout = TimeSpan.FromSeconds(timeout); cloudQueueClient.RetryPolicy = RetryPolicies.RetryExponential(retryCount, TimeSpan.FromSeconds(retryInterval)); } } if (cloudQueueClient != null) { return new List<CloudQueue>(cloudQueueClient.ListQueues()); } else { return null; } }
/// <summary> /// Creates a queue on the current storage account. /// </summary> /// <param name="name">The queue name.</param> /// <returns>True if the queue is successfully created, false otherwise.</returns> public bool CreateQueue(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); }
CloudQueue queue = cloudQueueClient.GetQueueReference(name.ToLower()); bool ok = queue.CreateIfNotExist(); if (ok) { queue.Metadata.Add(new NameValueCollection() { { CreatedBy, Environment.UserDomainName } }); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, QueueCreatedFormat, name)); } return ok; }
/// <summary> /// Deletes a queue on the current storage account. /// </summary> /// <param name="name">The queue name.</param> public void DeleteQueue(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); }
CloudQueue queue = cloudQueueClient.GetQueueReference(name); queue.Delete(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, QueueDeletedFormat, name)); }
#endregion ... } |
点击事件处理程序调用CloudStorageHelper 的ListMessages 方法 ,依次执行CloudQueue 的GetMessages 的方法来检索当前队列的消息。GetMessages 检索特定数量的消息,单次调用最多返回32条消息。GetMessages 和 GetMessage 方法并不消耗该消息。因此,应用通过GetMessage 或GetMessages调用检索消息时,应用应该处理该消息,并显式的通过CloudQueue 的DeleteMessage 方法删除该消息。当消息被检索之后,其PopReceipt 属性被设置为一个透明值,以标示该消息已经被读取。该值用于验证消息是否被删除的消息是否为同一条。 当应用检索回一条消息时,消息会被保留用于删除,保留的时间由消息的 NextVisibleTime 属性决定,期间无法被其他应用检索。如果时间过期之后,消息还没有被删除,则其他应用便能检索该消息。如果随后的应用并没有删除它,则先前的应用已经可以检索并删除。CloudQueue 类提供2个方法 : PeekMessage 和PeekMessages ,用于检索一条和多条消息,而不是消耗这些消息。换言之, PeekMessage 或 PeekMessages 方法读取消息之后,其余应用也能检索该消息。调用PeekMessage 并不更新消息的 PopReceipt 值。删除一条或多条消息,简单的选中他们,并点击Delete按钮。
以下包含CloudStorageHelper 列出,创建,查看和删除消息的代码:
public class CloudStorageHelper { ... #region Message Methods /// <summary> /// Gets a set of messages from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <returns>The messages contained in the specified container.</returns> public List<CloudQueueMessage> GetMessages(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); return new List<CloudQueueMessage>(queue.GetMessages(CloudQueueMessage.MaxNumberOfMessagesToPeek, TimeSpan.FromSeconds(1))); }
/// <summary> /// Peeks a set of messages from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <returns>The messages contained in the specified container.</returns> public List<CloudQueueMessage> PeekMessages(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); return new List<CloudQueueMessage>(queue.PeekMessages(CloudQueueMessage.MaxNumberOfMessagesToPeek)); }
/// <summary> /// Writes a messages to the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <param name="name">The message text.</param> public void AddMessage(string name, string text) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } if (string.IsNullOrEmpty(text)) { throw new ArgumentException(MessageTextCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); CloudQueueMessage message = new CloudQueueMessage(text); queue.AddMessage(message); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, MessageAddedFormat, name)); }
/// <summary> /// Gets a message from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <returns>A message from the queue.</returns> public CloudQueueMessage GetMessage(string name) { CloudQueueMessage message = null;
if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); if (queue != null) { message = queue.GetMessage(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, MessageReadFormat, message.Id, name)); return message; } return message; }
/// <summary> /// Peeks a message from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <returns>A message from the queue.</returns> public CloudQueueMessage PeekMessage(string name) { CloudQueueMessage message = null;
if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); if (queue != null) { message = queue.PeekMessage(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, MessageReadFormat, message.Id, name)); return message; } return message; }
/// <summary> /// Deletes a message from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <param name="messageList">The message to delete.</param> public void DeleteMessage(string name, CloudQueueMessage message) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } if (message == null) { throw new ArgumentException(MessageCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); if (queue != null) { string id = message.Id; queue.DeleteMessage(message); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, MessageDeletedFormat, id, name)); } }
/// <summary> /// Deletes a set of messages from the specified queue. /// </summary> /// <param name="name">The queue name.</param> /// <param name="messageList">The list of messages to delete.</param> public void DeleteMessage(string name, List<CloudQueueMessage> messageList) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(QueueNameCannotBeNull); } CloudQueue queue = cloudQueueClient.GetQueueReference(name); if (queue != null && messageList != null && messageList.Count > 0) { string id; foreach (var message in messageList) { id = message.Id; queue.DeleteMessage(message); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, MessageDeletedFormat, id, name)); } } } #endregion ... } |
以下包含CloudStorageHelper 列出,创建和删除table的代码:
public class CloudStorageHelper { ... #region Table Methods /// <summary> /// Lists the available tables on the current storage account. /// </summary> /// <returns>The list of tables on the current storage account.</returns> public List<string> ListTables() { if (cloudTableClient == null) { if (cloudStorageAccount == null) { CreateCloudStorageAccount(); } if (cloudStorageAccount != null) { cloudTableClient = cloudStorageAccount.CreateCloudTableClient(); cloudTableClient.RetryPolicy = RetryPolicies.RetryExponential(retryCount, TimeSpan.FromSeconds(retryInterval)); } } if (cloudTableClient != null) { return new List<string>(cloudTableClient.ListTables()); } else { return null; } }
/// <summary> /// Creates a table on the current storage account. /// </summary> /// <param name="name">The table name.</param> /// <returns>True if the table is successfully created, false otherwise.</returns> public bool CreateTable(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } bool ok = cloudTableClient.CreateTableIfNotExist(name); if (ok) { Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, TableCreatedFormat, name)); } return ok; }
/// <summary> /// Deletes a table from the current storage account. /// </summary> /// <param name="name">The table name.</param> /// <returns>True if the table is successfully deleted, false otherwise.</returns> public bool DeleteTable(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } bool ok = cloudTableClient.DeleteTableIfExist(name); if (ok) { Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, TableDeletedFormat, name)); } return ok; } #endregion ... } |
选择某个实体,点击View 按钮,显示并修改其属性。
当然,您可以点击Delete 按钮来删除实体。
为了给泛型和无类型实体建模,我创建了一个继承自TableServiceEntity 的CustomEntity 类。 CustomEntity 类使用Dictionary<string, object> 集合来存储实体的属性。实体上,我先前试过用 DynamicObject 来表示实体类,但是DataServiceContext 类, 并不支持动态对象,所以只能退而求其次了。
通过配置文件appSettings 部分来调节页大小或其他设置,也可以通过Edit菜单下的Settings 来设置。如下图:
点击More 按钮来检索下一页的内容。
CloudStorageHelper 的ListEntities 方法使用 ResultContinuation 对象来维护一个延续令牌,并继续查询直到检索回所有的结果。
该应用可以通过提交查询来获取某个表的子集数据。为了创建和执行 CloudQueryTable 对象,应该按如下步骤:
- 创建 TableServiceContext 实例。该对象允许对数据服务执行查询操作。更多请见:
"Querying the Data Service (WCF Data Services)" on MSDN。
- 用DataServiceContext 的 CreateQuery<T>() 方法创建 DataServiceQuery<T>实例, T 是查询返回的实体类型。
- 使用 AddQueryOption 方法为 DataServiceQuery<T> 对象增加查询选项。更多请见:
- "Query Entities" on MSDN.
- "OData: URI Conventions" on the OData site.
- 使用TableServiceExtensionMethods 的拓展方法 AsTableServiceQuery() 创建IQueryable<T> 对象,T 是查询返回的实体类型。
- 最后,使用同步的 Execute 方法,或者异步 BeginExecuteSegmented 方法来返回查询结果。 更多请见如下 QueryEntities 方法。
注意当实体类型是继承自TableServiceEntity 的定制类,并暴露每个实体的公有属性,那么便可以通过如下LINQ 语句进行查询: var query = from entity in context.CreateQuery<Customer>("SpecificCustomer") where entity.LastName.Equals("Smith") && entity.FirstName.Equals("John") select entity; 更多请见:
以下包含CloudStorageHelper 类用于列出,创建,查看,更新,以及查出实体的方法代码。
public class CloudStorageHelper { ... #region Entity Methods /// <summary> /// Lists the entities in the specified table. /// </summary> /// <param name="name">The table name.</param> /// <returns>The entities contained in the specified table.</returns> public List<CustomEntity> ListEntities(string name) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } IAsyncResult asynResult = null; CreateTableServiceContext(); ResultSegment<CustomEntity> entitySegment = null; CloudTableQuery<CustomEntity> cloudTableQuery = tableServiceContext.CreateQuery<CustomEntity>(name).Take<CustomEntity>(pageSize).AsTableServiceQuery<CustomEntity>(); asynResult = cloudTableQuery.BeginExecuteSegmented(continuationToken, new AsyncCallback(result => { var query = result.AsyncState as CloudTableQuery<CustomEntity>; entitySegment = query.EndExecuteSegmented(result); continuationToken = entitySegment.ContinuationToken; }), cloudTableQuery); asynResult.AsyncWaitHandle.WaitOne(); return entitySegment != null && entitySegment.Results.Count<CustomEntity>() > 0 ? entitySegment.Results.ToList<CustomEntity>() : new List<CustomEntity>();
/// <summary> /// Executes a query against the specified table. /// </summary> /// <param name="name">The table name.</param> /// <param name="query">The query to execute.</param> /// <returns>The entities contained in the specified table.</returns> public List<CustomEntity> QueryEntities(string name, string query) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } IAsyncResult asynResult = null; CreateTableServiceContext(); ResultSegment<CustomEntity> entitySegment = null; DataServiceQuery<CustomEntity> dataServiceQuery = tableServiceContext.CreateQuery<CustomEntity>(name); dataServiceQuery = AddQueryOptions(dataServiceQuery, query); CloudTableQuery<CustomEntity> cloudTableQuery = dataServiceQuery.Take<CustomEntity>(pageSize).AsTableServiceQuery<CustomEntity>(); asynResult = cloudTableQuery.BeginExecuteSegmented(continuationToken, new AsyncCallback(result => { var query = result.AsyncState as CloudTableQuery<CustomEntity>; entitySegment = query.EndExecuteSegmented(result); continuationToken = entitySegment.ContinuationToken; }), cloudTableQuery); asynResult.AsyncWaitHandle.WaitOne(); return entitySegment != null && entitySegment.Results.Count<CustomEntity>() > 0 ? entitySegment.Results.ToList<CustomEntity>() : new List<CustomEntity>(); }
/// <summary> /// Adds an entitiy to the specified table. /// </summary> /// <param name="name">The table name.</param> /// <param name="entity">The entity to add.</param> public void AddEntity(string name, CustomEntity entity) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } if (entity == null) { throw new ArgumentException(EntityCannotBeNull); } tableServiceContext.AddObject(name, entity); tableServiceContext.SaveChanges(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, EntityAddedFormat, entity.PartitionKey, entity.RowKey, name)); }
/// <summary> /// Updates an entitiy in the specified table. /// </summary> /// <param name="name">The table name.</param> /// <param name="entity">The entity to add.</param> public void UpdateEntity(string name, CustomEntity entity) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } if (entity == null) { throw new ArgumentException(EntityCannotBeNull); } tableServiceContext.UpdateObject(entity); tableServiceContext.SaveChangesWithRetries(SaveChangesOptions.ReplaceOnUpdate); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, EntityUpdatedFormat, entity.PartitionKey, entity.RowKey, name)); }
/// <summary> /// Deletes a list of entities in the specified table. /// </summary> /// <param name="name">The table name.</param> /// <param name="entityList">The list of entities.</param> public void DeleteEntity(string name, List<CustomEntity> entityList) { if (string.IsNullOrEmpty(name)) { throw new ArgumentException(TableNameCannotBeNull); } if (entityList == null || entityList.Count == 0) { throw new ArgumentException(EntityListCannotBeNull); } foreach (var entity in entityList) { tableServiceContext.DeleteObject(entity); tableServiceContext.SaveChanges(); Trace.WriteLineIf(traceEnabled, string.Format(CultureInfo.CurrentCulture, EntityDeletedFormat, entity.PartitionKey, entity.RowKey, name)); } } #endregion ... } |
从 这里 下载本文的源代码。我希望它能对您开发Windows Azure 应用有所帮助。
Storage Services
- "Exploring Windows Azure Storage" on MSDN.
- "Windows Azure Code Samples" on CodePlex.
- "Windows Azure Storage Architecture Overview" on Windows Azure Storage Team Blog.
Storage Account
- "Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "SetConfigurationSettingPublisher() - Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "Blobs - Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "Access Control for Azure Blobs" on Neil Mackenzie's Blog.
- "Using Windows Azure Page Blobs and How to Efficiently Upload and Download Page Blobs" on Windows Azure Storage Team Blog.
- "Queues - Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "Entities in Azure Tables" on Neil Mackenzie's Blog.
- "Tables - CloudTableClient - Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "More on Azure Tables - Azure Storage Client v1.0" on Neil Mackenzie's Blog.
- "Queries in Azure Tables" on Neil Mackenzie's Blog.
- "Partitions in Windows Azure Table" on Neil Mackenzie's Blog.
- "Paging with Windows Azure Table Storage" on Scott Densmore's Blog.
- "Table Storage Backup & Restore for Windows Azure" on CodePlex.
- "Protecting Your Tables Against Application Errors" on Windows Azure Storage Team Blog.
- "Azure Links and Some Cloudy Links"on Neil Mackenzie's Blog.
- "Cloud Cover" on Channel9.