Testing Azure Service Bus Topics on local computer
Article
Using Service Bus Topics allows us to communicate with loosely coupled components or systems using publish/subscribe model. The difference between Azure Queues and Topics is that using Topics the message can be easily routed to the selected subscriber by using filters while using only the queues this is hard to achieve without adding extra logic to the subscriber's adapters or connectors.
When publishing a message we can choose to route it to all subscribers or to apply filters for each subscription resulting in each subscriber receiving messages that are addressed to him. With Service Bus topics we can easily scale distributed applications communicating with each other within or across multiple networks.
In this article I will show you how to build and test Service Bus topic on your local computer. In our example we will simulate sending messages from the web, mobile and service application to the Service Bus Topic. These messages will be then routed to relevant subscriptions based on defined filters we assigned for each of them. Subscription for messages from the web application will be using multiple auto scalable worker roles to process the business logic. Same will apply for service messages. If we don’t expect a lot of traffic coming from mobile application, we can then use single worker role.
Autoscaling worker roles can be performed using Enterprise Library 5.0 – Autoscaling Application Block (aka WASABi). This will ensure that appropriate number of worker roles will be automatically started when traffic increases and stopped if the traffic will ease.
Diagram below shows the high level architecture
In order to test Service Bus Topics locally, we need to install “Service Bus 1.0 for Windows Server” (runs on Win7 as well). After installation we also need to configure it in order to set-up a farm on the local machine. Setting up the farm will allow us to create Service Bus namespace that we will use to communicate with the Topics.
Next, we need to create Topic, subscriptions and filters. In our case we want to simulate multiple subscribers receiving incoming messages from different sources; web, mobile and service application.
publicvoidCreateServiceBusTopicAndSubscriptions(NamespaceManager namespaceManager){ #region Configure and create Service Bus Topicvar serviceBusTestTopic =newTopicDescription(TopicName); serviceBusTestTopic.MaxSizeInMegabytes=5120; serviceBusTestTopic.DefaultMessageTimeToLive=newTimeSpan(0,1,0);if(!namespaceManager.TopicExists(TopicName)){ namespaceManager.CreateTopic(serviceBusTestTopic);} #endregion #region Create filters and subsctiptions//create filtersvar messagesFilter_Web =newSqlFilter("MessageOrigin = 'Web'");var messagesFilter_Mobile =newSqlFilter("MessageOrigin = 'Mobile'");var messagesFilter_Service =newSqlFilter("MessageOrigin = 'Service'");if(!namespaceManager.SubscriptionExists(TopicName,"WebMessages")){ namespaceManager.CreateSubscription(TopicName,"WebMessages", messagesFilter_Web);}if(!namespaceManager.SubscriptionExists(TopicName,"MobileMessages")){ namespaceManager.CreateSubscription(TopicName,"MobileMessages", messagesFilter_Mobile);}if(!namespaceManager.SubscriptionExists(TopicName,"WCfServiceMessages")){ namespaceManager.CreateSubscription(TopicName,"WCfServiceMessages", messagesFilter_Service);} #endregion}
Finally, we want to simulate receiving messages. We will do it by starting 3 subscriptions and waiting for incoming messages separately for each of them. After an message arrives, we will display it's source type and message body.
public override voidRun(){ Parallel.ForEach(SubscriptionClients, currentSubscrtiption =>{while(!IsStopped){ #region Receive messagestry{// Receive the messagevar receivedMessage = currentSubscrtiption.Receive();if(receivedMessage !=null){var messageFrom = receivedMessage.Properties["MessageOrigin"].ToString();switch(messageFrom){case"Web"://send it to web processing logicbreak;case"Mobile"://send it to mobile processing logicbreak;case"Service"://send it to service processing logicbreak;default:break;}// Process the message Trace.WriteLine(Environment.NewLine+"--------------------------"+Environment.NewLine); Trace.WriteLine(string.Format("{0} message content: {1}", messageFrom, receivedMessage.GetBody<string>())); receivedMessage.Complete();}}catch(MessagingException e){if(!e.IsTransient){ Trace.WriteLine(e.Message);throw;}Thread.Sleep(10000);}catch(OperationCanceledException e){if(!IsStopped){ Trace.WriteLine(e.Message);throw;}} #endregion}});}
The outcome of our test is as follows. Please note that we are differentiating messages by setting up message.Properties["MessageOrigin"] property string.