Jaa


Use TPL with Azure SDK 1.8 Table Storage

In a previous post I showed how I updated one of my helper classes for accessing table storage to the new Azure SDK. I saw some questions online about how to use TPL with the new SDK so here is a rewritten version of my helper class. This is designed to use the async and await keywords. Here is the full class:

 using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Microsoft.WindowsAzure.Storage.Table;
using System;
using System.Threading.Tasks;

/// <summary>
/// Helper for storing and retrieving <see cref="Run"/> entities in the "testruns" table,
/// exposing the storage client's Begin/End table operations as awaitable tasks.
/// </summary>
public class AzureTableHelper
{
    private readonly CloudStorageAccount storageAccount;
    private readonly CloudTableClient cloudTableClient;
    private readonly CloudTable cloudTable;

    const string TableName = "testruns";

    /// <summary>
    /// Table entity describing a single run. The table name and the run id are handed to the
    /// <see cref="TableEntity"/> base constructor (as partition key and row key respectively).
    /// </summary>
    public class Run : TableEntity
    {
        /// <summary>Creates a run with a freshly generated id and the Unknown status.</summary>
        public Run() : this(Guid.NewGuid(), RunStatus.Unknown) { }

        /// <summary>
        /// Creates a run from a typed id and status; the output blob name is the id followed by ".xml".
        /// </summary>
        public Run(Guid runId, RunStatus status) : this(runId.ToString(), status.ToString(), runId.ToString() + ".xml") { }

        /// <summary>Creates a run from already-stringified values.</summary>
        public Run(string runId, string status, string outputBlob)
            : base(TableName, runId)
        {
            this.RunId = runId;
            this.Status = status;
            this.OutputBlob = outputBlob;
        }

        public string RunId { get; set; }
        public string Status { get; set; }
        public string OutputBlob { get; set; }
    }

    /// <summary>
    /// Connects to the storage account named by the "StorageAccountName" / "StorageAccountKey"
    /// settings, falling back to the development storage account when either setting is blank,
    /// and makes sure the table exists.
    /// </summary>
    public AzureTableHelper()
    {
        string accountName = CloudConfigurationManager.GetSetting("StorageAccountName");
        string accountKey = CloudConfigurationManager.GetSetting("StorageAccountKey");

        if (!string.IsNullOrWhiteSpace(accountName) && !string.IsNullOrWhiteSpace(accountKey))
        {
            // NOTE(review): the second argument is useHttps; false means plain HTTP endpoints.
            // Consider switching to true unless HTTP is required — confirm with the deployment.
            this.storageAccount = new CloudStorageAccount(new StorageCredentials(accountName, accountKey), false);
        }
        else
        {
            this.storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
        }

        this.cloudTableClient = this.storageAccount.CreateCloudTableClient();
        this.cloudTable = this.cloudTableClient.GetTableReference(TableName);

        // Synchronous call: a constructor cannot await.
        this.cloudTable.CreateIfNotExists();
    }

    /// <summary>Inserts a new run entity with the given id and status.</summary>
    /// <param name="guid">Id of the run; used as the entity's row key.</param>
    /// <param name="runStatus">Initial status stored on the entity.</param>
    /// <returns>The result of the insert operation.</returns>
    public async Task<TableResult> AddRun(Guid guid, RunStatus runStatus)
    {
        return await this.ExecuteAsync(TableOperation.Insert(new Run(guid, runStatus)));
    }

    /// <summary>Loads the run with the given id, sets its status and replaces the stored entity.</summary>
    /// <param name="guid">Id of the run to update.</param>
    /// <param name="runStatus">New status to store on the entity.</param>
    /// <returns>The result of the replace operation.</returns>
    public async Task<TableResult> UpdateRun(Guid guid, RunStatus runStatus)
    {
        Run run = await this.GetRun(guid);

        // The new status must be applied before the replace; without this assignment the
        // entity that was just read would be written back unchanged.
        // When the run does not exist, the null is still handed to TableOperation.Replace
        // so the failure mode for a missing run stays what it was.
        if (run != null)
        {
            run.Status = runStatus.ToString();
        }

        return await this.ExecuteAsync(TableOperation.Replace(run));
    }

    /// <summary>Retrieves the run with the given id.</summary>
    /// <param name="guid">Id of the run to look up.</param>
    /// <returns>The stored run, or null when the result does not contain a <see cref="Run"/>.</returns>
    public async Task<Run> GetRun(Guid guid)
    {
        TableResult tr = await this.ExecuteAsync(TableOperation.Retrieve<Run>(TableName, guid.ToString()));
        return tr.Result as Run;
    }

    /// <summary>Builds the request options shared by every table operation issued by this helper.</summary>
    private static TableRequestOptions CreateRequestOptions()
    {
        return new TableRequestOptions() { RetryPolicy = new LinearRetry(TimeSpan.FromSeconds(3), 3) };
    }

    /// <summary>Wraps the table's BeginExecute/EndExecute pair in a task for the given operation.</summary>
    private Task<TableResult> ExecuteAsync(TableOperation operation)
    {
        // The type arguments are spelled out on purpose. With only four arguments, FromAsync
        // binds to the single-argument overload and treats the request options as the opaque
        // async "state" object, so the retry policy would never reach the storage client.
        // The two trailing nulls are the OperationContext and the async state.
        return Task<TableResult>.Factory.FromAsync<TableOperation, TableRequestOptions, OperationContext>(
            this.cloudTable.BeginExecute,
            this.cloudTable.EndExecute,
            operation,
            CreateRequestOptions(),
            null,
            null);
    }
}

UPDATE - Previously I wrote this class using the EntityResolver. That's not necessary when you're using a TableEntity. The original code looked like this:

     /// <summary>
    /// Retrieves the run with the given id, materialising the entity through an explicit
    /// <see cref="EntityResolver{T}"/> that copies the ETag and timestamp onto the result.
    /// </summary>
    /// <param name="guid">Id of the run to look up.</param>
    /// <returns>The resolved run, or null when the result does not contain a <see cref="Run"/>.</returns>
    public async Task<Run> GetRun(Guid guid)
    {
        EntityResolver<Run> resolver =
            (partitionKey, rowKey, timeStamp, properties, etag) =>
                new Run(
                    rowKey,
                    properties["Status"].StringValue,
                    properties["OutputBlob"].StringValue) { ETag = etag, Timestamp = timeStamp };

        // The type arguments are spelled out on purpose. With only four arguments, FromAsync
        // binds to the single-argument overload and treats the request options as the opaque
        // async "state" object, so the retry policy would never reach the storage client.
        // The two trailing nulls are the OperationContext and the async state.
        Task<TableResult> task = Task<TableResult>.Factory.FromAsync<TableOperation, TableRequestOptions, OperationContext>(
            this.cloudTable.BeginExecute,
            this.cloudTable.EndExecute,
            TableOperation.Retrieve<Run>(TableName, guid.ToString(), resolver),
            new TableRequestOptions() { RetryPolicy = new LinearRetry(TimeSpan.FromSeconds(3), 3) },
            null,
            null);
        TableResult tr = await task;
        return tr.Result as Run;
    }

The SDK does not return Tasks itself, but it is easy to create them from the standard Begin/End asynchronous method pattern in .NET by using Task.Factory.FromAsync. With async and await, we can then chain a bunch of tasks with ease.

This can also be applied to WCF. For instance, say I have a service that allows you to get the RunStatus for one of the above runs. I can now indicate in my service contract interface that I want to return a Task<RunStatus> instead of a RunStatus.

 using System;
using System.ServiceModel;
using System.ServiceModel.Web;
using System.Threading.Tasks;

/// <summary>
/// Service contract for querying the status of a run.
/// </summary>
[ServiceContract]
public interface IRunService
{
    /// <summary>
    /// Asynchronously gets the status of the run with the given id.
    /// Mapped to an HTTP GET on "Status?runId={runId}".
    /// </summary>
    /// <param name="runId">Id of the run, bound from the runId query-string parameter.</param>
    /// <returns>A task that produces the run's status.</returns>
    [OperationContract, WebInvoke(Method = "GET", UriTemplate = "Status?runId={runId}")]
    Task<RunStatus> GetRunStatus(Guid runId);
}

Then in the implementation I can do this:

 using System;
using System.ServiceModel;
using System.Threading.Tasks;

/// <summary>
/// Single-instance, multi-threaded service that reports the status of a perf run by
/// looking it up through <see cref="AzureTableHelper"/>.
/// </summary>
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class PerfRunService : IPerfRunService
{
    private readonly AzureTableHelper tableHelper;

    /// <summary>Creates the service together with the table helper it uses for lookups.</summary>
    public PerfRunService()
    {
        this.tableHelper = new AzureTableHelper();
    }

    /// <summary>Asynchronously gets the status of the perf run with the given id.</summary>
    /// <param name="perfRunId">Id of the perf run to look up.</param>
    /// <returns>
    /// The stored status, or <c>PerfRunStatus.Unknown</c> when the run is not found or its
    /// stored status text does not parse as a <c>PerfRunStatus</c>.
    /// </returns>
    public async Task<PerfRunStatus> GetPerfRunStatus(Guid perfRunId)
    {
        CloudPerfDriverBase.AzureTableHelper.Run run = await this.tableHelper.GetRun(perfRunId);

        // TryParse instead of Parse: the status is stored as free text, so a missing or
        // unrecognised value is reported as Unknown rather than failing the service call.
        PerfRunStatus status;
        if (run != null && Enum.TryParse(run.Status, out status))
        {
            return status;
        }

        return PerfRunStatus.Unknown;
    }
}

I'm always happy to see WCF code written in a way where the thread is not blocked, and TPL with async and await makes that so much easier than ever before.