Photo Mosaics Part 5: Queues
Our look at the internals of my Azure Photo Mosaics program continues with this coverage of the use of Windows Azure Queues in the application. Windows Azure Queues are designed to facilitate inter-role communication within your Windows Azure applications, allowing you to decouple processing and thereby scale portions of the application independently. The Photo Mosaics program makes use of the four Windows Azure queues highlighted in the architecture diagram below:
As you can see, each of the queues is positioned between two of the web or worker roles that handle the processing within the application.
- The imagerequest queue is where the ClientInterface Web Role (specifically the Job Broker service) places a message to initiate a new request on behalf of the client.
- The slicerequest queue is where the JobController Worker Role dispatches potentially multiple messages after ‘slicing’ up the original image into n pieces, where n is the number of slices specified by the end user.
- The sliceresponse queue is where the ImageProcessor Worker Role dispatches messages for each completed slice of the mosaic as it’s generated.
- The imageresponse queue is where the JobController Worker Role dispatches a single message corresponding to the completion of an end user’s request.
As you can see, the queues are paired: imagerequest and imageresponse each contain one message per client request, corresponding, respectively, to the initiation and completion of that request. slicerequest and sliceresponse contain possibly multiple messages for each client request (depending on the value of ‘number of slices’ input by the user), but there is a one-to-one correspondence between messages in slicerequest and sliceresponse.
Windows Azure Queue Primer
Windows Azure Queues are one of the three main types of storage available within a Windows Azure Storage account (and in previous blog posts we looked at the use of tables and blobs in the context of the Photo Mosaics application). You can create an unlimited number of queues per storage account, and each queue can store an unlimited number of messages (up to the 100TB limit of a storage account, of course). Additionally queues have the following restrictions/attributes:
Messages are a maximum of 8KB in size. Note that this is the size of the message after Base-64 encoding, not the size of raw bytes comprising the message. For larger message requirements, the common design pattern is to store the larger data in Windows Azure blobs (or tables or even SQL Azure) and include a lookup key to that storage in the queue message itself.
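To get a feel for the Base-64 overhead, here's a minimal sketch that checks whether a raw payload still fits once it has been encoded. The class and method names are hypothetical, and the 8KB constant simply mirrors the limit quoted in this post, so treat both as assumptions rather than anything from the Storage Client API.

```csharp
using System;

public static class MessageSizeCheck
{
    // Limit quoted in this post (an assumption here; adjust for your storage version).
    public const int MaxEncodedBytes = 8 * 1024;

    // Base-64 emits 4 characters for every 3 input bytes, padding the final group.
    public static int EncodedLength(int rawByteCount)
    {
        return 4 * ((rawByteCount + 2) / 3);
    }

    // True when the payload, once Base-64 encoded, stays within the limit.
    public static bool FitsInQueueMessage(byte[] rawPayload)
    {
        return EncodedLength(rawPayload.Length) <= MaxEncodedBytes;
    }
}
```

Under these assumptions a 6,144-byte payload encodes to exactly 8,192 characters and fits, while a 6,145-byte payload does not; anything larger is a candidate for the blob-plus-lookup-key pattern.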
Like blobs, queues can have metadata associated with them – in the form of name/value pairs - up to 8KB per queue.
Messages are NOT always processed in first-in/first-out (FIFO) order. This may be counterintuitive for graduates of Data Structures 101, but it makes sense when you look at how guaranteed message delivery is ensured.
When a message is read from the queue, it is no longer visible to other roles that might be monitoring that queue, but it may reappear if it is not successfully processed by the role that has read it from the queue. For instance, suppose the ImageProcessor (cf. the architecture diagram above) grabs a message from the slicerequest queue, and the role VM crashes because of a hardware failure or just gets rebooted as part of a regular OS patching cycle in the Windows Azure Data Center. The message is no longer on the queue, but it was also not successfully processed by the VM that dequeued it – now what, is it lost forever?
Nope! To accommodate this contingency, the consumer of a message from a queue must explicitly delete the message from the queue after it has finished processing it. If the deletion does not occur within an allotted time – the invisibility timeout – the message reappears on the queue. The default timeout is 30 seconds, but it can be as much as 2 hours, and is specified at the point the message is retrieved (via GetMessage when using the Storage Client API).
That then leads to the question of what happens if the message was processed correctly, but the role processing it crashed in the instant immediately before it was to explicitly delete the message. Won’t the message reappear to potentially be processed again? Yes, indeed, and that’s something you have to plan for. Essentially, you need to make sure that the operations you perform are idempotent, meaning that regardless of whether the message is processed once or multiple times, the outcome is identical.
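One common way to get idempotency is to derive the output location purely from the message contents, so that a redelivered message overwrites the earlier result instead of adding a second one. The sketch below is only an illustration of that idea: SliceStore is an in-memory stand-in for a blob container, and both class names are hypothetical rather than part of the Photo Mosaics code.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a blob container; purely illustrative.
public class SliceStore
{
    private readonly Dictionary<string, string> _blobs = new Dictionary<string, string>();

    public int Count { get { return _blobs.Count; } }

    // Overwrites any existing entry, so repeating the call leaves the same end state.
    public void Put(string name, string content)
    {
        _blobs[name] = content;
    }

    public string Get(string name)
    {
        return _blobs[name];
    }
}

public static class SliceHandler
{
    // The blob name depends only on the message contents, never on a counter
    // or timestamp, so a redelivered message targets the same blob.
    public static void Process(SliceStore store, Guid requestId, int sliceSequence)
    {
        string name = requestId.ToString() + "_" + sliceSequence;
        store.Put(name, "mosaic for slice " + sliceSequence);
    }
}
```

Calling Process twice with the same request ID and slice number leaves exactly one entry in the store, which is the property you want when a message reappears after a crash.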
But what if there’s a bug, and the message is never processed successfully? Won’t it continually reappear on the queue only to be part of a vicious cycle of failure – a poison message if you will? That’s where two additional message attributes come in:
- The DequeueCount property specifies how many times a specific message has been fetched from the queue. To guard against poison messages, you can set a threshold value for how many times you want to retry a message before treating it as a poison message. When the DequeueCount exceeds this value, explicitly delete the message, log the occurrence, and take whatever other corrective action you deem appropriate in the application.
  Keep in mind that if the invisibility timeout value is too small in comparison to the time it takes to process the message, it’s possible this same scenario will result even though the message itself isn’t ‘poisoned’ per se.
- All messages also have a time-to-live (TTL) value (cf. ExpirationTime) that is specified when they are first put on the queue (via the AddMessage method if you’re using the Storage Client API). By default, the TTL is seven days, and that’s also the maximum value. If the message isn’t explicitly deleted before the TTL expires, the Windows Azure storage system will delete and garbage collect it.
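The poison-message check can be sketched without any storage at all. In the simulation below, FakeMessage and PoisonGuard are hypothetical names (not Storage Client types), and the loop stands in for a message repeatedly reappearing after its invisibility timeout lapses.

```csharp
using System;
using System.Collections.Generic;

// Minimal in-memory stand-in for a queue message; not the Storage Client type.
public class FakeMessage
{
    public string Body;
    public int DequeueCount;
}

public static class PoisonGuard
{
    // Simulates a message being fetched again and again. Returns the number of
    // processing attempts made before the message either succeeds or is
    // discarded as poison.
    public static int Drain(FakeMessage msg, int poisonThreshold,
                            Func<string, bool> handler, List<string> poisonLog)
    {
        int attempts = 0;
        while (true)
        {
            msg.DequeueCount++;                  // incremented on every fetch
            if (msg.DequeueCount > poisonThreshold)
            {
                poisonLog.Add(msg.Body);         // log it, then delete instead of retrying
                return attempts;
            }
            attempts++;
            if (handler(msg.Body))
                return attempts;                 // success: the consumer deletes the message here
        }
    }
}
```

With a threshold of 3 and a handler that always fails, the message gets three attempts and is logged as poison on the fourth fetch; a handler that succeeds immediately uses one attempt and logs nothing.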
Queues can handle approximately:
- 500 requests per second per queue
- 5000 requests per second across all queues within a single Windows Azure storage account
For the level of activity in the Photo Mosaics application, these limits well exceed the expected load; however, 500 messages per second for a large application, or even a small one that responds to real-time stimuli (like sensors), is well within the realm of possibility. In scenarios like this where you need to scale your queue storage, you’ll want to consider load balancing over multiple queues or even multiple queues in multiple storage accounts to get the throughput you need.
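One simple way to spread load over several queues is to hash a key, such as the request ID, onto a fixed number of queue names. The following is a sketch under that assumption; QueuePartitioner and the numbered queue-name scheme are hypothetical and not something the Photo Mosaics application does.

```csharp
using System;

public static class QueuePartitioner
{
    // FNV-1a-style hash over the string's UTF-16 code units. It is deterministic
    // across processes and machines, which String.GetHashCode does not guarantee.
    public static uint StableHash(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (char c in key)
            {
                hash ^= c;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key (for example a request ID) to one of queueCount queue names,
    // e.g. "slicerequest0" through "slicerequest3" for queueCount = 4.
    public static string QueueFor(string baseName, string key, int queueCount)
    {
        if (queueCount <= 0) throw new ArgumentOutOfRangeException("queueCount");
        uint index = StableHash(key) % (uint)queueCount;
        return baseName + index;
    }
}
```

Because the mapping is deterministic, every role instance picks the same queue for a given key without any coordination. The trade-off is that changing the number of queues remaps most keys, so consumers would need to drain the old queues first.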
Photo Mosaics Queue Abstraction
For the Photo Mosaics application, I’ve built a few abstractions over Windows Azure queues and messages that may initially seem overly complex, but actually help simplify and standardize their usage within the Web and Worker roles.
In a previous post on the storage architecture, I introduced the QueueAccessor class as part of a data access layer (CloudDAL). QueueAccessor is a static class that maintains a reference to the four queues used in the application, namely, imagerequest, imageresponse, slicerequest, and sliceresponse. The references are of a new type, ImageProcessingQueue, which I’ve defined in the Queue.cs file within the CloudDAL:
ImageProcessingQueue has the following members:
- Name – the queue name,
- PoisonThreshold – the number of times a message can be dequeued before it’s treated as a poisoned message and immediately deleted,
- Timeout – the default invisibility timeout for messages retrieved from the queue,
- RawQueue – a reference to the CloudQueue object abstracted by this given instance of ImageProcessingQueue.
- AcceptMessage<T> – a generic method used to retrieve messages of type T from the given queue,
- SubmitMessage<T> – a generic method used to submit messages of type T to the given queue.
Similar to my use of metadata for blobs, each ImageProcessingQueue instance is created with two pieces of metadata that are used to set the PoisonThreshold and Timeout:
    internal ImageProcessingQueue(CloudQueueClient queueClient, String queueName)
    {
        this.Name = queueName;
        this.RawQueue = queueClient.GetQueueReference(queueName);

        // fetch the queue's attributes (metadata)
        this.RawQueue.FetchAttributes();
        String timeout = this.RawQueue.Metadata["defaulttimeout"];
        String threshold = this.RawQueue.Metadata["poisonthreshold"];

        // pull out queue-specific timeout/poison message retry value (or set to defaults)
        Int32 i;
        this.Timeout = (Int32.TryParse(timeout, out i)) ? new TimeSpan(0, 0, i) : new TimeSpan(0, 0, 30);
        this.PoisonThreshold = (Int32.TryParse(threshold, out i)) ? i : Int32.MaxValue;
    }
AcceptMessage and SubmitMessage are essentially inverse operations that wrap the GetMessage and AddMessage APIs by providing some exception handling and deferring message-specific handling to two generic methods defined on QueueMessage, a class which we’ll discuss in the next section.
Photo Mosaics Queue Message Abstraction
Similar to ImageProcessingQueue’s encapsulation of the CloudQueue reference, a new class, QueueMessage (in Messages.cs of the CloudDAL), wraps a reference to CloudQueueMessage and augments it with application-specific functionality. That class serves as the abstract ancestor of the four distinct message types used in the Photo Mosaics application – ImageRequestMessage, ImageResponseMessage, SliceRequestMessage, SliceResponseMessage – each of which is handled by exactly one of the similarly named queues.
Each of the four messages shares a common set of three fields that ultimately appear in every message’s payload:
- _clientId: a string ID uniquely identifying the client that made the request. In the Windows Forms application, the user’s SID is used, but this field could be extended to be an e-mail address or an application-specific user name. The field is also exposed as the ClientId property.
- _requestId: a GUID assigned to each request to convert an image into a mosaic. The GUID is used to re-name the original image as well as slices of the image as they are stored as blobs during the processing. This field is also exposed as the RequestId property.
- _queueUri: the full URI identifying the queue with which this message is associated. Though this may seem redundant – after all we must know the queue in order to retrieve the message – it becomes necessary for processing messages generically. In particular, given a reference to a QueueMessage alone you cannot delete that message, since DeleteMessage is a method of a CloudQueue, not of a CloudQueueMessage. The _queueUri field is used along with the FindQueue method of QueueAccessor to get a reference to the containing queue and store that as the Queue property of the QueueMessage instance.
The four concrete implementations of QueueMessage add additional properties that are specific to the task represented by the message. Those fields are summarized below:
| Message type | Property | Description |
| --- | --- | --- |
| ImageRequestMessage | ImageUri | reference to the original image now stored as a blob in the imageinput container |
| ImageRequestMessage | ImageLibraryUri | reference to a blob container that holds an image library of tiles used to construct the mosaic |
| ImageRequestMessage | TileSize | size in pixels of each tile (from 1x1 to 64x64 square) |
| ImageRequestMessage | Slices | number of slices into which the original image should be divided to split processing over multiple roles in Windows Azure (the end-user currently provides this value) |
| SliceRequestMessage | ImageLibraryUri | reference to a blob container that holds an image library of tiles used to construct the mosaic |
| SliceRequestMessage | SliceUri | reference to an unprocessed slice of the original image now stored in the slicerequest container |
| SliceRequestMessage | SliceSequence | a zero-based index identifying which slice of the original image the current message payload represents |
| SliceRequestMessage | TileSize | size in pixels of each tile (from 1x1 to 64x64 square) |
| SliceResponseMessage | SliceUri | reference to a processed slice of the original image now stored in the sliceresponse container |
| ImageResponseMessage | ImageUri | reference to the completed mosaic image now stored in the imageoutput container |
Each of these classes implements a Parse method which handles the conversion of the message payload (which is just a string) into the fields and properties for the specific message type. If a message payload fails to parse, a custom exception of type QueueMessageFormatException (also defined in Messages.cs) is thrown. The Payload property is the inverse of the Parse method and formats the actual string payload for the message based on the property values of the specific instance of QueueMessage.
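To make the Parse/Payload relationship concrete, here's a simplified, standalone sketch of the round trip. It is not the actual CloudDAL code: SliceRequestSketch and MessageFormatException are hypothetical stand-ins, only a subset of the slice request fields is included, and the '|' separator is an assumption, since the real separator value isn't shown in this post.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the post's QueueMessageFormatException.
public class MessageFormatException : Exception
{
    public MessageFormatException(string message) : base(message) { }
}

// Simplified stand-in for a concrete message type (hypothetical).
public class SliceRequestSketch
{
    private const char Separator = '|';   // assumed separator

    public Uri SliceUri { get; private set; }
    public int SliceSequence { get; private set; }
    public int TileSize { get; private set; }

    // Converts the message-specific string components into typed properties.
    public void Parse(IList<string> components)
    {
        Uri uri;
        int sequence, tileSize;
        if (components.Count != 3
            || !Uri.TryCreate(components[0], UriKind.Absolute, out uri)
            || !Int32.TryParse(components[1], out sequence)
            || !Int32.TryParse(components[2], out tileSize))
        {
            throw new MessageFormatException("Expected: sliceUri, sliceSequence, tileSize");
        }
        SliceUri = uri;
        SliceSequence = sequence;
        TileSize = tileSize;
    }

    // Inverse of Parse: formats the typed properties back into a single string.
    public string Payload
    {
        get
        {
            return String.Join(Separator.ToString(), new[]
            {
                SliceUri.ToString(), SliceSequence.ToString(), TileSize.ToString()
            });
        }
    }

    // Convenience helper for the sketch: split a payload string and parse it.
    public static SliceRequestSketch FromPayload(string payload)
    {
        var msg = new SliceRequestSketch();
        msg.Parse(payload.Split(Separator));
        return msg;
    }
}
```

Parsing a payload and then reading Payload back should reproduce an equivalent string, while a payload with a missing or non-numeric field surfaces as a format exception rather than a half-populated message.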
With this infrastructure, all of the message processing within the application can be handled with two generic methods (AcceptMessage and SubmitMessage), versus having to spread message-specific processing across each implementation of the various Windows Azure roles. Maintenance and enhancements are also simplified since modifying the structure of a message requires changes only to the appropriate descendant of the QueueMessage class.
Pulling it all together…
ImageProcessingQueue.AcceptMessage<T> and QueueMessage.CreateFromMessage<T> work in tandem to pull a message (of type T) off of the queue and parse the payload into the appropriate descendant of the QueueMessage class.
In the AcceptMessage case below, a message is retrieved from the queue (GetMessage in Line 6) and the payload parsed, via CreateFromMessage, to return a strongly-typed message of the appropriate type.
1:  public T AcceptMessage<T>() where T : QueueMessage, new()
2:  {
3:      T parsedMsg = default(T);
4:      if (this.RawQueue.Exists())
5:      {
6:          CloudQueueMessage rawMsg = this.RawQueue.GetMessage(this.Timeout);
7:          if (rawMsg != null)
8:          {
9:              try
10:             {
11:                 parsedMsg = QueueMessage.CreateFromMessage<T>(rawMsg);
12:             }
13:             catch (QueueMessageFormatException qfme)
14:             {
15:                 // exception handling elided for brevity
16:             }
17:         }
18:     }
19:     return parsedMsg;
20: }
Note that CreateFromMessage assumes the three fields shared by every message appear in a specific order at the beginning of the payload (Lines 17-19), followed by the message-specific properties (stored in _components in Line 22):
1:  internal static T CreateFromMessage<T>(CloudQueueMessage rawMsg) where T : QueueMessage, new()
2:  {
3:      // check if message parameter is valid
4:      if ((rawMsg == null) || String.IsNullOrEmpty(rawMsg.AsString))
5:          throw new ArgumentNullException("rawMsg", "No message data to parse");
6:
7:      // create a new message instance
8:      T newQueueMessage = new T();
9:      newQueueMessage.RawMessage = rawMsg;
10:
11:     // split message payload into array
12:     String[] s = newQueueMessage.RawMessage.AsString.Split(MSG_SEPARATOR);
13:
14:     // first element is queue URI
15:     if (s.Length >= 3)
16:     {
17:         newQueueMessage._queueUri = s[0];
18:         newQueueMessage._clientId = s[1];
19:         newQueueMessage._requestId = s[2];
20:
21:         // split payload array into components
22:         newQueueMessage._components = s.Skip(3).ToList();
23:
24:         // parse into strongly typed message fields
25:         newQueueMessage.Parse();
26:     }
27:     else
28:     {
29:         throw new QueueMessageFormatException("Message is missing one or more required elements (queueUri, userId, requestId)");
30:     }
31:
32:     // return the new message instance
33:     return newQueueMessage;
34: }
When a message is placed on a queue, an inverse operation occurs via ImageProcessingQueue.SubmitMessage<T> and QueueMessage.CreateFromArguments<T>. In Line 8 below, you can see that CreateFromArguments accepts the clientId and requestId as parameters as well as the specific queue’s URI – these, again, are the three properties that are part of all messages in the Photo Mosaics application. The message-specific properties are passed in the parms argument (a params array).
1:  public void SubmitMessage<T>(String clientId, Guid requestId, params object[] parms) where T : QueueMessage, new()
2:  {
3:      T parsedMsg = default(T);
4:
5:      this.RawQueue.CreateIfNotExist();
6:      try
7:      {
8:          parsedMsg = QueueMessage.CreateFromArguments<T>(this.RawQueue.Uri, clientId, requestId, parms);
9:          this.RawQueue.AddMessage(new CloudQueueMessage(parsedMsg.Payload));
10:     }
11:     catch (QueueMessageFormatException qfme)
12:     {
13:         // exception handling code elided for brevity
14:     }
15: }
    internal static T CreateFromArguments<T>(Uri queueUri, String clientId, Guid requestId, params object[] parms) where T : QueueMessage, new()
    {
        T newQueueMessage = new T();

        // pull arguments into payload arrays
        newQueueMessage._queueUri = queueUri.ToString();
        newQueueMessage._clientId = clientId;
        newQueueMessage._requestId = requestId.ToString();
        newQueueMessage._components = (from p in parms select p.ToString()).ToList<String>();

        // parse into strongly typed message fields
        newQueueMessage.Parse();

        // return the new message instance
        return newQueueMessage;
    }
What we’ll see in the next post is how code in the web and worker roles of the application leverages AcceptMessage, SubmitMessage, and the QueueMessage instances to carry out the workflow of the Photo Mosaics application.
Key Takeaways
I realize that was quite a lot of technical content to absorb, and if you’re really interested in understanding it, you’re likely poring over the code now. If I lost you as soon as you scrolled past the first screen – no worries. I feel the biggest takeaway from this post is that creating a flexible and fault-tolerant infrastructure for handling your messaging between the various roles in your application is paramount. The time you spend doing that will reap rewards later as you’re building out the processing for the application and as new requirements and unforeseen challenges surface. I don’t suggest that the framework I’ve set up here is ideal; however, some sort of framework is recommended, and hopefully this post has provided some food for thought.
Lastly, we haven’t talked much yet about diagnostics or monitoring, but it should be apparent that queues are one of the more significant barometers of how well the application is running in Windows Azure. If the queue length gets too high, it may mean that you need additional web or worker roles to handle the additional requests. On the other hand, if there’s never any wait time, perhaps the load is so light that you could dial down some of the roles you have spun up (and save some money). We’ll look at Windows Azure diagnostics and tracing in a later blog post, and it’s through that mechanism that you can keep tabs on the application’s health and even respond automatically to bursts or lulls in activity made manifest by changes in queue length.
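As a rough illustration of using queue length as a scaling signal, the sketch below estimates how many worker instances would be needed to drain a backlog within a target time. ScalingHint and all of its parameters are hypothetical; in practice the queue length and per-instance throughput would come from your own monitoring, and the result is only a hint, not an autoscaling implementation.

```csharp
using System;

public static class ScalingHint
{
    // Suggests how many instances are needed to drain queueLength messages within
    // targetSeconds, given the measured throughput of a single instance. The
    // result is clamped to the range [minInstances, maxInstances].
    public static int SuggestedInstances(int queueLength, double messagesPerSecondPerInstance,
                                         double targetSeconds, int minInstances, int maxInstances)
    {
        if (messagesPerSecondPerInstance <= 0 || targetSeconds <= 0)
            throw new ArgumentOutOfRangeException();

        double needed = Math.Ceiling(queueLength / (messagesPerSecondPerInstance * targetSeconds));
        return (int)Math.Max(minInstances, Math.Min(maxInstances, needed));
    }
}
```

For example, a backlog of 600 messages with instances that each handle 2 messages per second and a 60-second target works out to 5 instances, while an empty queue falls back to the configured minimum.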