Processing Orders in the Trey Research Solution

On this page:
Scenario and Context | Processing Orders and Interacting with Transport Partners | How Trey Research Posts Messages to a Topic in a Reliable Manner - Recording the Details of an Order, Sending an Order to a Service Bus Topic from the Orders Application, The NewOrderJob Class, Locking Orders for Processing, Handling Orders with Expired Locks, Posting Orders to the Service Bus Topic, Creating a New Order Message, Sending the New Order Messages, Completing the Reliable Send Process | How Trey Research Decouples the Order Process from the Transport Partners' Systems - Receiving and Processing an Order in a Transport Partner, Acknowledging an Order or Indicating that it has Shipped in a Transport Partner, Receiving Acknowledgement and Status Messages in the Orders Application | Summary | More Information

Having established an asynchronous messaging layer for communicating between the Orders application and the transport partners based on Service Bus topics, subscriptions, and queues, the developers at Trey Research were able to turn their attention to implementing the business logic for processing orders. This is the primary function of the Orders application running on the Microsoft Azure™ technology platform, and it was important for Trey Research to establish a robust mechanism that ensures orders will not be mislaid.

In this chapter, you will see how Trey Research designed the order processing and reporting logic to take full advantage of the scalability of the messaging framework, while ensuring that customers' orders are managed correctly and reliably.

Scenario and Context

Trey Research utilizes the services of external transport partners to ship orders to customers. These transport partners may implement their own systems, and no two transport partners necessarily follow the same procedures for handling orders; they simply guarantee that once Trey Research has provided them with the details of an order and a shipping address, they will collect the goods from the Trey Research manufacturing plant and deliver the goods to the customer.

For the reasons described in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," Trey Research decided to use Service Bus topics, subscriptions, and queues to communicate with the transport partners. Using this infrastructure, after storing the details of a new order in the local database running on the SQL Azure™ technology platform, the Orders application posts these details to a well-known Service Bus topic. Each transport partner uses its own Service Bus subscription to retrieve orders and ship them. Each subscription has a filter that ensures that a transport partner receives only the orders that it should ship, and that it has no access to the orders intended for other transport partners. Each transport partner responds with one or more messages indicating the current state of the shipping process; these messages are posted to a Service Bus queue. The Orders application retrieves these messages and uses them to update the status of the orders in the local SQL Azure database. Customers can use the Orders application to examine the status of their orders.

Figure 1 illustrates the logical flow of messages and data through the Orders application and the transport partners. Note that although the Orders application always sends a single order message to a transport partner for each new order placed, the message or messages returned by the transport partner depend entirely on the internal systems implemented by that transport partner, and the Orders application simply records the details of each response as a status message in the SQL Azure database. This variation is reflected in the Trey Research sample solution. The local transport partner, Contoso, sends two messages: the first acknowledges receipt of the order and includes a tracking ID for the order, and the second is sent when the order has actually been shipped. The distance transport partner, Fabrikam, sends only a reply acknowledging the order, again containing a tracking ID.

Note

Many commercial partners provide their own web applications that customers can use to query the delivery status of an order; the customer simply has to provide the tracking ID. This functionality is outside the scope of Trey Research.

Figure 1

Logical flow of messages and data when a customer places an order

Note

Orders with a value above $10,000 must be audited, and they are retrieved from the Service Bus topic by using a separate Service Bus subscription and directed to the audit log. The implementation of this part of the order process is described in the section "How Trey Research Sends Orders to the Audit Log" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud." Additionally, each order is subjected to checking against export regulations by a separate compliance application. The section "How Trey Research Hosted the Compliance Application" (also in Chapter 4) describes how Trey Research configured the solution to support this requirement.
Neither of these two aspects of the order process is covered further in this chapter.

Processing Orders and Interacting with Transport Partners

Although the order process is simple in theory, Trey Research had to address a number of technical and logistical challenges when implementing it. These challenges fall into two main areas of concern.

  • The process must be reliable and scalable.

    Once a customer has placed an order, the order must be fulfilled and should not be lost. Service Bus topics and queues provide a reliable mechanism for routing and receiving messages and, once posted, a message will not be lost. However, the act of posting a message involves connecting to a topic or queue across the Internet, and this is a potential source of failure. The order process must guarantee that orders are posted successfully.

    Additionally, the volume of orders may vary significantly over time, so the business logic implementing the order handling process must be scalable to enable it to post messages quickly without consuming excessive resources.

  • The order process must be decoupled from the internal logic, number, and location of transport partners, as well as the location of the datacenter running the Orders application.

    The transport partners' internal systems may be unable to connect directly to the Service Bus subscriptions and queues provided by Trey Research. Administrators at transport partners may be unwilling to install Trey Research's code on their systems.

    Furthermore, Trey Research may work with the services of different transport partners over time. The solution must be flexible enough to allow transport partners to be enrolled or removed from the system quickly and easily.

    Finally, the Orders application may be running in more than one datacenter, each hosting its own set of Service Bus topics, subscriptions, and queues. Each transport partner must be prepared to receive orders from a subscription in any of these datacenters, and post response messages back to the queue in the correct datacenter.

The following sections describe how Trey Research resolved these issues in their implementation.

How Trey Research Posts Messages to a Topic in a Reliable Manner

When a customer places an order in the Orders application, the web role saves the order to a database. Subsequently, the worker role retrieves the order, posts the delivery details as a message to a transport partner through a Service Bus topic, and updates the status of the order to indicate that it has been sent. It is vital that the send and update operations both succeed, or that a corresponding recovery action or notification occurs if one of these operations fails; this is essentially the definition of a transaction. However, at the time of writing, Service Bus messaging only provides transactional behavior within the Service Bus framework. Azure Service Bus does not currently support the use of the Microsoft Distributed Transaction Coordinator (DTC), so you cannot combine SQL Azure database operations with a Service Bus send or receive operation within the same transaction.

For this reason, Trey Research decided to build a custom mechanism that keeps track of the success or failure of each send operation that posts order messages to the Service Bus topic, and maintains the status of the order accordingly. This mechanism implements a pseudo-transaction; it is arguably more complex than may be considered necessary, but it can be more successful in countering transient faults (such as those caused by an intermittent connection to a Service Bus topic) and when using asynchronous messaging operations. This approach is also highly extensible; it is likely to be most useful when there are many operations to perform, or where there is a complex inter-relationship between operations.

Markus says:

It is vital to design your messaging code in such a way that it can cope with failures that may arise at any point in the entire messaging cycle, and not just when code fails to send or receive a message.

Frameworks, components, and documentation are available to help you implement retry logic for Azure messaging, such as "Best Practices for Leveraging Azure Service Bus Brokered Messaging API" (http://windowsazurecat.com/2011/09/best-practices-leveraging-windows-azure-service-bus-brokered-messaging-api/).

Figure 2 shows a high-level overview of the solution Trey Research implemented to ensure that orders placed by customers are stored in the database and the delivery request is successfully sent to a transport partner and the audit log.

Figure 2

The custom transactional and retry mechanism implemented for the Trey Research Orders application

The implementation uses separate database tables that store the details of the order and the current status of each order. When a customer places an order, the web role populates these tables within a single database transaction, so that either all of the inserts succeed or none of them do. If there is an error, the web role notifies the administrator.

This approach separates the task of saving the order from the tasks required to process the order, which are carried out by the worker role, and releases the web role to handle other requests. It also means that the web role can display information such as the order number to the customer immediately, and the customer will be able to view the current status of all of their orders without needing to wait until the order processing tasks have completed.

Markus says:

If you need to perform complex processing on messages before posting them to a queue, or handle multiple messages and perhaps combine them into one message, you might consider doing this in the web role and then storing the resulting message in the database. This approach can improve performance and reduce the risk of failure for worker roles, but can have a corresponding negative impact on the responsiveness of the web roles. It also splits the logic for the order processing task across the worker roles and web roles, with a resulting loss of separation of responsibility.

The worker role then carries out the tasks required to complete the process. It sends each message to a Service Bus topic that passes it to the transport partner to notify that a delivery is pending, and to the audit log when the order value exceeds a specific amount.

The worker role also listens for a response from a transport partner that indicates the message was received. This response will contain information such as the delivery tracking number that the worker role stores in the database (this part of the process is outside the scope of the custom retry mechanism and is not depicted in Figure 2).

At each stage of the overall process, the worker role updates the order status in the database to keep track of progress. For example, the status details may indicate that an order has been accepted and sent to a transport partner, but an acknowledgement has not yet been received.

Typically, the status rows will also contain a count of the number of times the send process has been attempted for each item, so the worker role can abandon the process and raise an exception, or place the message in a dead letter queue for human intervention, if a specified number of retries has been exceeded.
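The retry-limit decision described above might be sketched as follows. This is a minimal illustration rather than the Trey Research code: the RetryDecision type, the RetryLimit class, and the MaxRetries value of 3 are hypothetical names and numbers chosen for the example, because the guide does not state the limit that the sample uses.

```csharp
using System;

// Hypothetical sketch: the type names and the retry limit are assumptions
// made for illustration and are not taken from the sample solution.
public enum RetryDecision
{
    Retry,
    Abandon
}

public static class RetryLimit
{
    // Assumed limit; the guide does not specify the value in use.
    public const int MaxRetries = 3;

    // Decides what to do after a failed send, given the number of
    // attempts already recorded in the status row for the order.
    public static RetryDecision Decide(int retryCount)
    {
        if (retryCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(retryCount));
        }

        // Once the limit is reached, the caller would mark the order as
        // failed and alert an administrator instead of trying again.
        return retryCount >= MaxRetries
            ? RetryDecision.Abandon
            : RetryDecision.Retry;
    }
}
```

Keeping the decision in one small function makes the limit easy to change and to test independently of the database and the messaging layer.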

The following sections provide more detail on how Trey Research designed and implemented this mechanism using SQL Azure and the messaging layer defined in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud."

The sample Trey Research application that you can download for this guide implements many of the technologies and techniques described here. However, to simplify installation and setup, and reduce the prerequisites and the requirements for users to establish extensive Azure accounts, the feature set and some of the implementation details differ from the text of this guide.

Recording the Details of an Order

A customer places an order by using the "Checkout" facility in the Orders web application implemented by the Orders.Website project. This operation invokes the AddressAndPayment action handler in the CheckoutController class. The AddressAndPayment method creates an Order object using the data entered by the customer, and uses the Entity Framework (EF) to add this order to the SQL Azure database. The following code sample shows the relevant sections of the AddressAndPayment method. Note that the customer and address information is added to the Order object by using the TryUpdateModel method.

public ActionResult AddressAndPayment(
  FormCollection values)
{
  var order = new Order();
  this.TryUpdateModel(order);

  var identity = User.Identity as IClaimsIdentity;
  var userName = identity.GetFederatedUsername();

  var cartId = ShoppingCart.GetCartId(this.HttpContext);
  var cartItems = this.cartStore.FindCartItems(cartId);

  order.OrderDetails = cartItems.Select(
    i => new OrderDetail 
      { ProductId = i.ProductId, Quantity = i.Count,
        Product = i.Product });
  order.UserName = userName;
  order.OrderDate = DateTime.Now;

  // Save the order
  this.ordersStore.Add(order);

  ...
}

For each order that is created, the Add method of the underlying entity model also creates a collection of OrderDetail objects and populates these with the information about each item in the order. The information that describes the order (the name, address, and contact details for the customer together with the date the order was placed, the total value of the order, and a unique order ID) is saved to the Order table in the SQL Azure database. The line items that comprise the order (the product and the quantity required) are saved to the OrderDetail table. The following code sample shows the Add method.

public void Add(Order order)
{
  ...
  var orderId = Guid.NewGuid();

  var orderToSave = new Entities.Order
    {
      OrderId = orderId, 
      UserName = order.UserName, 
      OrderDate = order.OrderDate, 
      Address = order.Address, 
      City = order.City, 
      State = order.State, 
      PostalCode = order.PostalCode, 
      Country = order.Country, 
      Phone = order.Phone, 
      Email = order.Email, 
      Total = order.OrderDetails.Sum(
              d => d.Quantity * d.Product.Price)
    };

  using (var database 
         = TreyResearchModelFactory.CreateContext())
  {
    database.Orders.AddObject(orderToSave);

    foreach (var orderDetail in order.OrderDetails)
    {
      var detailToSave = new OrderDetail
        {
          ProductId = orderDetail.ProductId, 
          OrderId = orderId, 
          Quantity = orderDetail.Quantity 
        };
      database.OrderDetails.AddObject(detailToSave);
    }
    ...
  }
}

Processing an order requires the Orders application to keep track of the status of an order. The Orders application records this information in two tables: OrderStatus and OrderProcessStatus. The rows in these tables indicate the current processing status of the order and the most recent operation carried out by the order processing code. After it has created the Order and OrderDetail entities, the Add method continues by creating rows for the OrderStatus and OrderProcessStatus tables.

    ...
    var status = new Entities.OrderStatus
      {
        OrderId = orderId, 
        Status = "TreyResearch: Order placed", 
        Timestamp = DateTime.UtcNow 
      };
    database.OrderStatus.AddObject(status);

    var orderProcess = new OrderProcessStatus
      {
        OrderId = orderId, 
        ProcessStatus = "pending process" 
      };
    database.OrderProcessStatus.AddObject(orderProcess);
    ...

Finally, the Add method saves all of the changes to the tables by calling the SaveChanges method of the data model. Notice that it does so by calling the ExecuteAction method of the object defined in the sqlCommandRetryPolicy property of the class.

    ...
    this.sqlCommandRetryPolicy.ExecuteAction(
      () => database.SaveChanges());
    order.OrderId = orderId;
    ...

Note

The sqlCommandRetryPolicy property implements an example of a Transient Fault Handling Application Block policy for accessing a SQL Azure database. For more information and further examples, see the section "Customer Details Storage and Retrieval" in Chapter 3, "Authenticating Users in the Orders Application." Additional information is also available online; see the topic "The Transient Fault Handling Application Block" on MSDN.
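To make the idea of a retry policy concrete, the following sketch shows a much simplified stand-in that retries an operation a fixed number of times when a caller-supplied test classifies the exception as transient. It is not the Transient Fault Handling Application Block; the SimpleRetryPolicy class, its constructor parameters, and the fixed delay are assumptions made purely for illustration.

```csharp
using System;
using System.Threading;

// Simplified stand-in for a retry policy. This is NOT the Transient Fault
// Handling Application Block; every name here is hypothetical.
public sealed class SimpleRetryPolicy
{
    private readonly int maxAttempts;
    private readonly TimeSpan delay;
    private readonly Func<Exception, bool> isTransient;

    public SimpleRetryPolicy(
        int maxAttempts, TimeSpan delay, Func<Exception, bool> isTransient)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        this.maxAttempts = maxAttempts;
        this.delay = delay;
        this.isTransient = isTransient
            ?? throw new ArgumentNullException(nameof(isTransient));
    }

    // Runs the action, retrying only while attempts remain and the
    // exception is classified as transient. Any other exception, or the
    // exception from the final attempt, propagates to the caller.
    public TResult ExecuteAction<TResult>(Func<TResult> action)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception ex)
                when (attempt < this.maxAttempts && this.isTransient(ex))
            {
                Thread.Sleep(this.delay);
            }
        }
    }
}
```

A caller would wrap a database call in the same way as the Add method does, for example `policy.ExecuteAction(() => database.SaveChanges())`. A production policy would typically also vary the delay between attempts and log each retry.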

At this point, the order is ready to be sent for processing.

Sending an Order to a Service Bus Topic from the Orders Application

The order processing logic is initiated by the worker role in the Orders.Workers project. The worker role uses two classes that encapsulate the logic for sending orders to transport partners, and for receiving order status and acknowledgement messages from transport partners. These two classes, NewOrderJob and StatusUpdateJob, are referred to as "job processors." The constructor for the WorkerRole class (defined in the Orders.Workers project) executes the CreateJobProcessors method to instantiate these objects.

public class WorkerRole : RoleEntryPoint
{
  private readonly IEnumerable<IJob> jobs;
  ...

  public WorkerRole()
  {
    ...
    this.jobs = this.CreateJobProcessors();
  }

  ...  

  private IEnumerable<IJob> CreateJobProcessors()
  {
    return new IJob[]
    {
      new NewOrderJob(), 
      new StatusUpdateJob(),
    };
  }
}

The job processors both implement the IJob interface (defined in the Jobs folder of the Orders.Workers project). This interface defines methods for starting and stopping long-running jobs:

public interface IJob
{
  void Run();
  void Stop();
}
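As an illustration of the contract, a job that implements this interface could look like the following sketch. The HeartbeatJob class is hypothetical and does no real work; it also uses a wait handle to end its loop, which is one possible design and differs from the flag and Thread.Sleep loop that the NewOrderJob class uses later in this chapter. The interface is repeated so that the example is self-contained.

```csharp
using System;
using System.Threading;

public interface IJob
{
    void Run();
    void Stop();
}

// Hypothetical job used only to illustrate the IJob contract.
public sealed class HeartbeatJob : IJob
{
    private readonly ManualResetEventSlim stopSignal =
        new ManualResetEventSlim(false);
    private readonly TimeSpan interval;
    private int iterations;

    public HeartbeatJob(TimeSpan interval)
    {
        this.interval = interval;
    }

    // Number of loop iterations completed so far.
    public int Iterations => Volatile.Read(ref this.iterations);

    // Blocks the calling thread and repeats the unit of work until Stop
    // is called. Waiting on the signal, rather than sleeping, lets a
    // Stop request end the loop without waiting for the full interval.
    public void Run()
    {
        do
        {
            Interlocked.Increment(ref this.iterations);
        }
        while (!this.stopSignal.Wait(this.interval));
    }

    // Signals the loop in Run to exit.
    public void Stop()
    {
        this.stopSignal.Set();
    }
}
```

Because Run blocks until the job is stopped, a host is expected to start each job on its own task or thread and call Stop from a different thread.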

Having created the job processors, the Run method in the worker role starts each job processor executing by using a separate Task. The worker role keeps track of the state of each task, polling every 30 seconds, and if necessary restarting any job processors that have failed. The following code example shows the Run method of the worker role.

public class WorkerRole : RoleEntryPoint
{
  private readonly IEnumerable<IJob> jobs;
  private readonly List<Task> tasks;
  private bool keepRunning;
  
  ...

  public override void Run()
  {
    this.keepRunning = true;

    // Start the jobs
    foreach (var job in this.jobs)
    {
      var t = Task.Factory.StartNew(job.Run);
      this.tasks.Add(t);
    }

    // Control and restart a faulted job
    while (this.keepRunning)
    {
      for (int i = 0; i < this.tasks.Count; i++)
      {
        var task = this.tasks[i];
        if (task.IsFaulted)
        {
          // Observe unhandled exception
          if (task.Exception != null)
          {
            TraceHelper.TraceError(
              "Job threw an exception: " +  
              task.Exception.InnerException.Message);
          }
          else
          {
            TraceHelper.TraceError(
              "Job Failed and no exception thrown.");
          }

          var jobToRestart = this.jobs.ElementAt(i);
          this.tasks[i] = 
            Task.Factory.StartNew(jobToRestart.Run);
        }
      }

      Thread.Sleep(TimeSpan.FromSeconds(30));
    }
  }
}

The following section describes how the NewOrderJob class sends orders to transport partners. The StatusUpdateJob class is described in the section "Receiving Acknowledgement and Status Messages in the Orders Application," later in this chapter.

The NewOrderJob Class

The worker role uses the NewOrderJob job processor to query the database for new orders and dispatch them to the appropriate transport partner.

The NewOrderJob class implements a reliable mechanism that sends messages to the Service Bus topic. It tracks whether the send operation was successful, and if necessary can retry failed send operations later. The status of the order is updated in the database if, and only if, the message was sent successfully.

Note

The mechanism implemented by the NewOrderJob class is a simplified adaptation of the Scheduler-Agent-Supervisor pattern documented by Clemens Vasters. For more information, see the article "Cloud Architecture - The Scheduler-Agent-Supervisor Pattern."

Figure 3 shows a high-level view of the tasks that the NewOrderJob accomplishes and the ancillary classes it uses. The ServiceBusTopic and ServiceBusTopicDescription classes are described in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," but the remaining classes are described later in this section.

Figure 3

Message flow for the order processing system

Recall that the worker role starts the NewOrderJob job processor by calling the Run method. This method creates an instance of the ServiceBusTopicDescription class and populates it with the required values for the target topic, then uses this to create an instance of the ServiceBusTopic class named newOrderMessageSender.

var serviceBusTopicDescription 
    = new ServiceBusTopicDescription
{
  Namespace = this.serviceBusNamespace,
  TopicName = topicName,
  Issuer = issuer,
  DefaultKey = defaultKey
};
this.newOrderMessageSender 
     = new ServiceBusTopic(serviceBusTopicDescription);

The NewOrderJob job processor can then use the ServiceBusTopic instance to send the message, which it does by calling its Execute method.

while (this.keepRunning)
{
  this.Execute();
  Thread.Sleep(TimeSpan.FromSeconds(10));
}

Note

The Execute method processes orders in batches, up to 32 at a time. Each order in a batch is posted asynchronously by a separate task created by the Send method of a ServiceBusTopic object. The NewOrderJob job processor sleeps for 10 seconds between invocations of the Execute method to prevent resource starvation. This value was selected based on the expected volume of orders and on profiling the performance of the application, but it may be changed if the number of orders increases significantly.

Locking Orders for Processing

The details of orders are held in the local SQL Azure database (in tables named Order and OrderDetails), but to help keep track of the status of orders and ensure that they are processed reliably, Trey Research defined two further tables in the same database.

The first of these, the OrderStatus table, contains the status rows for each order. Each row indicates the most recent publicly visible status of the order as displayed in the website page when customers view the list of their current orders. Each status change is recorded as a new row with a timestamp; existing rows are not updated, which preserves the history. The following table describes the columns in the OrderStatus table.

Column | Description
OrderID | The foreign key that links the row to the related row in the Order table.
Status | A value that indicates the most recent publicly visible status of the order. Possible values are Order placed, Order sent to transport partner, Order received (by transport partner), and Order shipped (when the goods have been delivered).
Timestamp | The UTC date and time when this change to the status of the order occurred.

The other table, OrderProcessStatus, contains the data that the retry mechanism uses to determine whether an order message has been successfully posted to the Service Bus topic. The following table describes the columns in the OrderProcessStatus table.

Column | Description
OrderID | The foreign key that links the row to the related row in the Order table.
ProcessStatus | A value used internally by the worker role that indicates the status of the process. Possible values are pending process, processed, error, and critical error. These values are defined in the ProcessStatus class located in the Stores folder of the Orders.Workers project.
LockedBy | The ID of the worker role that is processing the order (obtained from RoleEnvironment.CurrentRoleInstance.Id) or NULL if the order is not currently being processed.
LockedUntil | The UTC date and time when the current processing of the order will time out, or NULL if the order is not currently being processed.
Version | A value used to support optimistic locking by EF for rows in this table. It has the ConcurrencyMode property set to Fixed and the StoreGeneratedPattern property set to Computed. It is automatically updated by EF when rows are inserted or updated.
RetryCount | The number of times that processing of the order has been attempted so far. It is incremented each time a worker role attempts to process the order. After a number of failed attempts, the worker role sets the process status to critical error and raises an exception to advise administrators that there is a problem.
BatchId | A GUID that identifies this batch of orders being processed. Each time a worker role locks a batch of orders that it will start processing, it assigns the same BatchId to all of them so that it can extract them after the lock is applied in the database.

As an optimization mechanism, the Execute method retrieves orders from the SQL Azure database in batches, but it needs to ensure that the same orders are not going to be retrieved by another concurrent invocation of the Execute method. Therefore, the Execute method locks the batch of rows in the OrderProcessStatus table that it will process, and then retrieves this batch of rows as a collection of OrderProcessStatus objects by calling the LockOrders and GetLockedOrders methods in the ProcessStatusStore class (located in the Stores folder of the Orders.Workers project).

var batchId = this.processStatusStore.LockOrders(
         RoleEnvironment.CurrentRoleInstance.Id);
var ordersToProcess 
    = this.processStatusStore.GetLockedOrders(
         RoleEnvironment.CurrentRoleInstance.Id, batchId);

The LockOrders method in the ProcessStatusStore class locks the orders by executing a SQL statement that sets the values in the OrderProcessStatus table rows. It assigns the current worker role instance ID and the batch ID to each row that has not already been processed, has not resulted in a critical error and been abandoned, and is not already locked by this or another instance of the NewOrderJob job processor. Also, notice that the LockOrders method locks only the first 32 orders available, and only for a limited period of time (320 seconds in the code shown below). This approach prevents the NewOrderJob job processor from causing a bottleneck by attempting to process too many orders at a time, and also prevents a failed instance of the NewOrderJob job processor from causing an order to be locked indefinitely.

public Guid LockOrders(string roleInstanceId)
{
  using (var database =
         TreyResearchModelFactory.CreateContext())
  {
    var batchId = Guid.NewGuid();
    var commandText = "UPDATE TOP(32) OrderProcessStatus "
        + "SET LockedBy = {0}, LockedUntil = {1}, "
        + "BatchId = {2} "
        + "WHERE ProcessStatus != {3} "
        + "AND ProcessStatus != {4} "
        + "AND (LockedUntil < {5} OR LockedBy IS NULL)";
    this.sqlCommandRetryPolicy.ExecuteAction(
      () => database.ExecuteStoreCommand(
              commandText, roleInstanceId,
              DateTime.UtcNow.AddSeconds(320), batchId,
              ProcessStatus.Processed,
              ProcessStatus.CriticalError,
              DateTime.UtcNow));
    return batchId;
  }
}
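The selection rule in the WHERE clause can be easier to follow when written as an in-memory predicate. The following sketch mirrors that rule for a single row; the StatusRow and LockRules types are hypothetical, and the string values "processed" and "critical error" are assumed to match the constants in the ProcessStatus class, based on the values listed in the table earlier.

```csharp
using System;

// Hypothetical in-memory model of one OrderProcessStatus row.
public sealed class StatusRow
{
    public string ProcessStatus { get; set; }
    public string LockedBy { get; set; }
    public DateTime? LockedUntil { get; set; }
}

public static class LockRules
{
    // Assumed values of ProcessStatus.Processed and
    // ProcessStatus.CriticalError.
    private const string Processed = "processed";
    private const string CriticalError = "critical error";

    // Mirrors the WHERE clause of the UPDATE statement: the row must be
    // neither finished nor abandoned, and it must be either unlocked or
    // hold a lock that has already expired.
    public static bool CanLock(StatusRow row, DateTime utcNow)
    {
        // In SQL, a comparison with a NULL ProcessStatus is not true, so
        // such a row would not be selected; the sketch does the same.
        if (row.ProcessStatus == null)
        {
            return false;
        }

        bool finished = row.ProcessStatus == Processed
            || row.ProcessStatus == CriticalError;
        bool lockExpired = row.LockedUntil.HasValue
            && row.LockedUntil.Value < utcNow;
        bool unlocked = row.LockedBy == null;

        return !finished && (lockExpired || unlocked);
    }
}
```

Note that the real statement evaluates this rule and assigns the lock in a single UPDATE, which is what prevents two worker role instances from claiming the same row; an in-memory check followed by a separate write would not give that guarantee.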

Note

The NewOrderJob job processor locks orders, processes them and finally sends "New Order" messages asynchronously, which means that after sending the message, the execution continues. Under some possible but unlikely circumstances, the Execute method of an instance of the NewOrderJob class can be called when there are still some orders being processed by that same instance. This will occur if processing the full batch of 32 orders takes more than 10 seconds (see the earlier note regarding the sleep interval between invocations of the Execute method).
To prevent the job processor from attempting to handle the same order more than once, the developers added the BatchId to each batch of orders as they are locked. This is a random value generated for each batch of orders. Subsequently, when the Execute method processes each order, it only fetches the orders that have the appropriate value for the BatchId.

The GetLockedOrders method in the ProcessStatusStore class queries the OrderProcessStatus table to retrieve the rows that were successfully locked by the LockOrders method.

public IEnumerable<Models.OrderProcessStatus>
       GetLockedOrders(string roleInstanceId, Guid batchId)
{
  using (var database = 
         TreyResearchModelFactory.CreateContext())
  {
    return
      this.sqlCommandRetryPolicy.ExecuteAction(
        () =>
        database.OrderProcessStatus.Where(
          o =>
          o.LockedBy.Equals(roleInstanceId,
                     StringComparison.OrdinalIgnoreCase)
          && o.BatchId == batchId).Select(
            op =>
            new Models.OrderProcessStatus
              {
                LockedBy = op.LockedBy,
                LockedUntil = op.LockedUntil,
                OrderId = op.OrderId,
                ProcessStatus = op.ProcessStatus,
                Order =
                  new Models.Order
                    {
                      OrderId = op.Order.OrderId,
                      UserName = op.Order.UserName,
                      OrderDate = op.Order.OrderDate,
                      Address = op.Order.Address,
                      City = op.Order.City,
                      State = op.Order.State,
                      PostalCode = op.Order.PostalCode,
                      Country = op.Order.Country,
                      Phone = op.Order.Phone,
                      Email = op.Order.Email,
                      Total = op.Order.Total
                    }
              }).ToList());
  }
}

Handling Orders with Expired Locks

The Execute method then iterates over the collection of locked orders and sends a suitable message for each one to the Service Bus topic. However, there is a small possibility, especially during initial configuration and profiling of the application, that the batch size selected for the NewOrderJob job processor may be too large, or the interval between processing batches is too small, and the next iteration commences before the current batch of orders has been dispatched but after the LockedUntil time for one or more orders in the batch has passed. In this case these locks are considered to have expired and the next iteration of the NewOrderJob job processor may have relocked these orders and be in the process of sending them. So, to avoid the same order being posted twice, the Execute method examines the LockedUntil property of each order in the batch, and if the value of this property is before the current time then the order is skipped and handled in a subsequent iteration, if necessary.

The following code shows how Trey Research iterates over the orders to process and skips any order whose lock has expired.

foreach (var orderProcess in ordersToProcess)
{
  if (orderProcess.LockedUntil < DateTime.UtcNow)
  {
    // If this orderProcess expired, ignore it and let 
    // another Worker Role process it.
    continue;
  }

  ...
  // Code here to create the message, add the required
  // Service Bus topic filter properties, and then 
  // send the message. 
  ...
}
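The expiry test can also be isolated in a small helper so that it can be exercised with a fixed clock. The following is a minimal sketch rather than code from the Trey Research solution; the LockCheck class, the IsLockExpired method, and the idea of passing the current time as a parameter are all assumptions made for illustration.

```csharp
using System;

// Hypothetical helper; not part of the Trey Research sample.
public static class LockCheck
{
    // Returns true when the lock expiry time is earlier than the
    // supplied current time. A null expiry is not treated as expired,
    // which mirrors the nullable comparison used in the loop above.
    public static bool IsLockExpired(DateTime? lockedUntil, DateTime utcNow)
    {
        return lockedUntil.HasValue && lockedUntil.Value < utcNow;
    }
}
```

Passing the clock value in, instead of reading DateTime.UtcNow inside the helper, keeps the check deterministic when it is tested.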

Posting Orders to the Service Bus Topic

Jana Says:
The use of filters in a Service Bus topic allows you to implement rudimentary business logic, and even modify the properties of messages as they pass through the topic. It also allows you to decouple senders and receivers by allowing a varying number of subscribers to listen to a topic, each receiving only messages that are targeted to them by a filter in the topic. However, topics and filters do not provide a general purpose workflow mechanism, and you should not try to implement complex logic using Service Bus topics.

The Service Bus topic that Trey Research uses in the Orders application is configured to filter messages based on the delivery location and the total value of the order. A summary of every order must be sent to the appropriate transport partner to notify it that a delivery is required. Additionally, the details of all orders with a total value over $10,000 are sent to an on-premises service to be stored in the audit log database. The Execute method therefore adds the required properties to each message so that the Service Bus topic can filter the messages and post them to the appropriate subscribers. The following code shows how the Execute method uses a separate class named TransportPartnerStore to obtain the appropriate transport partner name.

  var transportPartnerName = this.transportPartnerStore
               .GetTransportPartnerName(orderProcess.Order.State);

The TransportPartnerStore class is used to determine which transport partner a message should be sent to; the choice is based on the location of the order recipient. It is defined in the TransportPartnerStore.cs file in the Stores folder of the Orders.Workers project.
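The listing for the TransportPartnerStore class is not reproduced in this chapter. As a rough illustration of a location-based lookup, the following sketch maps a state to a partner name. The class name SketchTransportPartnerStore, the constructor argument, and the literal partner names it returns are assumptions; the actual class may be implemented quite differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the real TransportPartnerStore may differ.
public class SketchTransportPartnerStore
{
    // Assumed set of states served by the local partner. The actual
    // states depend on where the manufacturing plant is located.
    private readonly HashSet<string> localStates;

    public SketchTransportPartnerStore(IEnumerable<string> localStates)
    {
        this.localStates = new HashSet<string>(
            localStates, StringComparer.OrdinalIgnoreCase);
    }

    public string GetTransportPartnerName(string state)
    {
        // Orders for the local states go to the local partner;
        // everything else (including an unknown state) goes to the
        // distance partner.
        return state != null && this.localStates.Contains(state)
            ? "Contoso"
            : "Fabrikam";
    }
}
```

Whatever value the lookup returns must match the value that the subscription filters test for, otherwise the message is not delivered to any transport partner.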

Creating a New Order Message

The Execute method can now create the order message to send. To handle failures when sending messages, the custom retry mechanism might need to attempt to send the same message more than once. The Execute method cannot just create the message and pass it to the Send method of the ServiceBusTopic class because the serialization mechanism used by Service Bus messages means that the body of a message can only be read once. Therefore, the Execute method defines a function that creates the message dynamically and then passes this function to the Send method of the ServiceBusTopic class. The Send method can then invoke this function to construct a fresh copy of the message each time it is sent.

The following code shows the section of the Execute method that defines this function. Notice that it first creates an instance of the NewOrderMessage class (defined in the Communication\Messages folder of the Orders.Shared project), which represents the specific message that Trey Research needs to send, and then builds a Service Bus BrokeredMessage instance from this. Finally, it sets the following properties of the BrokeredMessage instance:

  • The TransportPartnerName property, previously retrieved from the TransportPartnerStore, is used by a Service Bus filter to direct the message to the appropriate transport partner.
  • The ServiceBusNamespace property indicates the Service Bus namespace that contains the Service Bus queue specified in the ReplyTo property. The Orders application may be deployed to more than one datacenter, and the transport partner needs to know which instance of Service Bus it should use when posting a response.
  • The AcsNamespace property allows the receiver to tell which instance of ACS was used by the sender to authenticate (the application may be configured to use ACS in more than one datacenter for authenticating partners for Service Bus access). This is required so that the receiver of the message can validate the sender, as described in the section "Securing Messages" in Chapter 4 "Implementing Reliable Messaging and Communications with the Cloud."
  • The OrderAmount property is used by a Service Bus filter to detect orders over the specified total value and send a copy of these messages to the on-premises audit log.
  • The ReplyTo property specifies the Service Bus queue on which the receiver should send any response messages. For example, the transport partner uses this queue to post the message that acknowledges receipt of this message, as described in the section "Correlating Messages and Replies" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud."

  Func<BrokeredMessage> brokeredMessageFunc = () =>
    {
      // Send new order message
      var msg = new NewOrderMessage
      {
        OrderId = orderProcess.Order.OrderId,
        OrderDate = orderProcess.Order.OrderDate,
        ShippingAddress = orderProcess.Order.Address,
        Amount =
          Convert.ToDouble(orderProcess.Order.Total),
        CustomerName = orderProcess.Order.UserName
      };

      var brokeredMessage = new BrokeredMessage(msg)
      {
        MessageId = msg.OrderId.ToString(),
        CorrelationId = msg.OrderId.ToString(),
        Properties = { { "TransportPartnerName", 
                          transportPartnerName }, 
                       { "ServiceBusNamespace", 
                          this.serviceBusNamespace }, 
                       { "AcsNamespace", 
                          this.acsNamespace }, 
                       { "OrderAmount", 
                          orderProcess.Order.Total } },
        ReplyTo = this.replyQueueName
      };

      return brokeredMessage;
    };
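To see why a factory function is passed instead of a message instance, consider the following self-contained sketch. It does not use the Service Bus API: OneShotMessage is a hypothetical stand-in for a message whose body can be read only once, and RetrySender is an assumed, much simplified retry loop rather than the ServiceBusTopic implementation.

```csharp
using System;

// Hypothetical stand-in for a message whose body can be consumed once.
public class OneShotMessage
{
    private readonly string body;
    private bool consumed;

    public OneShotMessage(string body)
    {
        this.body = body;
    }

    public string ReadBody()
    {
        if (this.consumed)
        {
            throw new InvalidOperationException("Body already read.");
        }

        this.consumed = true;
        return this.body;
    }
}

public static class RetrySender
{
    // Calls the factory before every attempt so that each attempt
    // works with a fresh message. Returns the number of attempts used,
    // or rethrows the last exception once maxAttempts is reached.
    public static int Send(
        Func<OneShotMessage> messageFactory,
        Action<OneShotMessage> transport,
        int maxAttempts)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                transport(messageFactory());
                return attempt;
            }
            catch (Exception)
            {
                if (attempt >= maxAttempts)
                {
                    throw;
                }
            }
        }
    }
}
```

If the same OneShotMessage instance were reused across attempts, the second attempt would fail when reading the body, regardless of whether the transport had recovered.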

The message will be sent asynchronously, so the NewOrderJob class must also assemble an object that can be passed as the state in the asynchronous method calls, as shown in the following code. This object must contain the order ID (so that the retry mechanism can update the correct order status rows in the OrderStatus table) and the name of the transport partner (so that it can be displayed when customers view their existing orders).

var objectState = new Dictionary<string, object>
{
  { "orderId", orderProcess.OrderId },
  { "transportPartner", transportPartnerName }
};

Sending the New Order Messages

The Execute method can now send the order to the Service Bus topic. It does this by calling the Send method of the ServiceBusTopic instance it created and saved in the variable named newOrderMessageSender. As described in the section "Sending Messages to a Service Bus Topic" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," the Send method of the ServiceBusTopic class takes four parameters: the function that creates the BrokeredMessage instance to send, the asynchronous state object, and two Action delegates (one to execute after a message is sent and one to execute if sending fails). The Execute method passes lambda expressions for these two actions; each one calls a method of the ProcessStatusStore class, as shown in the following code.

  this.newOrderMessageSender
    .Send(
      brokeredMessageFunc,
      objectState,
      (obj) =>
      {
        var objState = (IDictionary<string, object>)obj;
        var orderId = (Guid)objState["orderId"];
        var transportPartner 
            = (string)objState["transportPartner"];
        this.processStatusStore.SendComplete(orderId,
                                transportPartner);
      },
      (exception, obj) =>
      {
        var objState = (IDictionary<string, object>)obj;
        var orderId = (Guid)objState["orderId"];
        this.processStatusStore.UpdateWithError(
                       exception, orderId);
      });
}

The first action, which is executed after the message has been sent successfully, extracts the order ID and transport partner name from the asynchronous state object, and passes these to the SendComplete method of the ProcessStatusStore class instance that the NewOrderJob class is using. The second action extracts only the order ID from the asynchronous state object, and passes it and the current Exception instance to the UpdateWithError method of the same ProcessStatusStore instance.

Completing the Reliable Send Process

The SendComplete method of the ProcessStatusStore class, called by the Execute method if the message is posted successfully, performs the following tasks:

  • It updates the matching row in the OrderProcessStatus table to modify the value in the ProcessStatus column to "processed" (this prevents another iteration of the NewOrderJob job processor from attempting to send the order again).
  • It adds a new row to the OrderStatus table to show the current status of the order ("Order sent to transport partner"), with the correct timestamp.
  • It updates the Order table with the name of the transport partner that will deliver the order.

public void SendComplete(Guid orderId, 
                         string transportPartner)
{
  using (var database = 
         TreyResearchModelFactory.CreateContext())
  {
    try
    {
      using (var t = new TransactionScope())
      {
        // Avoid the transaction being promoted.
        this.sqlConnectionRetryPolicy.ExecuteAction(
            () => database.Connection.Open());

        // Update the OrderProcessStatus table row
        var processStatus =
          this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.OrderProcessStatus
                  .SingleOrDefault(
                     o => o.OrderId == orderId));
        processStatus.ProcessStatus 
            = ProcessStatus.Processed;
        processStatus.LockedBy = null;
        processStatus.LockedUntil = null;
        this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.SaveChanges());

        // Add a new row to the OrderStatus table
        var status = new OrderStatus
        {
          OrderId = orderId,
          Status =
            "TreyResearch: Order sent to transport partner",
          Timestamp = DateTime.UtcNow
        };
        database.OrderStatus.AddObject(status);
        this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.SaveChanges());

        // Update the Order table row
        var order =
          this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.Order.SingleOrDefault(
                           o => o.OrderId == orderId));
        order.TransportPartner = transportPartner;
        this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.SaveChanges());

        t.Complete();
      }
    }
    catch (UpdateException ex)
    {
      ...
    }
  }
}

The UpdateWithError method, called by the Execute method if the message was not posted successfully, generates suitable warning messages using the custom TraceHelper class, updates the matching OrderProcessStatus table row with the value "error", and sets the values of the LockedBy and LockedUntil columns to null to unlock the order and make it available for processing again.

The UpdateWithError method also checks whether the number of retry attempts has exceeded the specified maximum (defined in the ServiceConfiguration.cscfg file). If it has, it updates the OrderProcessStatus table row with the value "critical error" and generates a message that indicates the administrator should investigate the failed order process.

public void UpdateWithError(Exception exception, 
                            Guid orderId)
{
  TraceHelper.TraceWarning("NewOrderJob: The Order '{0}' "
    + "couldn't be processed. Error details: {1}", 
    orderId.ToString(), exception.ToString());

  using (var database 
    = TreyResearchModelFactory.CreateContext())
  {
    var processStatus =
        this.sqlCommandRetryPolicy.ExecuteAction(
            () => database.OrderProcessStatus
                  .SingleOrDefault(
                    o => o.OrderId == orderId));
    processStatus.ProcessStatus = ProcessStatus.Error;
    processStatus.LockedBy = null;
    processStatus.LockedUntil = null;
    processStatus.RetryCount 
      = processStatus.RetryCount + 1;

    var newOrderJobRetryCountCheck = int.Parse(
      CloudConfiguration.GetConfigurationSetting(
       "NewOrderJobRetryCountCheck", "3"));

    if (processStatus.RetryCount 
        > newOrderJobRetryCountCheck)
    {
      processStatus.ProcessStatus 
          = ProcessStatus.CriticalError;
      TraceHelper.TraceError("NewOrderJob: The Order '{0}' "
        + "has reached {1} retries. This order requires "
        + "manual intervention.", 
        orderId.ToString(), processStatus.RetryCount);
    }

    this.sqlCommandRetryPolicy.ExecuteAction(
                   () => database.SaveChanges());
  }
}
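The escalation rule in the UpdateWithError method can be reduced to a small state transition, sketched below without any database access. The SketchStatus, SketchProcessStatus, and RetryEscalation names are hypothetical; only the ordering of the steps and the "greater than" comparison are taken from the listing above.

```csharp
using System;

// Hypothetical in-memory model of an OrderProcessStatus row.
public enum SketchStatus { Pending, Error, CriticalError }

public class SketchProcessStatus
{
    public SketchStatus Status { get; set; }
    public int RetryCount { get; set; }
    public string LockedBy { get; set; }
    public DateTime? LockedUntil { get; set; }
}

public static class RetryEscalation
{
    // Records one failed send: marks the row as an error, releases the
    // lock, increments the retry count, and escalates to a critical
    // error once the count exceeds maxRetries.
    public static void RecordFailure(
        SketchProcessStatus row, int maxRetries)
    {
        row.Status = SketchStatus.Error;
        row.LockedBy = null;
        row.LockedUntil = null;
        row.RetryCount++;

        if (row.RetryCount > maxRetries)
        {
            row.Status = SketchStatus.CriticalError;
        }
    }
}
```

Because the comparison is strict, a limit of 3 means that the order is marked as a critical error on the fourth failure, not the third.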

At this point, the message has been sent and the status recorded in the SQL Azure database is consistent with the actual status of the message.

The next task is to retrieve the order from the subscription at the appropriate transport partner.

How Trey Research Decouples the Order Process from the Transport Partners' Systems

To communicate with the delivery systems of the various transport partners, Trey Research implemented a series of connectivity components in the form of connectors and adapters. These components provide an interface between the Service Bus and the transport partner. Each component is specific to the transport partner, and provides the functionality to retrieve messages from the appropriate Service Bus subscription, translate them into a format accepted by the transport partner, and then pass them to the transport partner's internal system.

Responses from the transport partner are passed back to the component, converted into Service Bus messages, and then posted to the Service Bus queue for the Orders application. Figure 1 earlier in this chapter shows where these components are deployed. The following sections provide more information on the sample implementation of these components provided with the Trey Research solution.

Note

The adapter and connector included with the sample solution are provided as simple examples for enabling a transport partner to interact with the Service Bus topics and queues used by Trey Research. In the real world, the internal business logic for these connectivity components may be considerably more complex than that illustrated by these samples.

Receiving and Processing an Order in a Transport Partner

Transport partners connect to the Service Bus topic on which the NewOrderJob job processor has posted the order messages. Each transport partner uses its own Service Bus subscription, configured with a filter that examines the TransportPartnerName property of each message. The subscriptions and filters themselves are created by the setup program in the TreyResearch.Setup project; for more information, see the section "Subscribing to a Service Bus Topic" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud."
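The effect of these subscription filters can be illustrated with an in-memory simulation. The sketch below does not use the Service Bus API; in Service Bus itself the rules are defined as filter expressions on each subscription. The subscription names, the partner name values, and the TopicRoutingSketch class are assumptions, while the two property names and the $10,000 audit threshold come from the description earlier in this chapter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simulates topic routing in memory; not the Service Bus API.
public static class TopicRoutingSketch
{
    // Subscription name -> predicate over the message properties.
    // The subscription and partner names are assumptions.
    private static readonly
        Dictionary<string, Func<IDictionary<string, object>, bool>>
        Filters =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>
        {
            { "contoso", p => object.Equals(
                p["TransportPartnerName"], "Contoso") },
            { "fabrikam", p => object.Equals(
                p["TransportPartnerName"], "Fabrikam") },
            { "audit", p =>
                Convert.ToDecimal(p["OrderAmount"]) > 10000m }
        };

    // Returns the names of all subscriptions whose filter matches,
    // sorted alphabetically so that the result is deterministic.
    public static List<string> Route(
        IDictionary<string, object> properties)
    {
        return Filters.Where(f => f.Value(properties))
                      .Select(f => f.Key)
                      .OrderBy(name => name)
                      .ToList();
    }
}
```

A single message can match more than one filter; a high-value order is copied both to the transport partner's subscription and to the audit subscription.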

Sample transport partners are provided in the TransportPartner project. The solution includes two transport partners: Contoso (implemented by the ContosoTransportPartner class), which ships orders to customers that reside in the same state as the Trey Research manufacturing plant or in an adjacent state, and Fabrikam (implemented by the FabrikamTransportPartner class), which ships orders to customers located elsewhere.

The local transport partner, Contoso, connects to the Service Bus topic by using the Connector class defined in the Connectivity folder of the TransportPartner project, while the distance transport partner, Fabrikam, uses the Adapter class defined in the same folder.

Note

The section "Implementing Adapters and Connectors for Translating and Reformatting Messages" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," describes the rationale behind the use of connectors and adapters.

The Connector and Adapter classes both inherit from the OrderProcessor class, which provides the functionality for actually connecting to the Service Bus topic and receiving messages in the Run method.

public void Run()
{
  var serviceBusNamespaces = ConfigurationManager.
    AppSettings["serviceBusNamespaces"].
    Split(',').ToList();
  ...

  foreach (var serviceBusNamespace in serviceBusNamespaces)
  {
    this.serviceBusSubscriptionDescription.Namespace = 
      serviceBusNamespace;
    var serviceBusSubscription = new 
      ServiceBusSubscription(
        this.serviceBusSubscriptionDescription);
    var receiverHandler = new 
      ServiceBusReceiverHandler<NewOrderMessage>(
        serviceBusSubscription.GetReceiver())
    {
      MessagePollingInterval = TimeSpan.FromSeconds(2)
    };

    receiverHandler.ProcessMessages(
      (message, queueDescription, token) =>
      {
        return Task.Factory.StartNew(
          () => this.ProcessMessage(
            message, queueDescription),
          this.tokenSource.Token,
          TaskCreationOptions.None,
          context);
      },
      this.tokenSource.Token);
  }
}

Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," describes how and why the Run method creates a ServiceBusReceiverHandler object to retrieve messages. As far as the order process is concerned, there are two important aspects of this method:

  • The Trey Research web application and worker role may be deployed to multiple datacenters, and the deployment at each datacenter is configured with its own set of Service Bus topics and queues in its own Service Bus namespace. Therefore the Run method must connect to each datacenter and listen for messages on the topic at each one. This is the purpose of the foreach loop; the Service Bus namespaces to which the method must connect are defined in the configuration file, and the loop connects to the Service Bus topic in each namespace.
  • The call to the ProcessMessages method of the ServiceBusReceiverHandler object invokes the ProcessMessage method. This method provides the logic for actually processing an order message, as shown by the following code sample:

protected virtual void ProcessMessage(
    NewOrderMessage message, 
    ServiceBusQueueDescription queueDescription)
{
  var trackingId = this.ProcessOrder(message, 
                                     queueDescription);

  if (trackingId != Guid.Empty)
  {
    // Get SWT from ACS.
    var token = this.GetToken(queueDescription);

    var statusMessage = 
      string.Format("{0}: Order Received", 
        this.TransportPartnerDisplayName);
    this.SendOrderReceived(message, queueDescription, 
      statusMessage, trackingId, token);
  }
}

The ProcessMessage method calls the ProcessOrder method to perform the transport partner-specific business processing for the order. ProcessOrder is an abstract method with implementations in the Adapter and Connector classes. In the sample transport partners, the orders are stored as a list of ActiveOrder objects and each order is displayed on the Windows Form that provides the user interface (the OnOrderProcessed event handler performs this task).

However, there is a small possibility that the same order may be received more than once; the retry mechanism in the NewOrderJob class that posts orders to the Service Bus topic may cause the same order message to be repeated, depending on the reliability and performance of the network connection. Consequently, before creating the ActiveOrder object the ProcessOrder method verifies that an order with the same ID as the message just received does not already exist in the list; if there is such an order, this new one is assumed to be a duplicate and is discarded.

The following code sample shows the implementation of the ProcessOrder method in the Adapter class.

protected override Guid ProcessOrder(
  Orders.Shared.Communication.Messages.NewOrderMessage 
  message, ServiceBusQueueDescription queueDescription)
{
  var processedOrder = 
    this.orderStore.GetById(message.OrderId);

  if (processedOrder != null)
  {
    // This order has been received for processing more 
    // than once, and will be discarded.
    return Guid.Empty;
  }

  var activeOrder = new ActiveOrder
  {
    OrderId = message.OrderId,
    ShippingAddress = message.ShippingAddress,
    Amount = message.Amount,
    ReplyTo = queueDescription.QueueName,
    ReplyToNamespace = queueDescription.Namespace,
    Status = "received",
    SwtAcsNamespace = queueDescription.SwtAcsNamespace
  };

  this.orderStore.Add(activeOrder);
  
  // Call the transport partner service and 
  // retrieve a tracking id.
  var trackingId = this.transportServiceWrapper.
    RequestShipment(activeOrder);

  if (this.OnOrderProcessed != null)
  {
    this.OnOrderProcessed(this, 
      new OrderProcessedEventArgs 
      { ActiveOrder = activeOrder });
  }

  // if tracking id received, delivery request is 
  // acknowledged, it is safe to update the status queue 
  // with the "Order Received" status.
  return trackingId;
}
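The ProcessOrder method checks for an existing order and adds the new one in two separate steps. If messages could be processed concurrently, the check and the add would need to happen as a single operation. The following sketch shows one way to do this; the SketchOrderStore class and its TryAdd method are hypothetical and are not part of the sample's order store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical duplicate detector; not the sample's order store.
public class SketchOrderStore
{
    private readonly HashSet<Guid> seen = new HashSet<Guid>();
    private readonly object sync = new object();

    // Records the order ID and returns true if it has not been seen
    // before; returns false for a duplicate. The lock makes the check
    // and the add a single atomic step.
    public bool TryAdd(Guid orderId)
    {
        lock (this.sync)
        {
            return this.seen.Add(orderId);
        }
    }
}
```

A receiver built this way would continue with the shipment request only when TryAdd returns true, and would discard the message otherwise.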

Note that the sample assumes that the transport partners have internal systems that react in different ways when they receive an order message. This is to provide a more real-world experience in the application.

  • The local transport partner, Contoso, responds immediately with an acknowledgement message indicating that the order has been received. Later, when the order is shipped, the local transport partner sends another message.
  • The distance transport partner, Fabrikam, accepts the order and generates a tracking ID. It sends a response message back to Trey Research containing this tracking ID. Many distance transport partners provide their own web applications that enable customers to log in and query the progress of the order by providing this tracking ID (these web applications are the responsibility of the transport partner; an example is not provided as part of the sample solution).

Acknowledging an Order or Indicating that it has Shipped in a Transport Partner

After receiving an order, the local transport partner, Contoso, should acknowledge successful receipt of the message. Later on, the local transport partner sends another message when the order is dispatched. As described in the previous section, the distance transport partner, Fabrikam, only sends a single message when the order is received. In both cases, the ProcessMessage method calls the SendOrderReceived method to construct and send an appropriate message to the Orders application.

protected virtual void ProcessMessage(
    NewOrderMessage message, 
    ServiceBusQueueDescription queueDescription)
{
  var trackingId = this.ProcessOrder(message, 
                                     queueDescription);

  if (trackingId != Guid.Empty)
  {
    // Get SWT from ACS.
    var token = this.GetToken(queueDescription);

    var statusMessage = 
      string.Format("{0}: Order Received", 
        this.TransportPartnerDisplayName);
    this.SendOrderReceived(message, queueDescription, 
      statusMessage, trackingId, token);
  }
}
        
protected void SendOrderReceived(
  NewOrderMessage message, 
  ServiceBusQueueDescription queueDescription, 
  string statusMessage, Guid trackingId, string swt)
{
  this.SendToUpdateStatusQueue(message.OrderId, trackingId, 
    statusMessage, queueDescription, swt);
}

The SendOrderReceived method calls the SendToUpdateStatusQueue method, which contains the logic for composing an OrderStatusUpdateMessage that it posts to a Service Bus queue. The name of the queue to use and the Service Bus namespace in which it resides were specified in the ReplyTo and ServiceBusNamespace properties of the original order message. They were used to create a ServiceBusQueueDescription object when the message was received by the ProcessMessage method of the ServiceBusReceiverHandler class (for more information about the ServiceBusReceiverHandler class, see the section "Receiving Messages from a Service Bus Queue and Processing Them Asynchronously" in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud").

When the transport partner sends a reply, it should include a security token to enable the Orders application to authenticate the response. This mechanism is also described in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," in the section "Securing Messages." The ACS namespace in which this token is defined is provided in the AcsNamespace property of the original order message, and this value is also added to the ServiceBusQueueDescription object. When the transport partner sends the reply, the security token is retrieved from the specified ACS namespace and added to the response message.

public class ServiceBusReceiverHandler<T>
{
  ...
  private void ProcessMessage(
    IBrokeredMessageAdapter message)
  {
    if (message != null)
    {
      ...

      var queueDescription = new ServiceBusQueueDescription
      {
        QueueName = message.ReplyTo,
      };

      if (message.Properties.ContainsKey(
        "ServiceBusNamespace"))
      {
        queueDescription.Namespace = message.Properties[
          "ServiceBusNamespace"].ToString();
      }

      if (message.Properties.ContainsKey("AcsNamespace"))
      {
        queueDescription.SwtAcsNamespace = 
          message.Properties["AcsNamespace"].ToString();
      }
      ...
    }
    ...
  }
  ...
}

The ServiceBusQueueDescription object is passed down to the SendToUpdateStatusQueue method in the queueDescription parameter. The SendToUpdateStatusQueue method creates an OrderStatusUpdateMessage object, populating it with the acknowledgement details including the "Order Received" status message and the tracking ID generated by the transport partner. This OrderStatusUpdateMessage object is packaged up inside a BrokeredMessage object and posted to the queue specified by the queueDescription parameter. Notice that the CorrelationId property of this response message is set to the order ID of the original request so that the Orders application can correlate this response with the request when it is received.

The following code sample shows the SendToUpdateStatusQueue method. Note that, as an optimization mechanism, this method caches a copy of the ServiceBusQueue object that it creates in a Dictionary object called statusUpdateQueueDictionary. When the SendToUpdateStatusQueue method is called again, it should find this ServiceBusQueue in the Dictionary and should not need to create it again.

private void SendToUpdateStatusQueue(Guid orderId, 
  Guid trackingId, string orderStatus, 
  ServiceBusQueueDescription queueDescription, string swt)
{
  var updateStatusMessage =
    new BrokeredMessage(
      new OrderStatusUpdateMessage
      {
        OrderId = orderId, 
        Status = orderStatus, 
        TrackingId = trackingId, 
        TransportPartnerName = 
          this.TransportPartnerDisplayName,
      })
      { CorrelationId = orderId.ToString() };

  updateStatusMessage.Properties.Add(
    "SimpleWebToken", swt);

  ServiceBusQueue replyQueue;
  if (this.statusUpdateQueueDictionary.
      ContainsKey(queueDescription.Namespace))
  {
    replyQueue = this.statusUpdateQueueDictionary[
      queueDescription.Namespace];
  }
  else
  {
    var description = new ServiceBusQueueDescription
    {
      Namespace = queueDescription.Namespace,
      QueueName = queueDescription.QueueName,
      DefaultKey = 
        this.serviceBusQueueDescription.DefaultKey,
      Issuer = this.serviceBusQueueDescription.Issuer
    };

    replyQueue = new ServiceBusQueue(description);
    this.statusUpdateQueueDictionary.Add(
      queueDescription.Namespace, replyQueue);
  }

  var brokeredMessageAdapter = 
    new BrokeredMessageAdapter(updateStatusMessage);
  replyQueue.Send(brokeredMessageAdapter);
}
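The caching step in this method keys the dictionary on the Service Bus namespace alone. A variation that keys on both the namespace and the queue name, and that tolerates concurrent callers, is sketched below. The ReplyQueueCache class is hypothetical, and the generic TQueue parameter stands in for the ServiceBusQueue type so that the sketch has no Service Bus dependency.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical cache of reply queue clients, one per
// namespace and queue name combination.
public class ReplyQueueCache<TQueue>
{
    private readonly ConcurrentDictionary<string, TQueue> queues =
        new ConcurrentDictionary<string, TQueue>();

    // Returns the cached queue client for the given namespace and
    // queue name, creating it with the supplied factory on first use.
    public TQueue Get(
        string ns,
        string queueName,
        Func<string, string, TQueue> create)
    {
        var key = ns + "/" + queueName;
        return this.queues.GetOrAdd(
            key, _ => create(ns, queueName));
    }
}
```

Note that ConcurrentDictionary.GetOrAdd may invoke the factory more than once if several threads request the same missing key at the same time, although only one result is stored; this is acceptable only if creating a spare queue client has no side effects.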

For the local transport partner, Contoso, when an order is shipped the application calls the SendOrderShipped method in the OrderProcessor class. The SendOrderShipped method operates in much the same way as the SendOrderReceived method, calling the SendToUpdateStatusQueue method to create and post an OrderStatusUpdateMessage to the Service Bus queue.

Receiving Acknowledgement and Status Messages in the Orders Application

As well as starting a NewOrderJob object to post new orders to transport partners, each worker role in the Trey Research solution creates a StatusUpdateJob object to listen for status messages received from the transport partners. This class is located in the StatusUpdateJob.cs file in the Jobs folder in the Orders.Workers project. In common with the OrderProcessor class, the Run method in the StatusUpdateJob class employs a ServiceBusReceiverHandler object to actually connect to the queue and retrieve messages.

The StatusUpdateJob object also provides the business logic that runs for each status message as it is received. It supplies this logic as a lambda expression when it calls the ProcessMessages method of the ServiceBusReceiverHandler object. This lambda expression performs the following tasks:

  • It checks the authentication token in the message and throws an InvalidTokenException exception if it is not recognized (the message may be from a rogue third party).
  • It creates an order status record with the status information provided by the transport partner.
  • It discards the message if a status record with the same order ID and status already exists (retry logic in the transport partner may cause it to send duplicate status messages if it detects a transient error).
  • It updates the order with the name of the transport partner that will ship it to the customer.
  • It adds the tracking ID provided by the partner to the order status record.
  • It stores the order status record in the TreyResearch database (other parts of the application can query this status; for example, if the customer wishes to know whether an order has been shipped).

The following code sample shows how the StatusUpdateJob class defines this logic.

public void Run()
{
  ...
  receiverHandler.ProcessMessages(
    (message, replyTo, token) =>
    {                        
      return Task.Factory.StartNew(
        () =>
        {
          ...
          if (!this.IsValidToken(message.OrderId, token))
          {
            // Throw exception, to be caught by handler.  
            // Will send it to the DeadLetter queue.
            throw new InvalidTokenException();
          }

          var orderStatus = new OrderStatus {
            OrderId = message.OrderId, 
            Status = message.Status 
          };

          using (var db = 
            TreyResearchModelFactory.CreateContext())
          {
            // Checking for duplicate entries in the order 
            // status table.  If a duplicate message 
            // arrives, it is discarded.
            var existingStatus =
              this.sqlCommandRetryPolicy.ExecuteAction(
                () => db.OrderStatus.SingleOrDefault(
                   os => os.OrderId == message.OrderId && 
                   os.Status == message.Status));
            if (existingStatus != null)
            {
              return;
            }

            var order = this.sqlCommandRetryPolicy.
              ExecuteAction(
                () => db.Order.Single(o => 
                   o.OrderId == message.OrderId));
                                         
            order.TransportPartner = 
              message.TransportPartnerName;

            if (message.TrackingId != Guid.Empty)
            {
              order.TrackingId = message.TrackingId;
            }

            db.OrderStatus.AddObject(
              new OrderStatus { 
                OrderId = orderStatus.OrderId, 
                Status = orderStatus.Status, 
                Timestamp = DateTime.UtcNow 
              });

            this.sqlCommandRetryPolicy.ExecuteAction(
              () => db.SaveChanges());
          }
        });
    }, 
  ...);
}

Summary

This chapter has examined how Trey Research implemented the business logic for processing orders in their Orders application. The business logic is based on Service Bus topics, subscriptions, and queues, and uses the software infrastructure described in Chapter 4, "Implementing Reliable Messaging and Communications with the Cloud," to provide an extensible layer for reliable, asynchronous messaging.

This chapter has also described how Trey Research provided a reliable mechanism for posting messages to a Service Bus topic, detecting and handling failures, and transparently retrying the send operation when failures occur.

Finally, this chapter described how the internal business logic for the transport partners was decoupled from the messaging infrastructure utilized by Trey Research; the connectors and adapters defined in the sample solution call upon the existing business services provided by the transport partners without requiring that they modify or disrupt their internal systems. These connectors and adapters also provided location independence by tracking the source of incoming order requests and routing any response messages back to the appropriate destination.

More Information

All links in this book are accessible from the book's online bibliography available at: https://msdn.microsoft.com/en-us/library/hh871440.aspx.


Last built: June 4, 2012