CloudMutex - Synchronize threads across machines using Azure

Problem

I wanted to be able to read and write a particular set of resources (in my sample case, an Azure blob plus some associated SQL Database records) from one or more applications, machines, or devices such that only one consumer thread would be allowed to modify these resources at any one time. For the sake of this article, think of those resources as an atomic unit that should only ever be modified together, never individually, and never by more than one thread at a time. If I were solving this problem within a single app domain, I could use a simple critical section via the Monitor class or the lock keyword. If I were solving this problem within a single machine (OS instance), I could upgrade to the Mutex class and agree upon a shared mutex name for synchronization. One of the beautiful aspects of these synchronization mechanisms is that they provide near-instant notification once the lock becomes available for another thread to acquire; this facilitates low-latency, event-based collaboration between threads. They also implement a queued access model whereby access to the shared resource is granted on a roughly first-come, first-served basis.
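As a point of reference, here is a minimal sketch of the two local techniques mentioned above. The class name LocalLockingDemo, its members, and the mutex name passed in by the caller are all hypothetical and exist only for illustration; nothing here is part of CloudMutex.

```csharp
using System;
using System.Threading;

public static class LocalLockingDemo
{
    private static readonly object Gate = new object();
    private static int _counter;

    // Within one app domain: the lock keyword (Monitor) guards a critical section.
    public static int IncrementWithLock()
    {
        lock (Gate)
        {
            return ++_counter;
        }
    }

    // Within one machine: every process that opens the same mutex name shares the lock.
    // Returns false if the mutex could not be acquired before the timeout expired.
    public static bool RunUnderNamedMutex(string name, Action criticalSection, TimeSpan timeout)
    {
        using (var mutex = new Mutex(false, name))
        {
            if (!mutex.WaitOne(timeout))
            {
                return false;
            }

            try
            {
                criticalSection();
                return true;
            }
            finally
            {
                // Always release, even if the critical section throws.
                mutex.ReleaseMutex();
            }
        }
    }
}
```

The try/finally shape around ReleaseMutex is the same shape a caller would want around any mutex-like class, including the one described in the rest of this article.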

These OS-level techniques break down once I introduce the requirement to synchronize in the same way across devices.  While I recognize that there are many ways to use other technologies as surrogates for these locking techniques when dealing with multiple devices, I sought to find one that looked and smelled as similar to .NET's Mutex class as possible.  After some hours of searching, I found nothing I quite liked, so I've invented one that leverages Windows Azure Service Bus on the backend.  The remainder of this article details a very early version of a class I call CloudMutex that implements the WaitOne and ReleaseMutex methods familiar to consumers of .NET's System.Threading.Mutex class.  Please note that this is a very preliminary attempt and I'm hoping for great feedback from my peers before investing too much.  If a ready-made solution already exists, I'd love to see that as well.

This article assumes that you have a basic understanding of Windows Azure Service Bus Queues along with solid C# fundamentals including thread synchronization mechanisms.  For lack of that, please do some preparatory learning before going much further.  Also note that hereafter, I'll just use the word "device" to refer to any OS instances, whether physical or virtual, on any operating system, anywhere with access to the Internet.  Further, for those so inclined to want a more detailed definition of a mutex, think of it as a binary semaphore with thread affinity; that is, only one thread can own it at a time.

Starting from the end

The first and most salient unit test that I wrote sought to show that thread synchronization within a single process works the same way as it would with the out-of-the-box Mutex class. As such, I drafted the following test case:

[TestMethod]
public void CanAcquireAndReleaseMutex()
{
    string mutexName = Guid.NewGuid().ToString();
    var mutex = new CloudMutex(false, mutexName);
    var acquiredByWorker = new ManualResetEvent(false);
    string workerMessage = null;

    ThreadPool.QueueUserWorkItem(new WaitCallback(o =>
    {
        var workerMutex = new CloudMutex(false, mutexName);
        Console.WriteLine("Worker thread \"{0}\" has started.", Thread.CurrentThread.ManagedThreadId);
        workerMutex.WaitOne();
        Console.WriteLine("Worker thread \"{0}\" has acquired the cloud workerMutex and will wait 2 seconds.", Thread.CurrentThread.ManagedThreadId);
        // Tell the main test thread that the worker now holds the mutex.
        acquiredByWorker.Set();
        Thread.Sleep(2 * 1000);
        workerMessage = Guid.NewGuid().ToString();
        Console.WriteLine("Worker thread \"{0}\" has waited 2 seconds, set the worker message to \"{1}\" and will release the workerMutex.", Thread.CurrentThread.ManagedThreadId, workerMessage);
        workerMutex.ReleaseMutex();
        Console.WriteLine("Worker thread \"{0}\" has released the workerMutex.", Thread.CurrentThread.ManagedThreadId);
    }));

    // Block until the worker signals that it holds the mutex.
    acquiredByWorker.WaitOne();
    // ManagedThreadId is used here because Thread.Name is typically null on a test thread.
    Console.WriteLine("Base thread \"{0}\" notified that the worker thread has acquired the cloud mutex.", Thread.CurrentThread.ManagedThreadId);
    // Blocks until the worker releases the mutex.
    mutex.WaitOne();
    Console.WriteLine("Base thread \"{0}\" has acquired the cloud mutex.", Thread.CurrentThread.ManagedThreadId);
    mutex.Delete();
    Assert.IsFalse(String.IsNullOrWhiteSpace(workerMessage));
}

 

In words, the main test thread is taking the following steps:

  • Creating a mutex with a random, Guid-based name that it does not initially own
  • Spinning up a worker thread from the thread pool whose job is to acquire that mutex and wait two seconds before releasing it
  • Waiting for the worker thread to signal that it holds the mutex, then blocking until the worker releases it and acquiring it immediately afterward
  • Finally, deleting the mutex as a matter of good housekeeping

The worker thread is taking the following steps:

  • Acquiring the mutex
  • Signaling to the main test thread that it has acquired the mutex
  • Waiting two seconds
  • Releasing the mutex

The above design ensures that the worker thread will always acquire the mutex before the main test thread attempts to do so. The output from the test will look approximately as follows:

(T+0 sec) - Worker thread "5" has started.

(T+0 sec) - Worker thread "5" has acquired the cloud workerMutex and will wait 2 seconds.

(T+0 sec) - Base thread "9" notified that the worker thread has acquired the cloud mutex.

(T+2 sec) - Worker thread "5" has waited 2 seconds, set the worker message to "{76B921AA-AF9D-482F-97A9-496B9701A5C7}" and will release the workerMutex.

(T+2 sec) - Base thread "9" has acquired the cloud mutex.

 

In summary, for this very simple case, the CloudMutex works just as a regular mutex would but uses the cloud as the backing mechanism as opposed to resources on a single machine. I'm currently working on much more sophisticated cross-machine test cases but, for now, trust me that this works whether the participants are in the same process or across the world from one another, so long as they all have internet access and can agree on a mutex name (along with other Azure Service Bus details).

 

How it works

The hardest working class is, not surprisingly, the CloudMutex class itself. The first method to pay attention to is the constructor, which orchestrates the setup and initialization of the CloudMutex. It first calls the EnsureMetaLockQueue() method, which sets up a singleton queue within Service Bus that acts as a global, namespace-level mutex. It then goes on to initialize other important instance properties, followed finally by a call to EnsureMutexQueue(), which makes sure that the Service Bus queue that backs the actual mutex is ready to go. Without further ado, I'll post the entire contents of the CloudMutex class followed by additional sections and code that explain other pieces of this puzzle as I've implemented it thus far.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

using Yeast.WindowsAzure.ServiceBus;

namespace Yeast.WindowsAzure.Threading
{
    public class CloudMutex
        : WaitHandle
    {
        #region Constants

        public const string MetaLockQueueName = "cloudmutex_metalock";
        public const string MessageIDProperty = "ID";
        public static readonly Guid MetaLockMessageID = Guid.Parse("65A19C9A-40F3-4018-B16F-1923D3EA0457");

        #endregion

        #region Fields

        // A [ThreadStatic] field initializer only runs on the thread that first touches the type,
        // so every other thread would see null. The dictionary is therefore created lazily,
        // once per thread, through the ThreadOwnedMutexes accessor.
        [ThreadStatic]
        private static Dictionary<Uri, CloudMutex> _threadOwnedMutexes;

        private static Dictionary<Uri, CloudMutex> ThreadOwnedMutexes
        {
            get
            {
                if (_threadOwnedMutexes == null)
                {
                    _threadOwnedMutexes = new Dictionary<Uri, CloudMutex>();
                }

                return _threadOwnedMutexes;
            }
        }

        #endregion

        #region Constructors & Initialization

        private CloudMutex()
        {
        }

        public CloudMutex(bool initiallyOwned, string name)
            : this(initiallyOwned, name, ServiceBusSettings.FromConfiguration())
        {
        }

        public CloudMutex(bool initiallyOwned, string name, ServiceBusSettings settings)
            : this(initiallyOwned, name, settings, CloudMutexConstructionMode.Full)
        {
        }

        protected CloudMutex(bool initiallyOwned, string name, ServiceBusSettings settings, CloudMutexConstructionMode mode)
        {
            try
            {
                Settings = settings;
                MetaLockQueueClient = Settings.Factory.CreateQueueClient(MetaLockQueueName, ReceiveMode.PeekLock);
                EnsureMetaLockQueue();

                if (mode == CloudMutexConstructionMode.Full)
                {
                    InstanceID = Guid.NewGuid();
                    InitiallyOwned = initiallyOwned;
                    Name = name;
                    var builder = new UriBuilder(Settings.ServiceUri);
                    builder.Path = Name;
                    MutexQueueUri = builder.Uri;
                    NextList = new Dictionary<Guid, CloudMutex>();
                    MutexQueueClient = Settings.Factory.CreateQueueClient(Name, ReceiveMode.ReceiveAndDelete);
                    EnsureMutexQueue();
                }
            }
            catch (Exception exception)
            {
                throw new Exception(String.Format("Failed to create an instance of {0} on the Service Bus endpoint at {1}.", GetType().Name, Settings.ServiceUri), exception);
            }
        }

        // Places the single message into the meta lock queue; its presence means the meta lock is free.
        private void SeedMetaLockMessage()
        {
            var singletonMessage = new BrokeredMessage();
            singletonMessage.Properties[MessageIDProperty] = MetaLockMessageID;
            MetaLockQueueClient.Send(singletonMessage);
        }

        protected void EnsureMetaLockQueue()
        {
            try
            {
                if (!Settings.NamespaceClient.QueueExists(MetaLockQueueName))
                {
                    MetaLockQueueDescription = Settings.NamespaceClient.CreateQueue(MetaLockQueueName);
                    SeedMetaLockMessage();
                    CreatedMetaLockQueue = true;
                }
                else
                {
                    MetaLockQueueDescription = Settings.NamespaceClient.GetQueue(MetaLockQueueName);
                    if (MetaLockQueueDescription.MessageCount == 0)
                    {
                        SeedMetaLockMessage();
                    }
                }
            }
            catch (Exception exception)
            {
                throw new Exception(String.Format("Failed to ensure the existence of, or to gain access to the meta lock queue with name {0}.", MetaLockQueueName), exception);
            }
        }

        protected void EnsureMutexQueue()
        {
            try
            {
                if (!Settings.NamespaceClient.QueueExists(Name))
                {
                    // Re-check under the meta lock so that only one client creates the queue.
                    using (var scope = new MetaLockScope(MetaLockQueueClient))
                    {
                        OwnsMetaLock = true;
                        if (!Settings.NamespaceClient.QueueExists(Name))
                        {
                            Trace.WriteLine(String.Format("{0} - Queue '{1}' does not exist at {2} and will be created.", DateTime.Now, Name, Settings.ServiceUri), GetType().Name);
                            MutexQueueDescription = Settings.NamespaceClient.CreateQueue(Name);
                            Trace.WriteLine(String.Format("{0} - Successfully created queue '{1}' at {2}.", DateTime.Now, Name, Settings.ServiceUri), GetType().Name);
                            if (!InitiallyOwned)
                            {
                                // Seed the queue with one message so the mutex starts out acquirable.
                                var mutexMessage = new BrokeredMessage();
                                MutexQueueClient.Send(mutexMessage);
                            }
                        }
                        else
                        {
                            MutexQueueDescription = Settings.NamespaceClient.GetQueue(Name);
                            if (InitiallyOwned)
                            {
                                WaitOne();
                            }
                        }
                    }

                    OwnsMetaLock = false;
                }
                else
                {
                    MutexQueueDescription = Settings.NamespaceClient.GetQueue(Name);
                }
            }
            catch (Exception exception)
            {
                throw new Exception(String.Format("Failed to ensure the existence of, or to gain access to the mutex queue with name {0}.", Name), exception);
            }
        }

        #endregion

        #region Properties

        public Guid InstanceID { get; private set; }

        protected BrokeredMessage AcquisitionMessage { get; private set; }

        public CloudMutex Previous { get; private set; }

        public Dictionary<Guid, CloudMutex> NextList { get; private set; }

        public Uri MutexQueueUri { get; private set; }

        public bool OwnsMutex { get; private set; }

        public bool OwnsMetaLock { get; private set; }

        public bool CreatedMetaLockQueue { get; private set; }

        public bool InitiallyOwned { get; private set; }

        public string Name { get; private set; }

        public ServiceBusSettings Settings { get; private set; }

        public MessagingFactory Factory { get; private set; }

        protected QueueClient MetaLockQueueClient { get; private set; }

        protected QueueDescription MetaLockQueueDescription { get; private set; }

        protected QueueClient MutexQueueClient { get; private set; }

        protected QueueDescription MutexQueueDescription { get; private set; }

        #endregion

        #region Object Overrides

        public override int GetHashCode()
        {
            return MutexQueueUri.GetHashCode();
        }

        public override string ToString()
        {
            return MutexQueueUri.ToString();
        }

        public override bool Equals(object obj)
        {
            bool areEqual = false;

            if (obj is CloudMutex)
            {
                var operand = obj as CloudMutex;
                areEqual = MutexQueueUri == operand.MutexQueueUri;
            }

            return areEqual;
        }

        #endregion

        #region Methods

        public static void ReinitializeMetaLock()
        {
            ReinitializeMetaLock(ServiceBusSettings.FromConfiguration());
        }

        public static void ReinitializeMetaLock(ServiceBusSettings settings)
        {
            settings.NamespaceClient.DeleteQueue(MetaLockQueueName);
            // Constructing in MetaLockOnly mode recreates and reseeds the meta lock queue.
            var mutex = new CloudMutex(false, null, settings, CloudMutexConstructionMode.MetaLockOnly);
        }

        public override bool WaitOne(TimeSpan timeout)
        {
            return WaitOne(timeout, false);
        }

        public override bool WaitOne(TimeSpan timeout, bool exitContext)
        {
            return WaitOne((int)timeout.TotalMilliseconds, exitContext);
        }

        public override bool WaitOne()
        {
            // Default to a five minute wait.
            return WaitOne(1000 * 60 * 5);
        }

        public override bool WaitOne(int millisecondsTimeout, bool exitContext)
        {
            if (OwnsMutex)
            {
                throw new InvalidOperationException(String.Format("Cannot acquire the mutex backed by queue {0} on the Service Bus endpoint at {1} because it is currently owned by this instance of {2}.", Name, Settings.ServiceUri, GetType().Name));
            }

            CloudMutex previousMutex;
            if (ThreadOwnedMutexes.TryGetValue(MutexQueueUri, out previousMutex))
            {
                // This thread already owns the mutex through another instance; share that acquisition.
                Previous = previousMutex;
                Previous.NextList.Add(InstanceID, this);
                OwnsMutex = true;
                AcquisitionMessage = Previous.AcquisitionMessage;
            }
            else
            {
                // Receive returns null when the timeout expires, in which case the mutex was not acquired.
                AcquisitionMessage = MutexQueueClient.Receive(TimeSpan.FromMilliseconds(millisecondsTimeout));
                if (AcquisitionMessage != null)
                {
                    OwnsMutex = true;
                    ThreadOwnedMutexes.Add(MutexQueueUri, this);
                }
            }

            return OwnsMutex;
        }

        public void ReleaseMutex()
        {
            if (!OwnsMutex)
            {
                throw new InvalidOperationException(String.Format("Cannot release the mutex backed by queue {0} on the Service Bus endpoint at {1}. It is not owned by this instance of {2}.", Name, Settings.ServiceUri, GetType().Name));
            }

            if (Previous != null)
            {
                if (!Previous.NextList.Remove(InstanceID))
                {
                    throw new InvalidOperationException(String.Format("Failed to remove the {0} instance with ID {1} from the previous {0}'s (ID: {2}) next list. The current instance was not found. This may indicate a bug in your software.", GetType().Name, InstanceID, Previous.InstanceID));
                }

                // This instance only shared the acquisition made by Previous, so no message is sent here.
                // Sending one would put a second message in the queue and let two owners in at once.
                Previous = null;
                OwnsMutex = false;
                return;
            }

            if (!ThreadOwnedMutexes.Remove(MutexQueueUri))
            {
                throw new InvalidOperationException(String.Format("Failed to remove the {0} instance with ID {1} and path {2} from the thread owned mutex list. This may indicate a bug in your software.", GetType().Name, InstanceID, MutexQueueUri));
            }

            // Put a message back into the queue so that the next waiter can acquire the mutex.
            OwnsMutex = false;
            var releaseMessage = new BrokeredMessage();
            MutexQueueClient.Send(releaseMessage);
        }

        public void Delete()
        {
            Trace.WriteLine(String.Format("{0} - About to delete queue '{1}' at {2}.", DateTime.Now, Name, Settings.ServiceUri), GetType().Name);
            Settings.NamespaceClient.DeleteQueue(Name);
            Trace.WriteLine(String.Format("{0} - Successfully deleted queue '{1}' at {2}.", DateTime.Now, Name, Settings.ServiceUri), GetType().Name);
        }

        #endregion
    }
}

 

The Meta Lock

I wanted a meta lock so that the basic infrastructure needed in Service Bus to support any one mutex could be created in a safe, isolated manner, free from race conditions. All clients of the meta lock queue use the "peek lock" mechanism for accessing a singleton message within that queue. The act of receiving the message from the queue is analogous to acquiring a lock on the entire Service Bus namespace for the purposes of these mutexes. When the owner of the meta lock abandons the singleton message, that is analogous to releasing the lock. While this lock is held, one can be sure that no other thread is attempting to create Service Bus queues for mutexes, so long as all parties respect the meta lock. To make the meta lock easy to use in code, I've created a MetaLockScope class that implements the IDisposable interface so that the using keyword can be applied to mark the beginning and end of a protected code section. The class has the following definition:

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

using Yeast.WindowsAzure.ServiceBus;

namespace Yeast.WindowsAzure.Threading
{
    public class MetaLockScope
        : IDisposable
    {
        #region Constants

        public const int DefaultServerWaitTimeInSeconds = 60;

        #endregion

        #region Constructors

        public MetaLockScope()
            : this(ServiceBusSettings.FromConfiguration()
                .Factory.CreateQueueClient(CloudMutex.MetaLockQueueName, ReceiveMode.PeekLock))
        {
        }

        public MetaLockScope(QueueClient metaLockQueueClient)
            : this(metaLockQueueClient, TimeSpan.FromSeconds(DefaultServerWaitTimeInSeconds))
        {
        }

        public MetaLockScope(QueueClient metaLockQueueClient, TimeSpan serverWaitTime)
        {
            try
            {
                MetaLockQueueClient = metaLockQueueClient;
                Trace.WriteLine(String.Format("{0} - About to receive meta lock queue message from {1}", DateTime.Now, MetaLockQueueClient.Path), GetType().Name);
                SingletonMessage = MetaLockQueueClient.Receive(serverWaitTime);
                if (SingletonMessage == null)
                {
                    throw new Exception(String.Format("Failed to acquire the meta lock because the timeout expired. The server wait time is currently set to {0}.", serverWaitTime));
                }

                Trace.WriteLine(String.Format("{0} - Receive meta lock queue message with ID {1} from {2}", DateTime.Now, SingletonMessage.SequenceNumber, MetaLockQueueClient.Path), GetType().Name);
            }
            catch (Exception exception)
            {
                throw new Exception(String.Format("Failed to acquire the meta lock on queue {0} using {1} as the server wait time.", CloudMutex.MetaLockQueueName, serverWaitTime), exception);
            }
        }

        #endregion

        #region Properties

        protected QueueClient MetaLockQueueClient { get; private set; }

        protected BrokeredMessage SingletonMessage { get; private set; }

        #endregion

        #region IDisposable

        public void Dispose()
        {
            if (SingletonMessage != null)
            {
                Trace.WriteLine(String.Format("{0} - About to abandon meta lock queue message with ID {1} from {2}", DateTime.Now, SingletonMessage.SequenceNumber, MetaLockQueueClient.Path), GetType().Name);
                SingletonMessage.Abandon();
                Trace.WriteLine(String.Format("{0} - Successfully abandoned lock queue message with ID {1} from {2}", DateTime.Now, SingletonMessage.SequenceNumber, MetaLockQueueClient.Path), GetType().Name);
            }
        }

        #endregion
    }
}
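
For readers who want to experiment with the scope pattern without a Service Bus namespace, here is a minimal in-memory sketch. It is not the MetaLockScope class: LocalLockScope is a hypothetical name, and a SemaphoreSlim stands in for the meta lock queue, with waiting on the semaphore playing the role of receiving the singleton message and releasing it playing the role of abandoning the message.

```csharp
using System;
using System.Threading;

// Hypothetical in-memory analogue of a lock scope; a SemaphoreSlim replaces the queue.
public sealed class LocalLockScope : IDisposable
{
    private readonly SemaphoreSlim _gate;
    private bool _disposed;

    public LocalLockScope(SemaphoreSlim gate, TimeSpan waitTime)
    {
        // Acquire in the constructor; fail loudly if the wait time expires.
        if (!gate.Wait(waitTime))
        {
            throw new TimeoutException("Failed to acquire the lock before the wait time expired.");
        }

        _gate = gate;
    }

    public void Dispose()
    {
        // Release exactly once, even if Dispose is called repeatedly.
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _gate.Release();
    }
}
```

A caller would create one shared gate with new SemaphoreSlim(1, 1) and then wrap the protected section in using (new LocalLockScope(gate, TimeSpan.FromSeconds(60))) { ... }, which guarantees the release even when the protected code throws.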

 

Acquiring and releasing the CloudMutex

With all of the scaffolding that supports this cross-machine synchronization mechanism laid out, I'll finally elaborate on how the two other key methods, WaitOne and ReleaseMutex, work internally.

 WaitOne

For each mutex, a unique Service Bus queue is required. That queue is in an "acquirable" state if a single message is present in the queue. The WaitOne method acquires the mutex by receiving and deleting that message. Because of the way that Service Bus queues behave, only one consumer will be allowed to receive that message and thus acquire the mutex. All other callers will either wait to receive a message or return immediately without one (for example, with a zero timeout), depending on the caller's choice. Should multiple callers attempt to acquire the mutex before it has been released, only the first will succeed while the others queue up behind one another.

 ReleaseMutex

To release the mutex, and thus make it acquirable by another caller, a message must be placed back into the queue. As such, the ReleaseMutex method sends a message to the queue under the covers to effect the release.
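
The token-in-a-queue idea behind WaitOne and ReleaseMutex can be tried locally with a minimal sketch like the one below. It is an analogy only: TokenQueueMutex is a hypothetical name, and a BlockingCollection stands in for the Service Bus queue, so it synchronizes threads within one process rather than across machines.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for the queue that backs a single mutex.
public sealed class TokenQueueMutex
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>();

    public TokenQueueMutex(bool initiallyOwned)
    {
        // One message in the queue means the mutex is acquirable.
        if (!initiallyOwned)
        {
            _queue.Add(new object());
        }
    }

    // Acquire: receive and remove the single token, waiting up to the timeout.
    // Returns false if no token arrived in time.
    public bool WaitOne(TimeSpan timeout)
    {
        object token;
        return _queue.TryTake(out token, timeout);
    }

    // Release: put a token back so that the next waiter can receive it.
    public void ReleaseMutex()
    {
        _queue.Add(new object());
    }
}
```

Note that this sketch performs no ownership checks, so any caller can release. Seeding the queue with more than one token would turn it into a counting semaphore.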

 Notes

  • I am fully aware that these classes are far from complete and that they can be misused in many ways that cause them to fail. Once this idea has been further vetted, I'll invest more time in plugging the many holes.
  • Full semaphore semantics are also possible with this design: allowing multiple acquisitions and releases on a single "mutex" would make the behavior much more like the Semaphore class in .NET.

 

Supporting actor

The following enum also exists in my source base. It doesn't do a great deal, so I'll just post it for the curious.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Yeast.WindowsAzure.Threading
{
    public enum CloudMutexConstructionMode
    {
        Full = 1,
        MetaLockOnly = 2
    }
}

 

 

Next steps

If anyone finds this idea of a cloud mutex interesting, please let me know by commenting on this blog post and I'll look for some time to go deeper.

Finally, there is a class called ServiceBusSettings that I've included here along with a sister class called ServiceBusSection (a configuration section), that might deserve a blog post of their own.  That'll be my very next project unless something else catches my fancy first, if only to get feedback on other techniques that have worked well for others.  Suffice it to say though, these classes provide a means by which to access Windows Azure Service Bus credentials in a way that works whether running inside or outside of the compute emulator.  I consider that a general problem for which I have one general solution.

As always, I look forward to your input.   I post to share and I post to learn.

 

ServiceBusSection

 

using System;

using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Configuration;

namespace Yeast.WindowsAzure.ServiceBus
{
    public class ServiceBusSection
        : ConfigurationSection
    {
        #region Shared Constants

        public const string SectionPath = "yeast/windowsAzure/serviceBus";

        #endregion

        #region Constructors

        static ServiceBusSection()
        {
            _instance = ConfigurationManager.GetSection(SectionPath) as ServiceBusSection;
            if (_instance == null)
            {
                _instance = new ServiceBusSection();
            }
        }

        public ServiceBusSection()
        {
        }

        #endregion

        #region Properties

        private static ServiceBusSection _instance;
        public static ServiceBusSection Instance
        {
            get
            {
                return _instance;
            }
        }

        public const string ServiceNamespaceName = "serviceNamespace";
        [ConfigurationProperty(ServiceNamespaceName)]
        public string ServiceNamespace
        {
            get
            {
                return (string)this[ServiceNamespaceName];
            }
        }

        public const string IssuerNameName = "issuerName";
        [ConfigurationProperty(IssuerNameName)]
        public string IssuerName
        {
            get
            {
                return (string)this[IssuerNameName];
            }
        }

        public const string IssuerKeyName = "issuerKey";
        [ConfigurationProperty(IssuerKeyName)]
        public string IssuerKey
        {
            get
            {
                return (string)this[IssuerKeyName];
            }
        }

        public const string ServicePathName = "servicePath";
        [ConfigurationProperty(ServicePathName)]
        public string ServicePath
        {
            get
            {
                return (string)this[ServicePathName];
            }
        }

        public const string AllowRoleEnvironmentOverrideName = "allowRoleEnvironmentOverride";
        [ConfigurationProperty(AllowRoleEnvironmentOverrideName, DefaultValue = true)]
        public bool AllowRoleEnvironmentOverride
        {
            get
            {
                return (bool)this[AllowRoleEnvironmentOverrideName];
            }
        }

        #endregion
    }
}
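
For readers wiring this up, the section path "yeast/windowsAzure/serviceBus" implies nested section groups in app.config. The fragment below is a minimal sketch of what that registration might look like. It assumes the class is compiled into an assembly named Yeast.WindowsAzure.ServiceBus; that assembly name and every attribute value shown are placeholders, not taken from the article.

```xml
<configuration>
  <configSections>
    <!-- One sectionGroup per path segment of "yeast/windowsAzure/serviceBus". -->
    <sectionGroup name="yeast">
      <sectionGroup name="windowsAzure">
        <!-- Assumption: the assembly name after the comma is hypothetical. -->
        <section name="serviceBus"
                 type="Yeast.WindowsAzure.ServiceBus.ServiceBusSection, Yeast.WindowsAzure.ServiceBus" />
      </sectionGroup>
    </sectionGroup>
  </configSections>

  <yeast>
    <windowsAzure>
      <!-- Placeholder values; attribute names match the ConfigurationProperty names above. -->
      <serviceBus serviceNamespace="your-namespace"
                  issuerName="your-issuer-name"
                  issuerKey="your-issuer-key"
                  servicePath=""
                  allowRoleEnvironmentOverride="true" />
    </windowsAzure>
  </yeast>
</configuration>
```

If the section is missing entirely, the static constructor above falls back to an empty ServiceBusSection, so every string property comes back empty and AllowRoleEnvironmentOverride keeps its default of true.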

  

ServiceBusSettings

  

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace Yeast.WindowsAzure.ServiceBus
{
    /// <summary>
    /// Holds settings and methods useful in accessing Windows Azure Service Bus.
    /// </summary>
    /// <remarks>
    /// This class is not thread safe.
    /// </remarks>
    public class ServiceBusSettings
    {
        #region Shared Fields

        public const string ServiceBusUriScheme = "sb";
        public const string ConfigurationKeyPrefix = "Yeast.WindowsAzure.ServiceBus.";

        #endregion

        #region Constructors

        public ServiceBusSettings()
        {
            Source = SettingsSource.Unknown;
        }

        #endregion

        #region Properties

        // The *ConfigurationKey constants are the setting names read from the role environment.
        public const string ServiceNamespaceConfigurationKey = ConfigurationKeyPrefix + "ServiceNamespace";
        public string ServiceNamespace
        {
            get;
            set;
        }

        public const string IssuerNameConfigurationKey = ConfigurationKeyPrefix + "IssuerName";
        public string IssuerName
        {
            get;
            set;
        }

        public const string IssuerKeyConfigurationKey = ConfigurationKeyPrefix + "IssuerKey";
        public string IssuerKey
        {
            get;
            set;
        }

        public const string ServicePathConfigurationKey = ConfigurationKeyPrefix + "ServicePath";
        public string ServicePath
        {
            get;
            set;
        }

        // Records where the values came from (app.config or the role environment).
        public SettingsSource Source
        {
            get;
            private set;
        }

        // The properties below are created on first access and then cached, so
        // later changes to the namespace, path, or issuer values are not picked up.
        private TokenProvider _credentials;
        public TokenProvider Credentials
        {
            get
            {
                if (_credentials == null)
                {
                    _credentials = TokenProvider.CreateSharedSecretTokenProvider(IssuerName, IssuerKey);
                }

                return _credentials;
            }
        }

        private Uri _serviceUri;
        public Uri ServiceUri
        {
            get
            {
                if (_serviceUri == null)
                {
                    _serviceUri = ServiceBusEnvironment.CreateServiceUri(ServiceBusUriScheme, ServiceNamespace, ServicePath);
                }

                return _serviceUri;
            }
        }

        private NamespaceManager _namespaceClient;
        public NamespaceManager NamespaceClient
        {
            get
            {
                if (_namespaceClient == null)
                {
                    _namespaceClient = new NamespaceManager(ServiceUri, Credentials);
                }

                return _namespaceClient;
            }
        }

        private MessagingFactory _factory;
        public MessagingFactory Factory
        {
            get
            {
                if (_factory == null)
                {
                    _factory = MessagingFactory.Create(ServiceUri, Credentials);
                }

                return _factory;
            }
        }

        // ServicePath is optional; the other three values are required.
        public bool IsValid
        {
            get
            {
                return
                    !String.IsNullOrWhiteSpace(ServiceNamespace)
                    && !String.IsNullOrWhiteSpace(IssuerName)
                    && !String.IsNullOrWhiteSpace(IssuerKey);
            }
        }

        #endregion

        #region Overrides

        public override bool Equals(object obj)
        {
            // Two instances are equal when they resolve to the same service URI.
            var operand = obj as ServiceBusSettings;
            return operand != null && ServiceUri == operand.ServiceUri;
        }

        public override string ToString()
        {
            return ServiceUri.ToString();
        }

        public override int GetHashCode()
        {
            return ServiceUri.GetHashCode();
        }

        #endregion

        #region Methods

        public static ServiceBusSettings FromConfiguration()
        {
            // Start from the values in the app.config section.
            var settings = new ServiceBusSettings();

            settings.ServiceNamespace = ServiceBusSection.Instance.ServiceNamespace;
            settings.IssuerName = ServiceBusSection.Instance.IssuerName;
            settings.IssuerKey = ServiceBusSection.Instance.IssuerKey;
            settings.ServicePath = ServiceBusSection.Instance.ServicePath;
            settings.Source = SettingsSource.AppConfig;

            if (!settings.IsValid)
            {
                settings.Source |= SettingsSource.Invalid;
            }

            // When running inside a role, prefer the role's settings, but only if they are complete.
            if (ServiceBusSection.Instance.AllowRoleEnvironmentOverride && RoleEnvironment.IsAvailable)
            {
                var roleSettings = new ServiceBusSettings();
                roleSettings.ServiceNamespace = RoleEnvironment.GetConfigurationSettingValue(ServiceNamespaceConfigurationKey);
                roleSettings.IssuerName = RoleEnvironment.GetConfigurationSettingValue(IssuerNameConfigurationKey);
                roleSettings.IssuerKey = RoleEnvironment.GetConfigurationSettingValue(IssuerKeyConfigurationKey);
                roleSettings.ServicePath = RoleEnvironment.GetConfigurationSettingValue(ServicePathConfigurationKey);
                roleSettings.Source = SettingsSource.RoleEnvironment;

                if (roleSettings.IsValid)
                {
                    settings = roleSettings;
                }
                else
                {
                    roleSettings.Source |= SettingsSource.Invalid;
                }
            }

            // Neither source produced a complete set of values.
            if (!settings.IsValid)
            {
                throw new InvalidOperationException(String.Format("Unable to populate a {0} instance from configuration. The Source property is currently {1}.", settings.GetType().Name, settings.Source));
            }

            return settings;
        }

        #endregion
    }
}
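
The SettingsSource type used above is not defined in this part of the listing. Since the code combines values with `|=`, it is presumably a flags enum. The following is a minimal, self-contained sketch under that assumption; the member values and the `SettingsSourceDemo` class are hypothetical and are here only to show how the combined value behaves.

```csharp
using System;

namespace Yeast.WindowsAzure.ServiceBus.Sketch
{
    // Assumption: a [Flags] enum with these members. Only the member names
    // appear in the article's code; the numeric values are made up here.
    [Flags]
    public enum SettingsSource
    {
        Unknown = 0,
        AppConfig = 1,
        RoleEnvironment = 2,
        Invalid = 4
    }

    // Hypothetical demo class, not part of the article's code.
    public static class SettingsSourceDemo
    {
        public static void Main()
        {
            // Mirrors what FromConfiguration does when app.config values are incomplete.
            var source = SettingsSource.AppConfig;
            source |= SettingsSource.Invalid;

            // With [Flags], ToString lists each set member by name.
            Console.WriteLine(source);

            // Callers can test for the Invalid marker without losing the origin.
            Console.WriteLine(source.HasFlag(SettingsSource.Invalid));
        }
    }
}
```

Keeping Invalid as a separate bit means the exception message in FromConfiguration can report both where the settings came from and that they were incomplete.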

Comments

  • Anonymous
    April 24, 2015
    This seems like an interesting idea. I wonder why no one commented on this :)

  • Anonymous
    January 09, 2016
    Looks similar to: www.nuget.org/.../smarx.WazStorageExtensions. There is no permanent mutex concept across machines: machines going down should be able to release locks automatically. Also, checking whether the mutex still exists is required at regular intervals...