Implementing Reliable Messaging and Communications with the Cloud

patterns & practices Developer Center

On this page:
Scenario and Context | Communicating with Transport Partners | Choosing a Communications Mechanism - Electronic Data Interchange (EDI), Web Services (Push Model), Web Services (Pull Model), Azure Storage Queues, Azure Service Bus Queues, Azure Service Bus Topics and Subscriptions | How Trey Research Communicates with Transport Partners - Sending Messages to a Service Bus Queue Asynchronously, Receiving Messages from a Service Bus Queue and Processing Them Asynchronously, Sending Messages to a Service Bus Topic, Subscribing to a Service Bus Topic, Receiving Messages from a Topic and Processing Them Asynchronously, Implementing Adapters and Connectors for Translating and Reformatting Messages, Correlating Messages and Replies, Securing Message Queues, Topics, and Subscriptions, Securing Messages | Sending Orders to the Audit Log | Choosing a Mechanism for Sending Orders to the Audit Log | How Trey Research Sends Orders to the Audit Log | Verifying Orders to Ensure Regulatory Compliance | Choosing Where to Host the Compliance Application | How Trey Research Hosted the Compliance Application | Summary | More Information

After Trey Research moved the web application that enables customers to place orders to the Microsoft Azure™ technology platform, migrated the various databases used by the application, and secured access to the application so that only authenticated customers can place orders, the next step was to consider how the details of orders could be passed to the various transport partners for shipping, and recorded for audit and compliance purposes. This aspect of the system is critical because it supports the core business function of Trey Research: fulfilling customers' orders. The order handling process therefore requires a mechanism that transmits orders securely and reliably.

Jana Says:
Reliable communication between the Orders application and the transport partners is essential. If the communication mechanism is prone to failure, messages could be lost, orders might not be fulfilled, and customers may go elsewhere.

In this chapter, you will see how Trey Research addressed the various challenges associated with implementing a messaging and communications layer capable of handling orders in the cloud by using Azure Service Bus and Azure Connect.

Note

Chapter 5, "Processing Orders in the Trey Research Solution," describes how Trey Research uses this communications and messaging layer as a foundation supporting the business logic for actually processing orders placed by customers.

Scenario and Context

In the original implementation of the Orders system, the elements of the Orders application ran on-premises, and the order processing workflow was performed in an environment that was completely controlled by Trey Research. Figure 1 illustrates the original application, with the components that handle the order processing highlighted.

Figure 1

The order processing components in the on-premises application

In the on-premises solution, when a customer places an order the application stores the order details in the Orders table in the on-premises database. The Audit Log table in the on-premises database holds a range of information including runtime and diagnostic information, together with details of unusual orders such as those over a specific total value. The Orders application then sends a message to the appropriate transport partner. This message indicates the anticipated delivery date and packaging information for the order (such as the weight and number of packages). The transport partner sends a message back to the Orders application after the delivery is completed so that the Orders database table can be updated.

Due to the nature of the products Trey Research manufactures, it must also ensure that it meets legal requirements for the distribution of certain items, particularly for export to other countries and regions. These requirements include keeping detailed records of the sales of certain electronic components that may be part of Trey Research's products, and hardware items that could be used in ways not originally intended. Analyzing the contents of orders is a complex and strictly controlled process accomplished by a legal compliance application from a third-party supplier, and it runs on a separate specially configured server.

When the Orders application moved to the cloud, Trey Research had to consider how to implement this business logic using the new architecture. As you will recall from Chapter 2, "Deploying the Orders Application and Data in the Cloud," Trey Research deployed the data to the SQL Azure™ technology platform. The Orders database was replicated in each datacenter, and the Orders application was modified to access the database co-located in the same datacenter that the user is connected to. Additionally, with the expectation that the volume of orders was likely to increase exponentially, the compliance application was relocated to the cloud to take advantage of the inherent scalability of Azure; the compliance application is multi-threaded and can take full advantage of the power of the platform on which it runs, so it was considered appropriate only to deploy it to a single datacenter. However, for regulatory reasons, it was necessary to retain the audit log on-premises. Figure 2 shows the structure of the resulting hybrid solution, again with the order processing elements highlighted.

Figure 2

The hybrid version of the Trey Research solution

As far as a customer is concerned, the Orders application works in a similar way to the original solution, but the logic that processes an order is now implemented as follows:

  • When a customer places an order, the Orders application:
    • Stores the order details in the Orders table of the database in the local SQL Azure datacenter. All orders are synchronized across all Azure datacenters so that the order status information is available to visitors irrespective of the datacenter to which they are routed.
    • Sends an advice message to the appropriate transport partner. The transport company chosen depends on the delivery location.
    • Sends any required audit information, such as orders with a value over $10,000, to the Audit Log table of the database located in the head office datacenter. The on-premises management and monitoring applications can examine this information.
  • The third-party compliance application running in the cloud continually validates the orders in the Orders table for conformance with legal restrictions and sets a flag in the database table on those that require attention by managers. It also generates a daily report that it stores in a secure location in the head office datacenter.
  • When transport partners deliver an order to the customer they send a message to the Orders application (running in the datacenter that originally sent the order advice message) so that it can update the Orders table in the database.

Keep in mind that, for simplicity, some of the features and processes described here are not fully implemented in the example we provide for this guide, or may work in a slightly different way. This is done to make it easier for you to install and configure the example, without requiring you to obtain and configure Azure accounts in multiple datacenters, and for services such as Azure Data Sync and SQL Server Reporting Services.

In its simplest terms, the high-level structure of the cloud-based elements of the hybrid solution is reasonably straightforward. The Orders application running in the cloud naturally maps to an Azure web role, as described in Chapter 1, "The Trey Research Scenario," while the business logic that actually processes orders can be implemented as an Azure worker role. The operation of the order processing logic must be scalable to handle the expected growth in demand as Trey Research expands its customer base, and it must be reliable because orders must not be mislaid or duplicated.

The order processing business logic divides naturally into three specific areas: how to communicate with transport partners, how to determine which orders to audit, and how to pass orders to the compliance application to ensure that they do not violate regulatory requirements. Trey Research considered the implementation options for each of these areas in turn.

Communicating with Transport Partners

A key part of the order processing mechanism concerns the communication with the transport partners. The worker role must examine each order and direct the order details to the most appropriate transport partner. The definition of "most appropriate" is application-specific and may change over time, but currently it is an economic decision based on the proximity of the customer to the Trey Research manufacturing plant from where the orders are shipped. Orders for local customers (customers based in the same or a neighboring state as the Trey Research manufacturing plant) use a local transport partner, while orders for more remote customers require a distance transport partner capable of shipping goods by rail or air if necessary.

Note

After much negotiation and evaluation, Trey Research decided to use Contoso, Inc. to provide the local transportation services, while Fabrikam, Inc. was selected as the distance transport partner.
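The selection rule described above can be sketched as a small function. This is a minimal illustration written in Python rather than the .NET code of the sample solution. The plant location, the table of neighboring states, and the function name are all hypothetical, because the guide does not specify them.

```python
# Hypothetical values: the guide does not say where the plant is located.
PLANT_STATE = "WA"
NEIGHBORING_STATES = {"OR", "ID"}

LOCAL_PARTNER = "Contoso"      # local transport partner named in the guide
DISTANCE_PARTNER = "Fabrikam"  # distance transport partner named in the guide


def choose_transport_partner(customer_state: str) -> str:
    """Return the partner for an order, based on the customer's state."""
    state = customer_state.strip().upper()
    # Customers in the plant's own state or a neighboring state are "local".
    if state == PLANT_STATE or state in NEIGHBORING_STATES:
        return LOCAL_PARTNER
    # Everyone else is served by the distance partner.
    return DISTANCE_PARTNER
```

Keeping the rule in one function makes it easier to change when delivery costs fluctuate, which is one of the flexibility criteria listed below.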

During the design phase, the development team at Trey Research insisted that all communications with the transport partners had to fulfill a number of criteria:

  • The solution must be responsive. All communication must operate in a timely manner that does not block the operations in the Orders application or adversely affect the experience of customers placing orders.
  • The communication mechanism must be robust and reliable. Once a customer places an order, and that order is confirmed, it must be fulfilled. The system must not be able to lose orders as messages are passed from the Trey Research solution to the appropriate transport partner, even if the connection with the transport partner fails or the transport partner's system suffers a failure.
  • The solution must be scalable. It must be possible to easily add further transport partners to the solution without needing to rewrite the order processing logic. Furthermore, it must be possible to host the Orders application at multiple sites in the cloud, again without requiring that this code is rewritten to handle multiple instances of the application.
  • The solution must be flexible. The actual list of transport partners may vary over time, and the rationale for selecting which partner to use for a specific order may also change as delivery costs fluctuate.
  • All data should be considered sensitive and must be protected appropriately. All reasonable security measures should be taken to prevent an unauthorized third party from intercepting the details of orders placed by a customer.

Markus Says:
The way in which messages are passed between distributed components in a hybrid application must be reliable, robust, responsive, scalable, and secure.

Choosing a Communications Mechanism

In the original on-premises application, communication between Trey Research and the transport partners was achieved through a set of web service operations. Some of the calls to these operations originated from within Trey Research's network, but others required Trey Research to expose an on-premises web service that partners called to update the delivery status. After moving to the cloud, given the challenges listed above, Trey Research considered several options for implementing the connectivity between the equivalent business logic hosted in the worker role and the transport partners. The following sections summarize some of these options, together with their perceived advantages and limitations.

Electronic Data Interchange (EDI)

The worker role could connect to the transport partner over an interface, protocol, and format that the worker role and the transport partner both understand, such as EDIFACT, RosettaNet, cXML, and BASDA. These are commonly accepted and well understood standards that many larger organizations employ to exchange data with other businesses. Furthermore, most modern EDI protocols are asynchronous as the corresponding business processes are expected to be long-lived; this can help to ensure that the worker role remains responsive during times of high demand.

However, the worker role may require additional software and infrastructure to connect to an EDI interface. Microsoft BizTalk® Server provides adapters for many well-known protocols and formats, but this solution requires passing all orders through BizTalk Server running on-premises. For more information, see "Hybrid Reference Implementation Using BizTalk Server, Azure, Service Bus and SQL Azure." Each transport partner may expose a different EDI interface, making it difficult to easily extend the Trey Research business logic to additional partners, although it may be possible to abstract these differences into a connector layer within the worker role.

This approach would still require implementing a connector for each partner. Trey Research also had to consider that not all transport partners would necessarily provide an EDI interface to their systems. Finally, in this approach security and message protection are governed by the transport partner rather than by Trey Research.

Web Services (Push Model)

If the transport partner exposes a web services interface into its delivery system, the worker role could utilize this interface and invoke the appropriate operations to push the details of orders across to the transport partner. Web services are a common, well-understood, and maturing technology. Additionally, it is usually a straightforward task to invoke web service operations from a worker role. From a security perspective, requests can be easily encrypted, although the degree of security and message protection available is managed by the transport partner providing the web service.

There are some possible issues with this approach. Primarily, if the transport partner does not provide a web service interface, then this approach cannot be used. If a transport partner does provide such an interface, Trey Research also had to assess the possible complexity arising in the highly likely scenario that different transport partners implement different sets of web service operations, and expect request messages in different formats. These differences could be abstracted by building a connector layer within the worker role and constructing custom connectors for each transport partner, but this adds complexity to the Trey Research solution.

Another issue is that the web service may not provide an appropriate level of reliability. The worker role may be unaware if the transport partner's system raises an error that causes the details of the order to be lost, so it would not know that it needs to call the operation that provides the details of the order again.

Connectivity is also an issue; if the web service at the transport partner is temporarily unavailable or a connection cannot be established, then Trey Research will not be able to send the details of any orders.

Web Services (Pull Model)

Web services can provide a secure, scalable, and reliable communication mechanism if they are implemented correctly. For this reason, Trey Research considered turning the previous option around by implementing a web service as part of the worker role, and exposing operations that transport partners invoke to retrieve (or pull) information about orders from the Trey Research application. This approach would give Trey Research full control over the degree of message protection and security available. The web service can also take advantage of the inherent scalability of the worker role hosted by Azure, and it affords better reliability; if the transport partner's system fails while processing an order, it can reconnect to the web service and retrieve the details of the order again when it restarts.

The worker role can expose the same web services interface to all transport partners. New transport partners can be easily integrated without modifying the worker role. The worker role does not have to wait while the transport partner retrieves the details of orders so the system remains responsive to customers. Furthermore, the worker role can take advantage of Service Bus Relay to build location-independence and security into the solution; the transport partner can connect to a well-known endpoint advertised through Service Bus Relay which can authenticate the transport partner through the Azure Access Control Service (ACS), and then transparently route messages to the web service endpoints published by the worker role.

However, transport partners would be expected to develop their own software to connect to the web service; they may be unwilling or unable to develop and deploy custom software specifically for integrating with Trey Research. If the transport partner is willing to connect to the web service, it is the responsibility of the transport partner to query whether there are any orders to be shipped. If the transport partner does not perform this query often enough, then orders may not be dispatched in a timely manner, leading to customer complaints. If the transport partner's system fails to query the web service successfully, orders will not be shipped. Scalability within the transport partners' systems may also be an issue. As the volume of orders increases, transport partners may not query the web service sufficiently often, causing a backlog of orders to build up.

Azure Storage Queues

The Orders application runs in an Azure role, so Trey Research considered posting the details of orders as messages to an Azure storage queue; transport partners could connect to this queue to retrieve the orders to be shipped. Delivery acknowledgement messages could be posted back to the worker role through another queue. This mechanism is relatively simple, reliable, scalable, and secure; Azure storage queues are managed and maintained within a datacenter, and Trey Research has full control over which transport partners would have access rights to connect to the queue to retrieve and post messages. Additionally, the semantics of the retrieve operation can be implemented in a reliable manner; if the transport partner fails with an error after receiving a message but before processing it, the message can be transparently returned to the queue from where it can be received when the transport partner restarts.

Azure provides a REST API for accessing Azure storage queues, so the transport partner can implement their system by using any technology that can connect to the web and transmit REST requests.

Again, there are some issues surrounding this approach. As with some of the preceding options, each transport partner must be willing to implement software that connects to the Azure storage queue and integrate it into their own solution. Also, to prevent a transport partner from receiving an order intended for a different transport partner, Trey Research must create a separate queue for each partner. This approach may complicate the logic inside the worker role, and also makes it more difficult to add or remove transport partners. Finally, security is controlled by using storage account keys rather than ACS, so each transport partner would have to be granted access to the entire storage account rather than an individual queue. To ensure that each transport partner only has access to the relevant queue, each queue must be created within a different storage account with its own key.

Azure Service Bus Queues

To counter some of the complexity issues of using Azure storage queues, a similar but more advantageous option is to post the details of orders as messages to an Azure Service Bus queue; transport partners can connect to this queue to retrieve the orders to be shipped. Delivery acknowledgement messages can be posted back to the worker role through another queue. This approach is highly scalable; the worker role can post messages to the Service Bus queue as quickly as orders are placed. The Service Bus infrastructure can buffer messages until they have been retrieved by the transport partner. It also offers improved reliability; after the worker role has posted a message to a Service Bus queue, it will not be lost. It will remain on the queue until either it expires (the expiration period for a message is configurable) or a transport partner retrieves it. Like an Azure storage queue, a Service Bus queue supports reliable retrieve operations, so if the transport partner fails after receiving a message, the message can be transparently returned to the queue. Security is highly configurable and flexible, especially when compared to that available for Azure storage queues. It is managed through the Azure Access Control Service (ACS).
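The reliable retrieve semantics described above can be illustrated with a small in-memory model. The following Python sketch is not the Service Bus API; the class and method names are hypothetical, and it deliberately omits details such as lock timeouts. It shows only the two behaviors mentioned in the text: a message that is received but not completed can be returned to the queue, and a message that outlives its expiration period is discarded.

```python
import time
import uuid
from collections import deque


class PeekLockQueue:
    """In-memory stand-in for a queue with lock-then-complete receive semantics."""

    def __init__(self, time_to_live: float = 60.0):
        self._time_to_live = time_to_live
        self._messages = deque()  # (expires_at, body) pairs waiting to be received
        self._locked = {}         # lock token -> (expires_at, body)

    def send(self, body):
        self._messages.append((time.monotonic() + self._time_to_live, body))

    def receive(self):
        """Lock and return (token, body), or None if nothing is available."""
        now = time.monotonic()
        while self._messages:
            expires_at, body = self._messages.popleft()
            if expires_at <= now:
                continue  # the message has expired, so it is discarded
            token = uuid.uuid4().hex
            self._locked[token] = (expires_at, body)
            return token, body
        return None

    def complete(self, token):
        """The receiver processed the message; remove it permanently."""
        del self._locked[token]

    def abandon(self, token):
        """The receiver failed; put the message back at the head of the queue."""
        self._messages.appendleft(self._locked.pop(token))
```

A receiver that crashes between `receive` and `complete` never removes the message, so a later `abandon` (or, in a real service, a lock timeout) makes the order available again.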

However, as before, each transport partner must be prepared to connect to the appropriate Service Bus queues to retrieve messages and send delivery acknowledgement messages. If the transport partner is not amenable to this approach, and instead insists that orders are passed across using its own system-defined interfaces (such as a set of web services), then Trey Research may need to build a custom component to retrieve messages from the queue and convert them into the appropriate format for the transport partner, and transmit them using an agreed protocol. The same component can include the logic for waiting for a delivery acknowledgement from the transport partner and posting a message to the worker role. Again, as before, to prevent a transport partner from receiving an order intended for a different transport partner, Trey Research may need to create a separate queue for each partner. This approach may complicate the logic inside the worker role, and also makes it more difficult to add or remove transport partners.

Note

For information comparing the features and possible uses of Azure storage queues and Azure Service Bus queues, see the article "Azure Queues and Azure Service Bus Queues - Compared and Contrasted" on MSDN.

Azure Service Bus Topics and Subscriptions

Service Bus queues provide an attractive and scalable alternative, except for the need to create and manage a separate queue for each partner. Therefore, the final option assessed by Trey Research was to post the details of orders as messages to an Azure Service Bus topic; transport partners subscribe to this topic to retrieve the orders to be shipped. Messages acknowledging receipt of the order details and messages indicating that delivery was completed are posted back to the worker role through a Service Bus queue.

Like Service Bus queues, Service Bus topics and subscriptions are highly scalable and reliable, with configurable security. However, they are more flexible than using Service Bus queues to transmit messages to a transport partner; the worker role can add metadata to messages that indicate which transport partner should process them, and then post these messages to a Service Bus topic. Each transport partner can connect to the Service Bus topic through its own subscription, which can filter the messages based on this metadata so that each transport partner receives only the orders that it should process. Topics also enable messages to be routed to multiple destinations, so orders with a value over $10,000 can additionally be directed to the Audit Log Listener.
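The filtering behavior described above can be modeled in a few lines. The following Python sketch is an in-memory illustration only, not the Service Bus API. The property names (`TransportPartner`, `OrderTotal`), the subscription names, and the use of plain functions as filter rules are assumptions made for the example.

```python
from typing import Callable, Dict, List


class Topic:
    """In-memory model of a topic whose subscriptions filter on message metadata."""

    def __init__(self):
        self._rules: Dict[str, Callable[[dict], bool]] = {}
        self._delivered: Dict[str, List[dict]] = {}

    def subscribe(self, name: str, rule: Callable[[dict], bool]) -> None:
        self._rules[name] = rule
        self._delivered[name] = []

    def publish(self, properties: dict, body=None) -> None:
        message = {"properties": dict(properties), "body": body}
        # Every subscription whose rule matches receives its own copy.
        for name, rule in self._rules.items():
            if rule(message["properties"]):
                self._delivered[name].append(message)

    def messages_for(self, name: str) -> List[dict]:
        return list(self._delivered[name])


topic = Topic()
topic.subscribe("contoso", lambda p: p.get("TransportPartner") == "Contoso")
topic.subscribe("fabrikam", lambda p: p.get("TransportPartner") == "Fabrikam")
topic.subscribe("audit", lambda p: p.get("OrderTotal", 0) > 10_000)

topic.publish({"TransportPartner": "Contoso", "OrderTotal": 250})
topic.publish({"TransportPartner": "Fabrikam", "OrderTotal": 12_500})
```

In this model the second order reaches both the Fabrikam subscription and the audit subscription, while the sender publishes it only once. Adding a partner means adding a subscription, not changing the publishing code.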

The only real drawback to this approach, in common with most of the options described previously, is that each transport partner must be prepared to connect to the appropriate Service Bus topic to retrieve messages. Alternatively, Trey Research can build custom connectivity components to integrate with the transport partners' systems. There are also some limitations imposed on topics; for example, a topic can currently have a maximum of 2000 subscriptions and can support up to 100 concurrent connections (the limit of 100 concurrent connections also applies to queues). However, Trey Research considered that the Orders system was unlikely to hit either of these two limits.

How Trey Research Communicates with Transport Partners

In the end, Trey Research decided to send orders from the worker role to the transport partners by using a Service Bus topic. Each transport partner receives messages by using a subscription that filters the orders. In this way, each transport partner receives only the orders that it should ship. For more information about using Service Bus queues, topics, and subscriptions, see "Queues, Topics, and Subscriptions" on MSDN.

Jana Says:
Service Bus queues provide a reliable and scalable mechanism for communicating between services running in the cloud and applications running on-premises.

To bridge the potential technology gap between the systems implemented by the transport partners and the Service Bus, Trey Research constructed a set of connectivity components that take messages retrieved from the Service Bus and convert them into the format expected by the transport partner. The location of these connectivity components depends on the relationship that Trey Research has with the transport partner:

In the case of the local transport partner, Contoso, Trey Research was able to convince the partner to install a connector and integrate it into their own proprietary system. Trey Research provided the credentials necessary to enable the connector to listen to the appropriate Service Bus subscription. The transport partner's own system uses this connector to retrieve the details of orders from the subscription. Additionally, the connector exposes an interface that the transport partner's system uses to post acknowledgment messages back to the Service Bus queue that the Orders application listens on.

Note

Implementing a connector as part of the transport partner's system does not force the transport partner to incorporate .NET Framework code into their solution. The features of the Azure Service Bus are exposed through a series of HTTP REST APIs (the Azure SDK simply provides a .NET Framework wrapper around these APIs), so the transport partner can use any familiar technology that can generate REST requests and consume REST responses, including the Java programming language.

The distance transport partner, Fabrikam, is a multinational organization, and the operations staff were not willing to allow Trey Research to install software on their servers, preferring Trey Research to connect using the interfaces that they provide to their systems. To accommodate this requirement, Trey Research implemented an adapter for posting orders to Fabrikam, and this adapter is hosted within the worker role. This mechanism enables the logic that posts messages to transport partners to remain the same, regardless of whether the partner is the local or a distance partner. If Trey Research decides to add a new distance transport partner in the future, it simply needs to create and install an appropriate adapter.
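The idea of keeping the posting logic unchanged while swapping the partner-specific translation can be sketched as follows. This Python example is illustrative only; the interface name, the adapter classes, and both message formats are hypothetical, because the guide does not describe the formats that the partners actually expect.

```python
import json
from abc import ABC, abstractmethod


class TransportPartnerAdapter(ABC):
    """Common interface that the order-posting logic depends on."""

    @abstractmethod
    def format_order(self, order: dict) -> str:
        """Translate an order into the partner's expected format."""


class JsonAdapter(TransportPartnerAdapter):
    # Hypothetical format: a partner that accepts JSON documents.
    def format_order(self, order: dict) -> str:
        payload = {"orderId": order["order_id"], "weightKg": order["weight_kg"]}
        return json.dumps(payload, sort_keys=True)


class DelimitedTextAdapter(TransportPartnerAdapter):
    # Hypothetical format: a partner that accepts pipe-delimited text.
    def format_order(self, order: dict) -> str:
        return f"{order['order_id']}|{order['weight_kg']}"


def post_order(order: dict, adapter: TransportPartnerAdapter) -> str:
    """Posting logic is identical for every partner; only the adapter differs."""
    return adapter.format_order(order)
```

Supporting a new partner in this sketch means writing one more adapter class; `post_order` does not change.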

Markus Says:
Implementing adapters and connectors enables applications to remain independent of the communication mechanism. If necessary, the Service Bus topic used by Trey Research could be switched to a different means of transferring information, and only the adapters and connectors would need to change.

Figure 3 summarizes the technologies that Trey Research decided to use to implement messaging between the Orders application and the transport partners. The following sections describe the approach that Trey Research used to build their messaging solution based on these technologies.

Figure 3

Messaging technologies used by Trey Research to communicate with transport partners

The sample Trey Research application that you can download for this guide implements many of the technologies and techniques described here. However, to simplify installation and setup, and reduce the prerequisites and the requirements for users to establish extensive Azure accounts, the feature set and some of the implementation details differ from the text of this guide.

Using the Azure SDK, you can implement applications that send and receive messages by using the MessageSender and MessageReceiver classes in the Microsoft.ServiceBus.Messaging namespace. However, these operations are synchronous. For example, the Send method of the MessageSender class waits for the send operation to complete before continuing, and similarly the Receive method of the MessageReceiver class waits either for a message to become available or until a specified timeout period has expired. These methods are really just façades in front of a series of HTTP REST requests, and the Service Bus queues and topics are remote services being accessed over the Internet. Therefore, your applications should assume that:

  • Send and receive operations may take an arbitrarily long time to complete, and your application should not block waiting for these operations to finish.
  • A sender can post messages at any time, and a receiver may need to listen for messages on more than one queue.
  • Send and receive operations could fail for a variety of reasons, such as a failure in connectivity between your application and the Service Bus in the cloud, a security violation caused by a change in the security implemented by the Service Bus queue or topic (an administrator might decide to revoke or modify the rights of an identity for some reason), the queue being full (queues have a finite size), and so on. Some of these failures might be the result of transient errors, while others may be more permanent.
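
The assumptions above suggest sending without blocking the caller and retrying only when a failure looks transient. The following Python sketch illustrates that idea with `asyncio`; it is not the approach used in the Orders.Shared library. The `TransientError` type, the `send_with_retry` function, and the retry settings are hypothetical.

```python
import asyncio


class TransientError(Exception):
    """A failure that may succeed if retried, such as a dropped connection."""


async def send_with_retry(send, message, attempts: int = 3, delay: float = 0.01):
    """Await send(message), retrying transient failures with a growing delay.

    Any other exception (for example, one caused by revoked credentials) is
    treated as permanent and propagates to the caller immediately.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await send(message)
        except TransientError:
            if attempt == attempts:
                raise  # give up after the final attempt
            # Yield control while waiting, so other work is not blocked.
            await asyncio.sleep(delay * attempt)
```

Because the function is a coroutine, the caller can start a send and continue with other work while the operation and any retries are in progress.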

Trey Research decided to implement a library that added wrappers around the Service Bus queue and topic functionality available in the Microsoft.ServiceBus.Messaging namespace. This library is provided with the sample solution, in the Orders.Shared project. The classes located in the Communication folder of this project encapsulate the existing MessageSender, MessageReceiver, and BrokeredMessage classes (amongst others). The purpose of the new classes is to abstract the send and receive functionality so that all send and receive operations are performed asynchronously. This library also incorporates elements of the security model implemented by Trey Research; for more information, see the section "Securing Messages, Queues, Topics, and Subscriptions" later in this chapter.

Note

For additional information and guidelines on optimizing performance when using Azure Service Bus messaging, see the topic "Best Practices for Performance Improvements Using Service Bus Brokered Messaging" on MSDN.

The following sections describe the structure of this library, the classes that it provides, and how these classes extend the functionality provided by Service Bus queues, topics, and subscriptions.

Sending Messages to a Service Bus Queue Asynchronously

Trey Research uses a Service Bus queue to enable transport partners to communicate with the Orders application. To send a message to a Service Bus queue by using the Orders.Shared library, an application performs the following steps:

  1. Create a BrokeredMessage object and populate it with the required information. The BrokeredMessage class is the type provided by Microsoft in the Microsoft.ServiceBus.Messaging namespace.

  2. Create a ServiceBusQueueDescription object and specify the Service Bus namespace, the queue name, and a set of valid credentials in the form of an access key and the name of the associated identity. The ServiceBusQueueDescription class is a member of the Orders.Shared project.

  3. Create a ServiceBusQueue object using the ServiceBusQueueDescription object. The ServiceBusQueue type encapsulates asynchronous functionality for sending messages. Creating an instance of the ServiceBusQueue type connects to the underlying Service Bus queue in PeekLock mode.

  4. Call the Send method of the ServiceBusQueue object. The parameter to the Send method must be a BrokeredMessageAdapter object that wraps the BrokeredMessage object created earlier. The ServiceBusQueue class contains an instance of the MessageSenderAdapter class (defined in the Communication\Adapters folder in the Orders.Shared project) which implements the IMessageSenderAdapter interface. The Send method uses this MessageSenderAdapter object to send the message.

    Note

    The MessageSenderAdapter class is actually just a wrapper class that was created to simplify unit testing with mock objects.

    For an example of using the ServiceBusQueue type to send messages, see the SendToUpdateStatusQueue method in the OrderProcessor class in the TransportPartner project.

Markus Says:
The ServiceBusTopic and ServiceBusSubscription classes in the Orders.Shared project implement a similar approach to ServiceBusQueue, encapsulating asynchronous functionality based on the MessageSender and MessageReceiver classes respectively. The MessageSenderAdapter, MessageReceiverAdapter, and BrokeredMessageAdapter classes enable the unit tests (in the Orders.Shared.Tests project) to construct mock senders, receivers, and brokered messages.

The following code fragment shows the implementation of the Send method in the ServiceBusQueue class, together with the relevant members used by the Send method:

Markus Says:
The Guard method that is used by methods in the ServiceBusQueue class and elsewhere checks that the named parameter has been initialized; it should not be null or an empty string.

public class ServiceBusQueue
{
  private readonly ServiceBusQueueDescription description;
  ...
  private readonly IMessageSenderAdapter senderAdapter;
  ...

  public ServiceBusQueue(
    ServiceBusQueueDescription description)
  {
    Guard.CheckArgumentNull(description, "description");
    this.description = description;
    ...
    var sender = messagingFactory.CreateMessageSender(
      this.description.QueueName.ToLowerInvariant());
    this.senderAdapter = new MessageSenderAdapter(sender);
    ...
  }
  ...

  public void Send(IBrokeredMessageAdapter message)
  {
    Guard.CheckArgumentNull(message, "message");

    this.Send(message, this.senderAdapter); 
  }

  public void Send(IBrokeredMessageAdapter message,
                                        IMessageSenderAdapter sender)
  {
    Guard.CheckArgumentNull(message, "message");
    Guard.CheckArgumentNull(sender, "sender");

    Task.Factory                  
      .FromAsync(sender.BeginSend, sender.EndSend, message,
                 null,TaskCreationOptions.AttachedToParent)
      .ContinueWith(
        taskResult =>
        {
          try
          {
            if (taskResult.Exception != null)
            {
              TraceHelper.TraceError(
                taskResult.Exception.ToString());
            }
          }
          finally
          {
            message.Dispose();
          }
        });
  }
  ...
}

Markus Says:
Make sure your code correctly disposes of each BrokeredMessage instance you create after use, to ensure that all of the resources it uses are released.

In the ServiceBusQueue class, the processing performed by the Send method requires attaching the processing as a child task by using the TaskCreationOptions.AttachedToParent option. In this way, a failure in the child task while sending the message can be detected and handled by the parent, enabling the parent to abandon the Receive operation more easily. In this example, any exceptions are simply logged by using the static TraceError method of the TraceHelper class. The TraceHelper class is defined in the Helpers folder in the Orders.Shared project. This class simply acts as a wrapper around the trace event handlers provided by the System.Diagnostics library and is described in more detail in Chapter 7, "Monitoring and Managing the Orders Application."
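
The shape of this Send method can be tried out without a Service Bus connection. The following is a minimal, self-contained sketch rather than the code from the sample: FakeMessage, FakeSender, and SendSketch are hypothetical stand-ins for the brokered message adapter, the message sender adapter, and the ServiceBusQueue class. It shows a Begin/End pair wrapped with Task.Factory.FromAsync, followed by a continuation that logs any failure and always disposes the message.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a message that holds disposable resources.
public sealed class FakeMessage : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

// Hypothetical sender that exposes Begin/End methods in the same style
// as the sender adapter described in this section.
public sealed class FakeSender
{
    private readonly bool shouldFail;
    public FakeSender(bool shouldFail) { this.shouldFail = shouldFail; }

    public IAsyncResult BeginSend(
        FakeMessage message, AsyncCallback callback, object state)
    {
        // A Task implements IAsyncResult, so it can represent the
        // pending operation.
        var tcs = new TaskCompletionSource<bool>(state);
        Task.Run(() =>
        {
            if (shouldFail)
                tcs.SetException(new InvalidOperationException("send failed"));
            else
                tcs.SetResult(true);
            if (callback != null) callback(tcs.Task);
        });
        return tcs.Task;
    }

    // Rethrows the failure, if any, recorded by BeginSend.
    public void EndSend(IAsyncResult result)
    {
        ((Task<bool>)result).GetAwaiter().GetResult();
    }
}

public static class SendSketch
{
    public static Task Send(
        FakeSender sender, FakeMessage message, Action<string> logError)
    {
        return Task.Factory
            .FromAsync<FakeMessage>(
                sender.BeginSend, sender.EndSend, message, null)
            .ContinueWith(sendTask =>
            {
                try
                {
                    if (sendTask.Exception != null)
                    {
                        logError(sendTask.Exception.InnerException.Message);
                    }
                }
                finally
                {
                    // Runs whether the send succeeded or failed.
                    message.Dispose();
                }
            });
    }
}
```

Because the continuation handles the exception itself, the task returned by this sketch completes normally even when the send fails; a caller that needs to react to the failure would have to be told about it by some other means, which is the gap the ServiceBusTopic class fills with its error callback.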

Markus Says:
Notice that the ServiceBusQueue class does not utilize the Transient Fault Handling Application Block. This is because using the Transient Fault Handling Application Block to start asynchronous processes does not provide the same flexibility as using a Task object. When considering using the Transient Fault Handling Application Block, you should weigh up the advantages of the declarative neatness of the way in which critical code can be executed and retried, against the fine control that you may require when running this code as a background task.

Receiving Messages from a Service Bus Queue and Processing Them Asynchronously

The ServiceBusQueue class creates and exposes a MessageReceiver object that you can use to receive messages, through the GetReceiver method. This is an ordinary message receiver object with no additional functionality, and calling the Receive method on this object performs a synchronous receive operation. In its simplest form, a receiver using this technique may spend a lengthy period of time being blocked while waiting for messages to appear. Additionally, when a message arrives, it may require significant effort to perform the required processing, during which time more messages may arrive. These messages will not be processed until the receiver finishes its current work and retrieves the next message. If a message is urgent, this response may not be acceptable.

The MessageReceiver class also supports asynchronous operations through the BeginReceive and EndReceive operations. The ServiceBusReceiverHandler type, also in the Orders.Shared project, extends this functionality to provide a class that can receive and process messages asynchronously while decoupling the business logic and exception-handling process from the code that connects to the queue.

The ServiceBusReceiverHandler class provides a method called ProcessMessages that an application can use to asynchronously wait for messages arriving on a Service Bus queue and process them (the application specifies the receiver for the queue to listen on as a parameter to the constructor of this class). The following code sample shows the constructor and the implementation of the ProcessMessages method.

public class ServiceBusReceiverHandler<T>
{
  private readonly IMessageReceiverAdapter receiver;
  private Func<T, ServiceBusQueueDescription, string, Task> 
    messageProcessingTask;

  public ServiceBusReceiverHandler(
    IMessageReceiverAdapter receiver)
  {
    ...
    this.receiver = receiver;
  }

  ...

  // The Func parameter (that returns the Task) allows the
  // caller more control on the task result and the 
  // exception handling
  public void ProcessMessages(Func<T, 
      ServiceBusQueueDescription, string, Task> 
    taskForProcessingMessage, 
    CancellationToken cancellationToken)
  {
    ...
    this.messageProcessingTask = taskForProcessingMessage;

    this.ReceiveNextMessage(cancellationToken);
  }

  ...
}

The ProcessMessages method expects a delegate as its first parameter. This delegate should reference a method that will be run each time a message is received. The purpose of this delegated method is to perform whatever business logic the application requires on receipt of each message (for a detailed example of this logic, see the section "Receiving and Processing an Order in a Transport Partner" in Chapter 5, "Processing Orders in the Trey Research Solution"). The ProcessMessages method stores this delegate locally and then calls the local ReceiveNextMessage method, as shown in the following code sample.

...
public TimeSpan? MessagePollingInterval { get; set; }
...
private void ReceiveNextMessage(
  CancellationToken cancellationToken)
{
  if (this.MessagePollingInterval.HasValue)
  {
    Thread.Sleep(this.MessagePollingInterval.Value);
  }

  Task.Factory
    .FromAsync<TimeSpan, 
      IBrokeredMessageAdapter>(this.receiver.BeginReceive,
                               this.receiver.EndReceive,
                               TimeSpan.FromSeconds(10),
                               null,
                               TaskCreationOptions.None)
    .ContinueWith(
      taskResult =>
      {
        // Start receiving the next message as soon as we
        // received the previous one.
        // This will not cause a stack overflow because the
        // call will be made from a new Task.
                    
        this.ReceiveNextMessage(cancellationToken);

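        if (taskResult.Exception != null)
        {
          TraceHelper.TraceError(
            taskResult.Exception.Message);
          // Reading Result on a faulted task rethrows the
          // exception, so there is no message to process.
          return;
        }

        this.ProcessMessage(taskResult.Result);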
      },
      cancellationToken);
}

The ReceiveNextMessage method implements a simple polling strategy; it sleeps for a configurable period of time before attempting to receive a message from the queue (the message queue is read in PeekLock mode). The receive operation is performed asynchronously, and if a message is available the method starts a new task to listen for any subsequent messages and then calls the ProcessMessage method to process the newly received message. The following code shows the ProcessMessage method.

private void ProcessMessage(
  IBrokeredMessageAdapter message)
{
  if (message != null)
  {
    ...
    this.messageProcessingTask(message.GetBody<T>(), 
                               queueDescription, token)
     .ContinueWith(
       processingTaskResult =>
       {
         if (processingTaskResult.Exception != null)
         {
           if (message.DeliveryCount <= 3 &&
               !(processingTaskResult.Exception.
                 InnerException is InvalidTokenException))
           {
             // If the abandon fails, the message will 
             // become visible anyway after the lock 
             // times out
             Task.Factory.FromAsync(message.BeginAbandon, 
                 message.EndAbandon, message, 
                 TaskCreationOptions.AttachedToParent)
               .ContinueWith(
                 taskResult =>
                 {
                   if (taskResult.Exception != null)
                   {        
                     TraceHelper.TraceError(
                       "Error while message abandon: {0}",
                        taskResult.Exception.
                          InnerException.Message);
                   }

                   var msg = taskResult.AsyncState 
                     as BrokeredMessage;
                   if (msg != null)
                   {
                     msg.Dispose();
                   }
                 });
           }
           else
           {
             Task.Factory.FromAsync(
                 message.BeginDeadLetter, 
                 message.EndDeadLetter, message, 
                 TaskCreationOptions.AttachedToParent)
               .ContinueWith(
                 taskResult =>
                 {
                   if (taskResult.Exception != null)
                   {
                     TraceHelper.TraceError(
                       "Error while sending message to "
                       + "the DeadLetter queue: {0}", 
                       taskResult.Exception.
                         InnerException.Message);
                   }

                   var msg = taskResult.AsyncState 
                     as BrokeredMessage;
                   if (msg != null)
                   {
                     msg.Dispose();
                   }
                 });
             TraceHelper.TraceError(
               processingTaskResult.
                 Exception.TraceInformation());
           }
         }
         else
         {
           Task.Factory
             .FromAsync(message.BeginComplete, 
                message.EndComplete, message,  
                TaskCreationOptions.AttachedToParent)
             .ContinueWith(
               taskResult =>
               {
                 if (taskResult.Exception != null)
                 {
                   TraceHelper.TraceError(
                     "Error while executing "
                     + "message. Complete: {0}", 
                     taskResult.Exception.
                       InnerException.Message);
                 }
                 var msg = taskResult.AsyncState 
                   as BrokeredMessage;
                 if (msg != null)
                 {
                   msg.Dispose();
                 }
               });
         }
    });
  }
}

Markus Says:
The polling interval acts as a regulator to help prevent parts of the system becoming overloaded. The ideal value for the polling interval depends on the computing resources available at the transport partner, the expected volume of orders, and the number of worker role instances. For example, specifying a small polling interval if the transport partners have limited computing resources is probably a poor choice, especially during periods of high demand when a large number of orders are generated. In this case a lengthier interval between messages allows the transport partners' systems to function more effectively, with the topic effectively acting as a load-leveling buffer.

The ProcessMessage method invokes the delegated method provided by the receiving application, which it has stored in the messageProcessingTask field, to process the message. The ProcessMessage method implements a simple but effective policy for handling exceptions raised while processing messages. For example, if an exception occurs while processing the message, the ProcessMessage method will attempt to abandon the message and release any locks; a subsequent invocation of the ReceiveNextMessage method may be able to read and process the message successfully if the error was only transient. However, if the delivery count of the message exceeds three (that is, processing has already failed repeatedly), or processing fails as the result of an authentication failure (if the simple web token received from the transport partner is invalid), the message is posted to the dead letter queue. If the message is processed successfully, the ProcessMessage method calls the asynchronous version of the Complete method to remove the message from the queue.
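
The decision that ProcessMessage makes for each message can be isolated as a small pure function, which makes the policy easier to read and to unit test. The following sketch is illustrative only and is not part of the sample: MessageOutcome and ReceivePolicySketch are hypothetical names, and the InvalidTokenException class defined here is a local stand-in for the type of the same name used by the sample.

```csharp
using System;

// Local stand-in for the sample's InvalidTokenException type.
public sealed class InvalidTokenException : Exception
{
    public InvalidTokenException(string message) : base(message) { }
}

public enum MessageOutcome { Complete, Abandon, DeadLetter }

public static class ReceivePolicySketch
{
    // Mirrors the "DeliveryCount <= 3" test in ProcessMessage.
    public const int MaxDeliveryCount = 3;

    // processingError is the exception raised by the processing
    // delegate, or null if processing succeeded.
    public static MessageOutcome Decide(
        int deliveryCount, Exception processingError)
    {
        if (processingError == null)
        {
            return MessageOutcome.Complete;
        }

        bool mayRetry = deliveryCount <= MaxDeliveryCount
            && !(processingError is InvalidTokenException);

        // Abandoning releases the lock so the message can be received
        // again; dead-lettering takes it out of normal delivery.
        return mayRetry ? MessageOutcome.Abandon : MessageOutcome.DeadLetter;
    }
}
```

For example, a message that fails with a timeout on its third delivery is abandoned and retried, the same failure on the fourth delivery sends it to the dead letter queue, and an invalid token sends it there on the first delivery.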

Figure 4 illustrates the flow of control through a ServiceBusReceiverHandler object when a receiving application retrieves and processes a message.

Figure 4

Flow of control when receiving messages through a ServiceBusReceiverHandler object

Sending Messages to a Service Bus Topic

Although transport partners send messages to the Orders application through a Service Bus queue, the worker role in the Trey Research implementation uses a Service Bus topic to send orders to each transport partner. Service Bus topics are similar to queues with one important difference; the messages posted to a topic can be filtered by using a Service Bus subscription and directed to a specific listener attached to that subscription. The filtering is based on metadata added to the message before it is sent, and only subscriptions that specify a filter that matches the value of this metadata will receive the message.

Trey Research used this mechanism to add a property called TransportPartnerName to each order message that specifies the transport partner that should process the message. In this way, each order message will be received only by the transport partner that should ship the order. Trey Research also added a property called OrderAmount to each message. The auditing application subscribes to the same topic as the transport partners, but filters messages, retrieving and auditing the details of all orders with a value of more than $10,000 as specified by this property. The following code, taken from the Execute method of the NewOrderJob class in the Jobs folder of the Orders.Workers project, shows an example of how the Trey Research solution populates the properties of a message to direct it to a specific transport partner.

var brokeredMessage = new BrokeredMessage(msg)
{
  ...
  Properties = { 
    { "TransportPartnerName", transportPartnerName }, 
    ... 
    { "OrderAmount", orderProcess.Order.Total } },
  ...
};

Note

The NewOrderJob class is described in more detail in Chapter 5, "Processing Orders in the Trey Research Solution."
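
To make the effect of these two properties concrete, the following sketch simulates the routing in memory. It does not use the Service Bus API; TopicFilterSketch, the subscription names, and the predicates are assumptions made for illustration, standing in for the filters that the real subscriptions apply to the TransportPartnerName and OrderAmount properties.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TopicFilterSketch
{
    // Each hypothetical subscription is a name plus a predicate over
    // the properties attached to a message.
    private static readonly
        Dictionary<string, Func<IDictionary<string, object>, bool>>
        Subscriptions =
            new Dictionary<string, Func<IDictionary<string, object>, bool>>
            {
                { "contoso",
                  p => (string)p["TransportPartnerName"] == "Contoso" },
                { "fabrikam",
                  p => (string)p["TransportPartnerName"] == "Fabrikam" },
                { "audit",
                  p => Convert.ToDecimal(p["OrderAmount"]) > 10000m },
            };

    // Returns the names of the subscriptions whose filter matches,
    // sorted so that the result is deterministic.
    public static List<string> Route(IDictionary<string, object> properties)
    {
        return Subscriptions
            .Where(s => s.Value(properties))
            .Select(s => s.Key)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
    }
}
```

In this simulation an order for Contoso worth 15,000 matches both the "contoso" and "audit" subscriptions, while an order for Fabrikam worth 500 matches only "fabrikam". Each matching subscription receives its own copy of the message, so the audit subscription does not take orders away from the transport partners.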

The Service Bus methods used to send messages to a topic are very similar to those used to send messages to a queue. However, to manage the minor differences in these methods, the developers at Trey Research created two custom classes specially targeted at making it easier to use Service Bus topics. These classes are ServiceBusTopicDescription and ServiceBusTopic, and are located in the Communication folder of the Orders.Shared project.

In the Trey Research sample solution there are only small differences between the ServiceBusQueueDescription class and the ServiceBusTopicDescription class. The ServiceBusQueue class (which encapsulates the functionality of a Service Bus queue) and the ServiceBusTopic class (which provides similar functionality, but for a Service Bus topic) also differ, but in a few more significant ways, primarily due to the way in which Trey Research uses these types rather than any underlying differences in the mechanisms that they expose for sending messages:

  • Unlike the ServiceBusQueue class, the ServiceBusTopic class does not instantiate a receiver. Clients will subscribe to a topic when required by creating a suitable receiver. This removes coupling between the sender and receiver, and allows different clients to subscribe and receive messages without needing to reconfigure the topic. It also separates the business logic from the message routing concerns.
  • The ServiceBusQueue class sends the message and only raises an exception (which it logs) if sending the message fails. In contrast, the ServiceBusTopic class accepts two Action delegates that it executes when the message has been sent, or when there is an error. This approach enables Trey Research to incorporate more extensive exception handling when sending order details to a transport partner than was deemed necessary when posting order status messages back to the Orders application.
  • The ServiceBusQueue class uses the static FromAsync method of the Task.Factory class to send messages asynchronously. In contrast, the ServiceBusTopic class uses the Enterprise Library Transient Fault Handling Application Block to detect transient errors when posting a message to a topic, and transparently retries the Send operation when appropriate. The rationale behind this approach is similar to that described in the previous point.

The following code shows the definition of the Send method in the ServiceBusTopic class. As described above, two Action methods are passed to the method as parameters (one to execute after a message is sent and one to execute if sending fails), together with a function that creates the message, and a state object.

public void Send(Func<BrokeredMessage> createMessage,
                 object objectState, 
                 Action<object> afterSendComplete,
                 Action<Exception, object> processError)
{
  ...
}

Code can only read the body of a BrokeredMessage instance once. When you implement a method that uses a BrokeredMessage instance and may be executed more than once, as is the case when using the Transient Fault Handling Application Block, you must create and populate the BrokeredMessage instance each time you call the method. This is why the Send method accepts a function that creates the message, instead of accepting an existing message instance.
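
The reason for passing a factory can be demonstrated with a small stand-alone sketch. Nothing here comes from the sample or from the Transient Fault Handling Application Block: OneShotMessage is a hypothetical class that imitates the read-once behavior of a message body, and RetrySketch is a deliberately naive retry loop with no delay and no transient-error detection.

```csharp
using System;

// Hypothetical message whose body can be read only once.
public sealed class OneShotMessage : IDisposable
{
    private readonly string body;
    private bool consumed;

    public OneShotMessage(string body) { this.body = body; }

    public string ReadBody()
    {
        if (consumed)
        {
            throw new InvalidOperationException("Body already read.");
        }
        consumed = true;
        return body;
    }

    public void Dispose() { }
}

public static class RetrySketch
{
    // Calls createMessage before every attempt, so that each attempt
    // works with a message whose body has not been read yet.
    // Returns the number of the attempt that succeeded.
    public static int SendWithRetry(
        Func<OneShotMessage> createMessage,
        Action<OneShotMessage> send,
        int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            using (var message = createMessage())
            {
                try
                {
                    send(message);
                    return attempt;
                }
                catch (Exception) when (attempt < maxAttempts)
                {
                    // Swallow the error and try again. The last
                    // attempt is not caught, so its exception
                    // propagates to the caller.
                }
            }
        }
    }
}
```

If the factory returns a new message each time, a send that fails twice succeeds on the third attempt. If the factory hands back the same instance, the second attempt fails with "Body already read." instead of retrying the original operation.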

The Send method uses the Transient Fault Handling Application Block. The constructor for the ServiceBusTopic class initializes the block, loads the default policy for Azure Service Bus connectivity, and sets up a handler for the Retrying event that writes information to Azure diagnostics.

this.serviceBusRetryPolicy = RetryPolicyFactory.
  GetDefaultAzureServiceBusRetryPolicy();

this.serviceBusRetryPolicy.Retrying += (sender, args) => 
   TraceHelper.TraceWarning("Retry in ServiceBusTopic - "
     + "Count:{0}, Delay:{1}, Exception:{2}",
     args.CurrentRetryCount, args.Delay,
     args.LastException);

The Send method then calls one of the asynchronous overloads of the ExecuteAction method of the Transient Fault Handling Application Block and passes in the required parameters, as shown in the following code extract. These parameters are the asynchronous start and end delegates, an action to execute after the process completes successfully, and an action to execute if it fails after retrying a number of times (the retry policy parameters are specified in the configuration file).

this.serviceBusRetryPolicy.ExecuteAction<BrokeredMessage>(
  ac =>
  {
    var message = createMessage();
    var dictionary 
        = (objectState as Dictionary<string, object>);
    if (dictionary.ContainsKey("message"))
    {
      dictionary["message"] = message;
    }
    else
    {
      dictionary.Add("message", message);
    }
    this.sender.BeginSend(message, ac, objectState);
  },
  ar =>
  {
    this.sender.EndSend(ar);
    return (ar.AsyncState as Dictionary<string,
            object>)["message"] as BrokeredMessage;
  },
  (message) =>
  {
    try
    {
      afterSendComplete(objectState);
    }
    catch (Exception ex)
    {
      TraceHelper.TraceError(ex.Message);
    }
    finally
    {
      message.Dispose();
    }
  },
  e =>
  {
    processError(e, objectState);
    var message = (objectState as Dictionary<string,
                object>)["message"] as BrokeredMessage;
    message.Dispose();
  });

The asynchronous start delegate (ac) first calls the function passed to the Send method as the createMessage parameter to create the BrokeredMessage instance. Next, it obtains a reference to a Dictionary stored in the object state (which is also passed to the Send method as a parameter) and adds to it a reference to the BrokeredMessage instance. It must hold on to a reference to the BrokeredMessage so that it can be disposed correctly afterwards. The code then calls the BeginSend method of the Service Bus MessageSender instance referenced by the sender field of the ServiceBusTopic class. It passes as parameters the BrokeredMessage instance to send, a reference to the callback provided by the Transient Fault Handling Application Block, and the Dictionary containing the reference to the message as the object state. This reference to the brokered message is maintained so that the code can dispose the message and correctly release the resources it uses. This occurs in the actions that are executed after sending the message, regardless of whether the send operation succeeds or fails.

Note

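The Dictionary holds the state information referenced by the BeginSend and EndSend methods that send a message to the topic asynchronously. Note that the generic Dictionary class is not itself thread-safe for concurrent writes, so each call to the Send method should be given its own Dictionary instance.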

The asynchronous end delegate (ar) first calls the EndSend method of the Service Bus MessageSender instance. Next, it retrieves the Dictionary from the object state, extracts the message stored in it, and returns the message as a BrokeredMessage instance. This is passed to the action that is executed when the message is successfully sent.

If the process successfully posts the message to the topic, it invokes the Action referenced by the afterSendComplete parameter. If this action throws an exception, the code uses the TraceHelper class to log an error message. In either case, the message is then disposed.

If the process fails to post the message to the topic, it invokes the Action referenced by the processError parameter. The code passes to the processError action the exception returned from the MessageSender class and the object state containing the message. After the processError action completes, the code obtains a reference to the BrokeredMessage instance stored in the objectState variable and disposes it.

Subscribing to a Service Bus Topic

One of the major advantages of using Service Bus topics to distribute messages is that it provides a level of decoupling between the sender and receivers. The sender can construct a message with additional properties that filters within the topic use to redirect the message to specific receivers. However, receivers must subscribe to receive messages, and the number of subscribers is independent of the topic. For example, Trey Research can add new transport partners and arrange to send messages to these new partners simply by editing the filter criteria in the topic. New receivers can subscribe to a topic and receive messages that match the appropriate filtering criteria. Trey Research could add additional subscribers that listen for messages and pass them to auditing or other types of services.

In the Orders application, Trey Research created a single Service Bus topic for each deployed instance of the application (in other words, there is one topic per datacenter). All of the transport partners subscribe to all of these topics, and receive messages destined for them based on the filter rules Trey Research established for the choice of transport partner.

The Service Bus subscriptions and filters themselves are created by the SetupServiceBusTopicAndQueue method in the setup program in the TreyResearch.Setup project. The following code shows the relevant parts of this method.

private static void SetupServiceBusTopicAndQueue()
{
  ...
  // Create one subscription per transport partner with 
  // corresponding filter expression.
  var transportPartners = new[] { 
    "Contoso", "Fabrikam" };
  for (int i = 0; i < transportPartners.Length; i++)
  {
    string transportPartnerName = transportPartners[i];
    string formattedName = transportPartnerName.Replace(
      " ", string.Empty).ToLowerInvariant();
    ...

    var serviceBusTopicDescription = 
      new ServiceBusSubscriptionDescription
      {
        Namespace = ServiceBusNamespace,
        TopicName = TopicName,
        SubscriptionName = string.Format(
          "{0}Subscription", formattedName),
        Issuer = Issuer,
        DefaultKey = DefaultKey
      };

    var serviceBusSubscription = 
      new ServiceBusSubscription(
        serviceBusTopicDescription);

    string filterExpression = string.Format(
      "TransportPartnerName = '{0}'", 
        transportPartnerName);
    serviceBusSubscription.CreateIfNotExists(
     filterExpression);
    ...
  }
}

Trey Research implemented two custom classes (ServiceBusSubscriptionDescription and ServiceBusSubscription, located in the Communication folder of the Orders.Shared project) for connecting to subscriptions. The ServiceBusSubscriptionDescription class specifies the properties for a subscription, indicating which topic in which Service Bus namespace to connect to and the name of the subscription to use. The following code example shows the definition of this class. Note that in the Trey Research example, the code populates the SubscriptionName property with a value derived from the name of the transport partner; remember that each Service Bus subscription filters messages by using the TransportPartnerName property of the message.

public class ServiceBusSubscriptionDescription
{
  public string Namespace { get; set; }
  public string TopicName { get; set; }
  public string SubscriptionName { get; set; }
  ...
}

The constructor of the ServiceBusSubscription class accepts a populated instance of the ServiceBusSubscriptionDescription class and connects to the specified topic and subscription, as shown in the following code. The constructor also creates a MessageReceiver for the topic subscription.

public ServiceBusSubscription(
       ServiceBusSubscriptionDescription description)
{
  ...
  var runtimeUri 
    = ServiceBusEnvironment.CreateServiceUri("sb",
                    this.description.Namespace, 
                    string.Empty);
  var messagingFactory 
    = MessagingFactory.Create(runtimeUri, ...);

  this.receiver 
    = messagingFactory.CreateMessageReceiver(
             this.description.TopicName.ToLowerInvariant() 
             + "/subscriptions/" +
             this.description.SubscriptionName
                    .ToLowerInvariant(),
             ReceiveMode.PeekLock);
}
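
The string handling in the setup method and in this constructor determines the names that appear in the Service Bus namespace. The following stand-alone sketch repeats those string operations so that the results can be inspected; SubscriptionNamingSketch and its method names are hypothetical, and the topic name used in the usage note is an assumption, because the sample's TopicName value is not shown here.

```csharp
using System;

public static class SubscriptionNamingSketch
{
    // Same normalization as the setup code: strip spaces, lowercase,
    // then append the "Subscription" suffix.
    public static string SubscriptionName(string transportPartnerName)
    {
        string formattedName = transportPartnerName
            .Replace(" ", string.Empty).ToLowerInvariant();
        return string.Format("{0}Subscription", formattedName);
    }

    // The filter compares against the partner name as originally
    // written, not the normalized form.
    public static string FilterExpression(string transportPartnerName)
    {
        return string.Format(
            "TransportPartnerName = '{0}'", transportPartnerName);
    }

    // Same concatenation as the ServiceBusSubscription constructor.
    public static string EntityPath(
        string topicName, string subscriptionName)
    {
        return topicName.ToLowerInvariant()
            + "/subscriptions/"
            + subscriptionName.ToLowerInvariant();
    }
}
```

For the partner "Contoso" this yields the subscription name "contosoSubscription" and the filter expression "TransportPartnerName = 'Contoso'". With an assumed topic name of "NewOrders", the receiver path becomes "neworders/subscriptions/contososubscription". The filter value keeps its original casing, so the property value placed on each message needs to match the partner name exactly as it was written when the subscription was created.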

Receiving Messages from a Topic and Processing Them Asynchronously

To receive a message from a topic, an application can use a ServiceBusReceiverHandler object initialized with the receiver encapsulated within a ServiceBusSubscription object. For more information about the ServiceBusReceiverHandler class, see the section "Receiving Messages from a Service Bus Queue and Processing Them Asynchronously" earlier in this chapter. The following code example shows how an application can create a ServiceBusReceiverHandler object to receive NewOrderMessage messages from the subscription (the NewOrderMessage class is described in Chapter 5, "Processing Orders in the Trey Research Solution").

var serviceBusSubscription = new ServiceBusSubscription(
            ...);
var receiverHandler 
  = new ServiceBusReceiverHandler<NewOrderMessage>
          (serviceBusSubscription.GetReceiver())
{
  MessagePollingInterval = TimeSpan.FromSeconds(2)
};
...

Markus Says:
The message polling interval you specify for receiving messages from a queue or topic must take into account variables specific to your environment (such as CPU processing power) and the expected volume of work (such as the number of orders to process and number of worker role instances).

An application can then call the ProcessMessages method of the ServiceBusReceiverHandler instance it just created, and pass it a delegate specifying the code to be executed as each message is received. Again, this process was described in the section "Receiving Messages from a Service Bus Queue and Processing Them Asynchronously" earlier in this chapter. The following code shows an example:

  ...
receiverHandler.ProcessMessages(
  (message, queueDescription, token) =>
  {
    return Task.Factory.StartNew(
      () => this.ProcessMessage(message,
                                queueDescription),
      this.tokenSource.Token,
      TaskCreationOptions.None,
      context);
  },
  this.tokenSource.Token);
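
The overall pattern of polling, dispatching to a caller-supplied delegate, and stopping on cancellation can be sketched in memory as follows. This is not the ServiceBusReceiverHandler implementation: PollingReceiverSketch is a hypothetical class, a ConcurrentQueue stands in for the subscription, async/await replaces the Begin/End methods, messages are processed one at a time, and the loop waits for the polling interval only when the queue is empty rather than before every receive.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// In-memory stand-in for a receiver handler; not the Service Bus API.
public sealed class PollingReceiverSketch<T>
{
    private readonly ConcurrentQueue<T> queue;

    public PollingReceiverSketch(ConcurrentQueue<T> queue)
    {
        this.queue = queue;
    }

    public TimeSpan? MessagePollingInterval { get; set; }

    // Polls until cancellation is requested, handing each message
    // to the caller's delegate and logging any processing error.
    public async Task ProcessMessages(
        Func<T, Task> processMessage,
        Action<Exception> logError,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            T message;
            if (this.queue.TryDequeue(out message))
            {
                try
                {
                    await processMessage(message);
                }
                catch (Exception ex)
                {
                    logError(ex);
                }
            }
            else if (this.MessagePollingInterval.HasValue)
            {
                try
                {
                    await Task.Delay(
                        this.MessagePollingInterval.Value,
                        cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Cancelled while waiting; the loop condition
                    // ends the method.
                }
            }
            else
            {
                await Task.Yield();
            }
        }
    }
}
```

As in the sample, the business logic lives entirely in the delegate, so the same loop could serve a queue or a subscription, and a failure in one message does not stop the loop from receiving the next.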

Implementing Adapters and Connectors for Translating and Reformatting Messages

As described in the section "Selected Option for Communicating with Transport Partners" earlier in this chapter, Trey Research uses connectors and adapters for retrieving messages from the Service Bus subscription for each transport partner, and then translates these messages into a format that the transport partner understands before handing the message off for processing.

Note

In the solution code provided with this guide, mock versions of the local and distance transport partners are both implemented by means of a Windows Forms application. For the local transport partner, Contoso, the connector is integrated into the Windows Forms application. For the distance transport partner, Fabrikam, the connector is implemented in a similar manner as part of the Windows Forms code for this partner. However, this is for simplicity and demonstration purposes only; in the real implementation Trey Research incorporates the adapter for the distance partner into the worker role as described in the second bullet point.

The Trey Research solution includes two sample transport partners; one that handles deliveries to local customers that reside in the same state or neighboring states as Trey Research and another that delivers goods to more distant customers. These transport partners are defined in the ContosoTransportPartner and FabrikamTransportPartner Windows Forms classes in the TransportPartner project. Both transport partners implement their own systems for tracking and delivering packages.

Contoso, the local transport partner, runs a connector on its own infrastructure that connects directly to the Azure Service Bus to retrieve and send messages. This functionality is implemented in the Connector class in the Connectivity folder. Fabrikam, the distance transport partner, exposes a service interface, and an adapter running as part of the Trey Research solution interacts with the Service Bus and reformats messages into service calls; responses from the service are reformatted as messages and posted back to the Service Bus. The adapter is implemented in the Adapter class, also located in the Connectivity folder.

When the transport partner receives a request to deliver an order, the connector or adapter (depending on the transport partner) posts an acknowledgement message to a Service Bus queue. This queue constitutes a well-known but secure endpoint, available to all transport partners. The Connector and Adapter classes are both descendants of the OrderProcessor class (defined in the Connectivity folder in the TransportPartner project), and this class actually handles the connectivity between the transport partner and the Service Bus. In the FabrikamTransportPartner Windows Forms class, the flow of control is:

  • The OnLoad method instantiates the Adapter object and invokes its Run method. The Run method of the Adapter class is inherited from the OrderProcessor class.
  • The Run method in the OrderProcessor class creates a ServiceBusReceiverHandler object to connect to the Service Bus subscription on which it expects to receive orders, and calls the ProcessMessages method of this object.
  • The first parameter to the ProcessMessages method in the ServiceBusReceiverHandler class is a delegated function (specified as a lambda expression in the sample code) that provides the business logic to be performed when an order is received from the topic.
  • The ServiceBusReceiverHandler object invokes this function after it has received each message. This strategy enables you to decouple the mechanics for receiving a message from a queue or topic (as implemented by the ServiceBusReceiverHandler class) from the logic for converting this message into the format expected by the transport partner and sending this request to the internal system implemented by the partner.

The following example, taken from the OrderProcessor.cs file, shows how this code is structured.

public void Run()
{
  ...
  foreach (...)
  {
    ...
    var receiverHandler = new  
      ServiceBusReceiverHandler<...>(...);

    receiverHandler.ProcessMessages(
      (message, ..., ...) =>
      {
        return Task.Factory.StartNew(
          // Message conversion logic goes here. 
          // The message parameter contains the body of
          // the message received from the topic.
          () => this.ProcessMessage(
                  message, ...),
          ...);
      }, ...);
  }
}
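
The separation between receiving and processing can also be shown with a self-contained sketch that runs without Service Bus. The InMemoryReceiverHandler class and the ReceiverDemo class below are hypothetical stand-ins invented for illustration; they are not part of the Trey Research solution, and the real ServiceBusReceiverHandler class polls a Service Bus receiver rather than an in-memory queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for ServiceBusReceiverHandler<T>. It owns the receive
// loop but has no knowledge of what is done with each message.
public sealed class InMemoryReceiverHandler<T>
{
    private readonly ConcurrentQueue<T> source;

    public InMemoryReceiverHandler(ConcurrentQueue<T> source)
    {
        this.source = source;
    }

    // Takes messages from the source one at a time, passes each to the
    // supplied delegate, and awaits the returned task before continuing.
    // The loop ends when the source is empty or cancellation is requested.
    public async Task ProcessMessages(
        Func<T, CancellationToken, Task> processMessage,
        CancellationToken token)
    {
        T message;
        while (!token.IsCancellationRequested
               && this.source.TryDequeue(out message))
        {
            await processMessage(message, token);
        }
    }
}

public static class ReceiverDemo
{
    public static string[] Run()
    {
        var queue = new ConcurrentQueue<string>(
            new[] { "order-1", "order-2" });
        var handled = new ConcurrentQueue<string>();
        var handler = new InMemoryReceiverHandler<string>(queue);

        // The partner-specific logic lives entirely in the lambda; the
        // handler only decides when it is invoked.
        handler.ProcessMessages(
            (message, token) => Task.Run(
                () => handled.Enqueue("converted:" + message), token),
            CancellationToken.None).GetAwaiter().GetResult();

        return handled.ToArray();
    }
}
```

Because the handler accepts any delegate of the right shape, the same receive loop could serve the connector, the adapter, or a test double without modification.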

In the OrderProcessor class, the lambda expression invokes the local ProcessMessage method (not to be confused with ServiceBusReceiverHandler.ProcessMessages). The ProcessMessage method passes the message to the transport partner's internal system and waits for a response by calling the ProcessOrder method, which provides logic that is specific to the transport partner and is implemented in the Connector and Adapter classes. Because the ProcessMessage method runs by using a separate task, it can wait synchronously for the ProcessOrder method to complete without affecting the responsiveness of the application. The following code example shows part of the implementation of the ProcessMessage method in the OrderProcessor class.

Note

Many of the details of the ProcessMessage method, such as the purpose of the trackingId variable, and the operations performed by the ProcessOrder method in the Connector and Adapter classes provided in the sample solution are explained in detail in Chapter 5, "Processing Orders in the Trey Research Solution".

protected virtual void ProcessMessage(
  NewOrderMessage message, 
  ServiceBusQueueDescription queueDescription)
{
  var trackingId = this.ProcessOrder(
    message, queueDescription);

  if (trackingId != Guid.Empty)
  {
    ...
    this.SendOrderReceived(message, 
      queueDescription, statusMessage, trackingId, token);
  }
}

When the order has been processed by the transport partner, the ProcessMessage method invokes the local SendOrderReceived method of the OrderProcessor object to send an appropriate response message back to the Orders application through the Service Bus queue specified as the second parameter to the ProcessMessage method.

Note

The details of the SendOrderReceived method are also described in Chapter 5, "Processing Orders in the Trey Research Solution".

Correlating Messages and Replies

Unlike web service operations, messaging implemented by using Service Bus queues and topics is an inherently one-way mechanism. Although sometimes viewed as a limitation, this is actually what makes this form of messaging extremely responsive; a sender does not have to wait for a response from a distant, possibly unreliable receiver whenever it posts a message. However, there will inevitably be cases when a sender expects some form of reply, even if it is only an acknowledgement that the receiver has actually received the message. This is precisely the situation in the Trey Research scenario. When the Orders application posts the details of an order to a topic, the application expects to receive a response that indicates the order has been received.

However, there may be a significant time between these two events, and the Orders application must not block waiting for the response to arrive. To address this situation, Trey Research implements two-way messaging by using a combination of Service Bus topics and queues. The Orders application posts order messages to a Service Bus topic, and expects the responses from the various transport partners to appear on a separate Service Bus queue. The key question is how does the Orders application know which response belongs to which order message? The answer lies in using message correlation.

When the worker role for the Orders application sends an order message to a transport partner, it populates the MessageId property with the identifier for the order (this identifier is generated when the order is created), and it also specifies the name of the queue on which the Orders application expects a response in the ReplyTo property, as shown in the following code sample taken from the Execute method in the NewOrderJob class.

var brokeredMessage = new BrokeredMessage(msg)
{
  MessageId = msg.OrderId.ToString(),
  ...
  ReplyTo = this.replyQueueName
};

Markus Says:
Passing the address to which a receiving application should post a response by using the ReplyTo property of a message decouples the receiving application from using a specific hard-coded queue.

The transport partner constructs an OrderStatusUpdateMessage object as a reply and then posts this message to the queue specified by the ReplyTo property of the original order message. In the Trey Research example, this logic occurs in the SendToUpdateStatusQueue method (invoked by the SendOrderReceived method) in the OrderProcessor class. Chapter 5, "Processing Orders in the Trey Research Solution" describes the flow of messages through the transport partners in more detail.

The worker role receives the response on the specified Service Bus queue. When a response message is received it is used to update the status of the order in the Orders database. This functionality is implemented in the StatusUpdateJob class in the worker role, which is also described in detail in Chapter 5, "Processing Orders in the Trey Research Solution."
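
The correlation idea can be sketched without Service Bus. In the following illustration, OrderEnvelope, StatusReply, and OrderTracker are hypothetical types invented for this sketch; in particular, carrying the order identifier in an OrderId field of the reply is an assumption, and the actual OrderStatusUpdateMessage class and StatusUpdateJob class are described in Chapter 5.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the order message. Only the two
// properties used for correlation are modeled.
public sealed class OrderEnvelope
{
    public string MessageId { get; set; } = string.Empty;
    public string ReplyTo { get; set; } = string.Empty;
}

// Hypothetical reply. Carrying the order identifier in an OrderId field is
// an assumption made for this sketch.
public sealed class StatusReply
{
    public string OrderId { get; set; } = string.Empty;
    public string Status { get; set; } = string.Empty;
    public string PostedTo { get; set; } = string.Empty;
}

public sealed class OrderTracker
{
    private readonly Dictionary<string, string> statusByOrderId =
        new Dictionary<string, string>();

    // Sender side: stamp the message with the order identifier and the
    // name of the queue on which the reply is expected.
    public OrderEnvelope CreateOrderMessage(Guid orderId, string replyQueueName)
    {
        var envelope = new OrderEnvelope
        {
            MessageId = orderId.ToString(),
            ReplyTo = replyQueueName
        };
        this.statusByOrderId[envelope.MessageId] = "Sent";
        return envelope;
    }

    // Receiver side: the reply copies the identifier from the order message
    // and is addressed to whatever queue the sender named in ReplyTo.
    public static StatusReply CreateReply(OrderEnvelope order, string status)
    {
        return new StatusReply
        {
            OrderId = order.MessageId,
            Status = status,
            PostedTo = order.ReplyTo
        };
    }

    // Sender side, later: match the reply to the order it belongs to.
    // Returns false for a reply that matches no outstanding order.
    public bool ApplyReply(StatusReply reply)
    {
        if (!this.statusByOrderId.ContainsKey(reply.OrderId))
        {
            return false;
        }

        this.statusByOrderId[reply.OrderId] = reply.Status;
        return true;
    }

    public string GetStatus(Guid orderId)
    {
        string status;
        return this.statusByOrderId.TryGetValue(orderId.ToString(), out status)
            ? status
            : "Unknown";
    }
}
```

The sender never blocks between CreateOrderMessage and ApplyReply; any amount of time can pass, and the identifier alone is enough to pair the reply with its order.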

Securing Message Queues, Topics, and Subscriptions

A key requirement of the messaging solution is that all messages should be protected from unauthorized access. As described in Chapter 3, "Authenticating Users in the Orders Application," Azure Service Bus uses the Access Control Service (ACS) to protect Service Bus queues, topics, and subscriptions. To connect to a queue, topic, or subscription, an application must present a valid authentication token.

To secure the communication channels, Trey Research defined rules in ACS to allow the local and distance transport partners to connect to the Service Bus subscriptions through which they receive the order messages that the Orders application posts to the topic; the transport partners are granted the "Listen" claim to their subscriptions to enable them to receive messages only. The worker role in the Orders application is granted the "Send" claim over the topic to enable it to post messages only.

Note

For more information about these claims and how to configure ACS to authenticate clients and authorize access to Service Bus artifacts, see "Service Bus Authentication and Authorization with the Access Control Service" on MSDN.

For the Service Bus queue that the worker role listens to for response messages, the privileges are reversed; the transport partners are granted the "Send" claim to the queue while the worker role has the "Listen" claim.

Note

The various ACS rules and rule groups and identities used by the Orders application and the transport partners are created by the setup program in the TreyResearch.Setup project.

For completeness, the following table summarizes how Trey Research configured ACS to enable Service Bus authentication for applications and services connecting to the various Service Bus queues, topics, and subscriptions in the Orders application.

Service Bus artifact: Service identities
Setting: AuditLogListener, Fabrikam, HeadOffice, Contoso, NewOrderJob, NewOrdersTopic, owner, StatusUpdateJob

Service Bus artifact: Default Service Bus (relying party)
Setting:
  Name: ServiceBus
  Realm: http://treyresearch.servicebus.windows.net/
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group containing: If name identifier="owner" emit "Manage", "Send", and "Listen" action claims.

Service Bus artifact: Service Bus topic (relying party) for sending new order details to transport partners and the on-premises audit log
Setting:
  Name: NewOrdersTopic
  Realm: http://treyresearch.servicebus.windows.net/neworderstopic
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group for ServiceBus.
  • Rule group containing: If name identifier="NewOrderJob" emit "Send" action claim.
  Subscriptions:
  • Local ("Contoso") and distance ("Fabrikam") shipping partners.
  • Audit log service.

Service Bus artifact: Service Bus queue (relying party) that transport partners use to send messages to the Orders application that:
  • Acknowledge receipt of new order details.
  • Indicate that the order has been delivered.
Setting:
  Name: OrderStatusUpdateQueue
  Realm: http://treyresearch.servicebus.windows.net/orderstatusupdatequeue
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group for ServiceBus.
  • Rule group containing: If name identifier="Contoso" emit "Send" action claim.
  • Rule group containing: If name identifier="Fabrikam" emit "Send" action claim.
  • Rule group containing: If name identifier="StatusUpdateJob" emit "Listen" action claim.

Service Bus artifact: Transport partner (relying party) for local deliveries (Contoso, Inc.)
Setting:
  Name: Contoso
  Realm: http://treyresearch.servicebus.windows.net/neworderstopic/subscriptions/contososubscription
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group for ServiceBus.
  • Rule group containing: If name identifier="Contoso" emit "Listen" action claim.

Service Bus artifact: Transport partner (relying party) for distance deliveries (Fabrikam Inc.)
Setting:
  Name: Fabrikam
  Realm: http://treyresearch.servicebus.windows.net/neworderstopic/subscriptions/fabrikamsubscription
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group for ServiceBus.
  • Rule group containing: If name identifier="Fabrikam" emit "Listen" action claim.

Service Bus artifact: On-premises management and monitoring application (relying party). Subscribes to the topic to collect audit log messages.
Setting:
  Name: AuditLogListener
  Realm: http://treyresearch.servicebus.windows.net/neworderstopic/subscriptions/auditloglistenersubscription
  Claim issuer: ACS. Token type: SWT.
  Rule groups:
  • Default rule group for ServiceBus.
  • Rule group containing: If name identifier="AuditLogListener" emit "Listen" action claim.

The worker role and transport partners are each configured with the appropriate issuer name and key, and they present this information when they connect to the Service Bus queue, topic, or subscription in the form of a simple web token. For example, the Run method in the NewOrderJob class in the worker role uses the following code to extract the key and issuer details from the application configuration and store them in a ServiceBusTopicDescription object, which is in turn used to create a ServiceBusTopic object.

public void Run()
{
  ...
  this.serviceBusNamespace = CloudConfiguration.
    GetConfigurationSetting("serviceBusNamespace", 
      string.Empty);
  this.acsNamespace = CloudConfiguration.
    GetConfigurationSetting("acsNamespace", string.Empty);
  var topicName = CloudConfiguration.
    GetConfigurationSetting("topicName", string.Empty);
  var issuer = CloudConfiguration.
    GetConfigurationSetting("newOrdersTopicIssuer", 
      string.Empty);
  var defaultKey = CloudConfiguration.
    GetConfigurationSetting("newOrdersTopicKey", 
      string.Empty);
  ...

  var serviceBusTopicDescription = 
    new ServiceBusTopicDescription
    {
      Namespace = this.serviceBusNamespace,
      TopicName = topicName,
      Issuer = issuer,
      DefaultKey = defaultKey
    };

  this.newOrderMessageSender = 
       new ServiceBusTopic(serviceBusTopicDescription);
  ...  
}

The constructor in the ServiceBusTopic class uses this information to create a token provider for a MessagingFactory object. The MessagingFactory object is used to construct the MessageSender object that the ServiceBusTopic object utilizes to actually post messages to the underlying Service Bus topic.

...
private readonly ServiceBusTopicDescription description;
private readonly TokenProvider tokenProvider;
private readonly MessageSender sender;
...

public ServiceBusTopic(
  ServiceBusTopicDescription description)
{
  ...            

  this.description = description;
  this.tokenProvider = TokenProvider.
    CreateSharedSecretTokenProvider(           
      this.description.Issuer, 
      this.description.DefaultKey);

  var runtimeUri = ServiceBusEnvironment.
    CreateServiceUri("sb", this.description.Namespace, 
                     string.Empty);
  var messagingFactory = MessagingFactory.Create(
    runtimeUri, this.tokenProvider);
  this.sender = messagingFactory.CreateMessageSender(
    this.description.TopicName.ToLowerInvariant());
  ...
}

The constructors of the ServiceBusQueue and ServiceBusSubscription classes follow a similar pattern.

Securing Messages

To help prevent spoofing, Trey Research also implements a mechanism to verify the identity of a transport partner posting messages to the Service Bus queue on which the Orders application listens. This helps to ensure that a rogue third party is not somehow impersonating a valid transport partner and sending fake messages. To accomplish this, each time a transport partner sends a message, it adds a simple web token in the header of the message that indicates the identity of the sender, and the receiver in the Orders application validates the token when each message arrives.

Adding tokens to the header and validating them cannot be achieved just by configuring the Service Bus artifacts and ACS. Instead, Trey Research uses the following code to obtain a token from ACS. This code is taken from the OrderProcessor class; this is the base class from which the Adapter and Connector classes used by the transport partners descend.

private string GetToken(ServiceBusQueueDescription
                        queueDescription)
{

  var realm = string.Format("urn:{0}/{1}",
        queueDescription.QueueName,   
        HttpUtility.UrlEncode(this.acsServiceIdentity));

  var token = GetTokenFromAcs(string.Format(
          "https://{0}.accesscontrol.windows.net/", 
          queueDescription.SwtAcsNamespace), 
        this.acsServiceIdentity, this.acsPassword, realm);

  return token;
}

private string GetTokenFromAcs(string acsNamespace, 
               string serviceIdentity, string password, 
               string relyingPartyRealm)
{
  // request a token from ACS
  var client = new WebClient();
  client.BaseAddress = acsNamespace;
  var values = new NameValueCollection();
  values.Add("wrap_name", serviceIdentity);
  values.Add("wrap_password", password);
  values.Add("wrap_scope", relyingPartyRealm);
  byte[] responseBytes = client.UploadValues(
         "WRAPv0.9/", "POST", values);
  string response = Encoding.UTF8.GetString(responseBytes);
  return HttpUtility.UrlDecode(
      response
        .Split('&')
        .Single(value =>
          value.StartsWith("wrap_access_token=",
            StringComparison.OrdinalIgnoreCase))
        .Split('=')[1]);
}

The GetToken method builds the realm from the queue name and the service identity, and passes it to the GetTokenFromAcs method together with the service identity name and password taken from the application's configuration file. The GetTokenFromAcs method (also shown in the previous code example) sends a request for a token to ACS and extracts the wrap_access_token value from the response, yielding the appropriate token for this sender's identity.

After obtaining a suitable token, the transport partner can add it to the message that it posts to the Service Bus queue.

When a message is received, the receiver can extract the token and use it to verify the identity of the sender. For example, the worker role uses the IsValidToken method in the StatusUpdateJob class shown in the following code example to establish whether the token extracted from a message is valid.

private bool IsValidToken(Guid orderId, string token)
{
  string transportPartner;
  ...

  string acsServiceNamespace = CloudConfiguration.
    GetConfigurationSetting("acsNamespace", null);
  string acsUsername = CloudConfiguration.
    GetConfigurationSetting("acsUsername", null);
  string acsPassword = CloudConfiguration.
    GetConfigurationSetting("acsUserKey", null);

  var acsWrapper = new ServiceManagementWrapper(
    acsServiceNamespace, acsUsername, acsPassword);
  var relyingParty = acsWrapper.
    RetrieveRelyingParties().
    SingleOrDefault(
      rp => rp.Name.Contains(transportPartner));

  var keyValue = string.Empty;

  if (relyingParty != null)
  {
    var key = relyingParty.
      RelyingPartyKeys.
      FirstOrDefault();
      ...
    keyValue = Convert.ToBase64String(key.Value);
  }

  // Values for trustedAudience: 
  //  urn:[queue-name]/[partner-name]
  var trustedAudience = string.Format("urn:{0}/{1}",
    CloudConfiguration.GetConfigurationSetting(
      "orderStatusUpdateQueue", string.Empty),
    HttpUtility.UrlEncode(transportPartner));

  var validator = new TokenValidator(
    "accesscontrol.windows.net",      
    RoleEnvironment.GetConfigurationSettingValue(
      "acsNamespace"), 
    trustedAudience, keyValue);

  return validator.Validate(token);
}

The IsValidToken method uses the classes in the ACS.ServiceManagementWrapper project to retrieve information about the various relying parties configured in ACS. For more information about the ACS.ServiceManagementWrapper, see "Access Control Service Samples and Documentation" at https://acs.codeplex.com/releases/view/57595.

The IsValidToken method also uses a separate class named TokenValidator in the Orders.Workers project to actually validate the token given the ACS hostname, the service namespace, the audience value, and the signing key.
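
The source of the TokenValidator class is not reproduced in this chapter. As a rough illustration of what validating a simple web token involves, the following sketch checks the HMAC-SHA256 signature that an SWT carries in its final HMACSHA256 parameter, and then compares the Issuer, Audience, and ExpiresOn values. The SwtSketch class, its method names, and the issuer string used to exercise it are assumptions made for this illustration; the code is not the Trey Research implementation, and it uses Uri.EscapeDataString in place of HttpUtility so that it has no dependency on System.Web.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public static class SwtSketch
{
    private const string SignatureMarker = "&HMACSHA256=";
    private static readonly DateTime Epoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Appends an HMAC-SHA256 signature to a set of form-encoded claims.
    // In the real system ACS issues the token; this method exists only so
    // that the sketch can produce a token to validate.
    public static string Sign(string unsignedToken, byte[] key)
    {
        return unsignedToken + SignatureMarker
            + Uri.EscapeDataString(
                Convert.ToBase64String(ComputeSignature(unsignedToken, key)));
    }

    public static bool Validate(string token, byte[] key,
        string trustedIssuer, string trustedAudience, DateTime utcNow)
    {
        int marker = token.LastIndexOf(SignatureMarker, StringComparison.Ordinal);
        if (marker < 0)
        {
            return false;
        }

        string unsignedToken = token.Substring(0, marker);
        byte[] presented;
        try
        {
            presented = Convert.FromBase64String(Uri.UnescapeDataString(
                token.Substring(marker + SignatureMarker.Length)));
        }
        catch (FormatException)
        {
            return false;
        }

        // Check the signature first; claims in an unsigned token mean nothing.
        if (!ConstantTimeEquals(ComputeSignature(unsignedToken, key), presented))
        {
            return false;
        }

        var claims = new Dictionary<string, string>();
        foreach (string pair in unsignedToken.Split('&'))
        {
            int eq = pair.IndexOf('=');
            if (eq > 0)
            {
                claims[Uri.UnescapeDataString(pair.Substring(0, eq))] =
                    Uri.UnescapeDataString(pair.Substring(eq + 1));
            }
        }

        string issuer, audience, expiresOn;
        long expirySeconds;
        return claims.TryGetValue("Issuer", out issuer)
            && issuer == trustedIssuer
            && claims.TryGetValue("Audience", out audience)
            && audience == trustedAudience
            && claims.TryGetValue("ExpiresOn", out expiresOn)
            && long.TryParse(expiresOn, out expirySeconds)
            && (utcNow - Epoch).TotalSeconds < expirySeconds;
    }

    private static byte[] ComputeSignature(string unsignedToken, byte[] key)
    {
        using (var hmac = new HMACSHA256(key))
        {
            return hmac.ComputeHash(Encoding.ASCII.GetBytes(unsignedToken));
        }
    }

    // Compares two byte arrays without exiting early on the first mismatch.
    private static bool ConstantTimeEquals(byte[] a, byte[] b)
    {
        if (a.Length != b.Length)
        {
            return false;
        }

        int diff = 0;
        for (int i = 0; i < a.Length; i++)
        {
            diff |= a[i] ^ b[i];
        }

        return diff == 0;
    }
}
```

The audience check is what ties a token to one transport partner: a token issued for the realm urn:[queue-name]/Contoso fails validation if it is presented as coming from Fabrikam, even though both partners are allowed to send to the same queue.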

Sending Orders to the Audit Log

Currently, all orders with a value over $10,000 must be audited, and the audit log is held on-premises. The order processing logic must be able to quickly determine the total cost of an order and direct the details to the audit log. This processing must happen quickly without impinging on the responsiveness to customers. It must be scalable as the volume of orders increases, and it must be flexible enough to allow the auditing criteria to be changed quickly without extensively rewriting the code for the worker role. As with orders sent to transport partners, all audit information is extremely sensitive and must be protected against unauthorized access, especially as it traverses the network.

Choosing a Mechanism for Sending Orders to the Audit Log

Once Trey Research had settled on the use of Service Bus topics as the mechanism for communicating with transport partners, they decided to use the same approach for auditing messages. When a customer places an order, the total value of the order is calculated and added to the message as a property called OrderAmount. If the value is more than $10,000, the message is picked up by the audit subscription and sent to the on-premises auditing application at Trey Research. Figure 5 highlights how this technology fits into the Trey Research solution. Notice that the audit log uses the same Service Bus topic as the transport partners, but with a subscription that applies a different filter.

Note

Remember that if a message posted to a topic satisfies the filter associated with more than one subscription, a copy of the message will be routed to all matching subscriptions.

Figure 5

Messaging technology used by Trey Research to route orders to the audit log

How Trey Research Sends Orders to the Audit Log

The total value of the order is added as the OrderAmount property to every order message posted by the worker role to the Service Bus topic. The Trey Research application identifies all orders that require auditing by creating a Service Bus subscription with an appropriate filter. The code that creates this Service Bus subscription is located in the SetupAuditLogListener method in the setup program in the TreyResearch.Setup project. The following code example shows the parts of this method that configure the filter.

private static void SetupAuditLogListener()
{
  var formattedName = AuditLogListener.Replace(" ", 
    string.Empty).ToLowerInvariant();
  ...
  var serviceBusTopicDescription = 
    new ServiceBusSubscriptionDescription
    {
      Namespace = ServiceBusNamespace,
      TopicName = TopicName,
      SubscriptionName = string.Format(
        "{0}Subscription", formattedName),
      Issuer = Issuer,
      DefaultKey = DefaultKey
    };

  var serviceBusSubscription = 
    new ServiceBusSubscription(serviceBusTopicDescription);
  const int AuditAmount = 10000;
  var filterExpression = string.Format(
    "OrderAmount > {0}", AuditAmount);
  serviceBusSubscription.CreateIfNotExists(
    filterExpression);

  ...
}
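
The effect of this filter alongside the transport partner subscriptions can be modeled in memory. In the following sketch, SketchSubscription, SketchTopic, and AuditRoutingDemo are hypothetical types, filters are plain C# predicates rather than Service Bus filter expressions, and the TransportPartnerName property used by the partner filter is an assumed name chosen for illustration. Only the OrderAmount property and the 10000 threshold come from the setup code shown above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a subscription: a name, a filter over the message
// properties, and the list of message bodies it has received.
public sealed class SketchSubscription
{
    public SketchSubscription(
        string name, Func<IDictionary<string, object>, bool> filter)
    {
        this.Name = name;
        this.Filter = filter;
        this.Delivered = new List<string>();
    }

    public string Name { get; private set; }
    public Func<IDictionary<string, object>, bool> Filter { get; private set; }
    public List<string> Delivered { get; private set; }
}

// Hypothetical model of a topic: every subscription whose filter matches
// receives its own copy of the message.
public sealed class SketchTopic
{
    private readonly List<SketchSubscription> subscriptions =
        new List<SketchSubscription>();

    public void Add(SketchSubscription subscription)
    {
        this.subscriptions.Add(subscription);
    }

    // Returns the names of the subscriptions that received a copy.
    public List<string> Publish(
        string body, IDictionary<string, object> properties)
    {
        var matched = new List<string>();
        foreach (var subscription in this.subscriptions)
        {
            if (subscription.Filter(properties))
            {
                subscription.Delivered.Add(body);
                matched.Add(subscription.Name);
            }
        }

        return matched;
    }
}

public static class AuditRoutingDemo
{
    public static List<string> Route(decimal orderAmount, string partner)
    {
        var topic = new SketchTopic();

        // "TransportPartnerName" is an assumed property name.
        topic.Add(new SketchSubscription(
            "contososubscription",
            p => (string)p["TransportPartnerName"] == "Contoso"));
        topic.Add(new SketchSubscription(
            "fabrikamsubscription",
            p => (string)p["TransportPartnerName"] == "Fabrikam"));

        // Mirrors the "OrderAmount > 10000" filter expression.
        topic.Add(new SketchSubscription(
            "auditloglistenersubscription",
            p => (decimal)p["OrderAmount"] > 10000m));

        var properties = new Dictionary<string, object>
        {
            { "OrderAmount", orderAmount },
            { "TransportPartnerName", partner }
        };

        return topic.Publish("order", properties);
    }
}
```

Calling AuditRoutingDemo.Route(12000m, "Contoso") delivers one copy to the Contoso subscription and one to the audit subscription, whereas an order of exactly 10000 reaches only the transport partner because the comparison is strictly greater than.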

The Trey Research Head Office web application in the HeadOffice project includes the AuditController class, which connects to this subscription and retrieves orders to be audited. The DownloadLogs method in this class contains the code that actually retrieves the details of the orders to be audited. Note that this method connects to the Service Bus topic in each datacenter in which the Orders application runs; each instance of the Orders application posts messages to the topic in its local datacenter. The name of the subscription to use, the name of the topic, and the security keys are stored in the configuration file with the application.

public ActionResult DownloadLogs()
{
  ...
  var serviceBusNamespaces = WebConfigurationManager.
    AppSettings["AuditServiceBusList"].Split(',').ToList();
  ...

  foreach (var serviceBusNamespace in serviceBusNamespaces)
  {
    // Connect to servicebus, download messages from the 
    // Audit log subscription, save to database.
    var serviceBusTopicDescription = 
      new ServiceBusSubscriptionDescription
      {
        Namespace = serviceBusNamespace,
        TopicName = WebConfigurationManager.
          AppSettings["topicName"],
        SubscriptionName = WebConfigurationManager.
          AppSettings["subscriptionName"],
        Issuer = WebConfigurationManager.
          AppSettings["issuer"],
        DefaultKey = WebConfigurationManager.
          AppSettings["defaultKey"]
      };

    var serviceBusSubscription = new 
      ServiceBusSubscription(serviceBusTopicDescription);

    // MessagePollingInterval should be configured taking 
    // into consideration variables such as CPU 
    // processing power, expected volume of orders to 
    // process and number of worker role instances
    var receiverHandler = 
      new ServiceBusReceiverHandler<NewOrderMessage>(
        serviceBusSubscription.GetReceiver()) {   
          MessagePollingInterval = 
           TimeSpan.FromSeconds(2) };

    receiverHandler.ProcessMessages(
      (message, queueDescription, token) =>
      {
        return Task.Factory.StartNew(
          () => this.ProcessMessage(
            message, queueDescription),
          ...);
      },
      ...);
  }

  return RedirectToAction("Index");
}

The ProcessMessage method (called by the ProcessMessages method of the ServiceBusReceiverHandler object) simply saves the order message data to a local SQL Server database.

public void ProcessMessage(NewOrderMessage message, 
  ServiceBusQueueDescription queueDescription)
{
  // Save the AuditLog to the database
  var auditLog = new AuditLog
  {
    OrderId = message.OrderId,
    OrderDate = message.OrderDate,
    Amount = Convert.ToDecimal(message.Amount),
    CustomerName = message.CustomerName
  };

  this.auditLogStore.Save(auditLog);
}

Verifying Orders to Ensure Regulatory Compliance

The final challenge concerns integration with the compliance application. This application examines orders for compliance with export restrictions and government regulations for technical products. The compliance application communicates with the Orders database using a standard SQL Server connection string, and executes queries to determine compliance on a pre-determined schedule. Additionally, the compliance application generates reports that are stored in a secure on-premises location within the Trey Research Head Office.

When the application was deployed on-premises, it accessed the Orders database and the secure reporting location that were also located on-premises. Now that the Orders database is located in the cloud, the compliance application must connect to a SQL Azure instance. This is a simple configuration issue that can be easily resolved. However, the volume of traffic between the compliance application and the Orders database is considerable as the compliance application executes its many data queries and searches. These factors led Trey Research to consider in more depth how the application itself should be deployed.

The source code for this application is confidential and not available; a government department specifies the processes it must follow and certifies the operation. This makes it difficult if not impossible to refactor the application as a worker role. In addition, the reporting functionality requires authenticated connectivity to the appropriate server, and all data transmitted over this connection must be secure.

Choosing Where to Host the Compliance Application

For hosting the compliance application, Trey Research decided to install and configure the application using the Azure Virtual Machine (VM) role. This solution balances the need to configure, deploy, and maintain a VM role in the cloud, close to the orders data being examined, against the alternative of retaining the compliance application on-premises and either connecting to the orders data in the cloud or transferring the data from the cloud to an on-premises database.

Jana Says:
When deciding whether to deploy an application to a VM role, you need to consider the benefits of reducing the network overhead of a chatty application such as the compliance application connecting to a database in the cloud against the cost of maintaining and managing the VM role.

The compliance application needs to access the secure location where it stores the various reports that it generates. This location is on an on-premises server, and Trey Research decided to use Azure Connect to provide an authenticated, secure virtual network connection between the VM role and this server.

Trey Research chose to deploy the VM role to the US North Data Center as it is the closest datacenter to the Head Office, which should help to minimize any network latency that results from connecting across the on-premises/cloud boundary.

How Trey Research Hosted the Compliance Application

This section is provided for information only, showing how a solution could be implemented. The Trey Research example application does not actually include the compliance application or the corresponding VM role.

The VM role that hosts the compliance application examines data held in the SQL Azure Orders database. The VM role is deployed to the US North Data Center, but the compliance application generates reports that are stored in a secure on-premises location within the Trey Research head office infrastructure. The compliance application also sends data to the monitoring application, which is also located on-premises; this application exposes a series of Distributed Component Object Model (DCOM) interfaces to which the compliance application connects for this purpose.

Trey Research implemented a separate small domain with its own Domain Name System (DNS) service in the on-premises infrastructure specifically for hosting the Azure Connect endpoint software, the reporting data, and the monitoring application. Reports are stored in a share protected by using an Access Control List (ACL). Access is granted to an account defined within the domain. The compliance application, which is joined to the domain, provides these credentials when writing reports to this share. The same approach is used to protect the DCOM interface exposed by the monitoring application.

This domain has a trust relationship with the primary domain within Trey Research, and the management application running in the primary domain can periodically retrieve the reporting data and analyze the information logged by the monitoring application. Figure 6 shows the structure of the compliance system.

Figure 6

Structure of the compliance system

Summary

This chapter has looked at how Trey Research used two important Azure technologies: Service Bus, whose topics, subscriptions, and queues provide a reliable cross-boundary communication layer, and Azure Connect. These technologies provide a foundation that you can use to construct elegant hybrid solutions comprising components that need to communicate across the cloud/on-premises divide.

Service Bus queues enable you to implement asynchronous messaging that helps to remove the temporal dependency between the client application posting a request and the service receiving the request. Message-oriented applications are highly suited to use in cloud environments as they can more easily handle the variable volumes and peak loading that is typical of many commercial systems, and can easily be made robust enough to handle network and communications failure. Using Service Bus queues, you can implement a number of common messaging patterns and adapt them as appropriate to the requirements of your system.

Service Bus topics and subscriptions enable you to intelligently route messages to services. An application can post messages to a topic and include metadata that a filter can use to determine which subscriptions to route the message through. Services listening on these subscriptions then receive all matching messages. This simple but powerful mechanism enables you to address a variety of scenarios, and easily construct elegant solutions for these scenarios.

Finally, Azure Connect enables you to establish a virtual network connection between a role hosted in the cloud and your on-premises infrastructure, and is suitable for situations where you need a direct connection between components rather than a message-oriented interface. You can share data across this network connection in a similar manner to accessing resources shared between computers running on-premises.

More Information

All links in this book are accessible from the book's online bibliography available at: https://msdn.microsoft.com/en-us/library/hh871440.aspx.

Last built: June 4, 2012