Azure Storage Bindings Part 2 – Queues

 

I previously described how the Azure WebJobs SDK can bind to Blobs. This entry describes binding to Azure Queues. (Binding to Service Bus Queues is not yet implemented.)

You can see some more examples for queue usage on the sample site. Here are some supported queue bindings in the Alpha:

Queue Input for BCL types (String, Object, Byte[])

A function can have a [QueueInput] attribute, which means that the function will get invoked when a queue message is available.

The queue name is either specified via the QueueInput constructor parameter or taken from the name of the function parameter (similar to MVC). So the following are equivalent:

        public static void Queue1([QueueInput] string queuename)
        {
        }

        public static void Queue2([QueueInput("queuename")] string x)
        {
        }

In this case, the functions are triggered when a new message shows up in “queuename”, and the parameter is bound to the message contents (CloudQueueMessage.AsString).

The parameter type can be string or object (both of which bind as CloudQueueMessage.AsString) or byte[] (which binds as CloudQueueMessage.AsBytes).

As an added benefit, the SDK will ensure the queue input message is held until the function returns. It does this by calling UpdateMessage on a background thread while the function is still executing, and by calling DeleteMessage when the function returns.
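
To make the keep-alive pattern concrete, here is a minimal sketch of the idea. It is not the SDK's implementation. The renew and delete delegates are hypothetical stand-ins for CloudQueue.UpdateMessage and CloudQueue.DeleteMessage, and the renewal interval (half the visibility timeout) is an assumption. The sketch also assumes that a message whose function throws is left on the queue; the SDK's behavior in that case isn't covered here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class MessageKeepAlive
{
    // Runs `work` while a background task keeps renewing the message, then
    // deletes the message once `work` has returned without throwing.
    // `renew` and `delete` are hypothetical stand-ins for UpdateMessage
    // and DeleteMessage on a real queue.
    public static void Run(
        Action<TimeSpan> renew,
        Action delete,
        TimeSpan visibilityTimeout,
        Action work)
    {
        // Assumption: renew at half the timeout so the message stays hidden.
        TimeSpan interval = TimeSpan.FromTicks(visibilityTimeout.Ticks / 2);

        using (var cts = new CancellationTokenSource())
        {
            Task renewer = Task.Run(async () =>
            {
                try
                {
                    while (true)
                    {
                        await Task.Delay(interval, cts.Token);
                        renew(visibilityTimeout);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected: the function finished, so renewal stops.
                }
            });

            try
            {
                work();
            }
            finally
            {
                // Stop renewing and wait for any in-flight renewal to end.
                cts.Cancel();
                renewer.Wait();
            }
        }

        // Reached only when `work` returned normally.
        delete();
    }
}
```

If `work` throws, the exception propagates and `delete` is never called, so under this sketch's assumption the message would become visible again after its timeout.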

Queue Input for user types

The parameter type can also be a user-defined POCO type, in which case it is deserialized with Json.NET. For example:

    public class Payload
    {
        public int Value { get; set; }
        public string Output { get; set; }
    }

    public static void Queue2([QueueInput("queuename")] Payload x)
    {
    }

That has the same execution semantics as this:

    public static void Queue2([QueueInput("queuename")] string json)
    {
        Payload x = JsonConvert.DeserializeObject<Payload>(json);
        // …
    }

Serialization errors are treated as runtime exceptions which show up in the dashboard.

Of course, a big difference is that using a POCO type means the QueueInput provides route parameters, which means you can directly use the queue message properties in other parameter bindings, like so:

        public static void Queue2(
            [QueueInput("queuename")] Payload x,
            int Value, 
            [BlobOutput("container/{Output}.txt")] Stream output)
        {
        }

With bindings like these, it’s possible you don’t even need to use x in the function body.
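
As a rough illustration of what a route parameter does, the sketch below resolves a template such as "container/{Output}.txt" from the properties of a POCO. This is only a sketch of the concept, not the SDK's binding code; RouteTemplate, FromObject and Resolve are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.RegularExpressions;

public class Payload
{
    public int Value { get; set; }
    public string Output { get; set; }
}

// Hypothetical helper, for illustration only.
public static class RouteTemplate
{
    // Collects the public properties of a POCO as name/value strings.
    public static IDictionary<string, string> FromObject(object poco)
    {
        var values = new Dictionary<string, string>();
        foreach (var property in poco.GetType().GetProperties())
        {
            object value = property.GetValue(poco, null);
            values[property.Name] = Convert.ToString(value, CultureInfo.InvariantCulture);
        }
        return values;
    }

    // Replaces each {Name} placeholder with the matching value.
    public static string Resolve(string template, IDictionary<string, string> values)
    {
        return Regex.Replace(template, @"\{(\w+)\}", match =>
        {
            string name = match.Groups[1].Value;
            string value;
            if (!values.TryGetValue(name, out value))
            {
                throw new InvalidOperationException(
                    "No value for route parameter '" + name + "'.");
            }
            return value;
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var payload = new Payload { Value = 15, Output = "Hello" };
        var values = RouteTemplate.FromObject(payload);

        // Prints "container/Hello.txt"
        Console.WriteLine(RouteTemplate.Resolve("container/{Output}.txt", values));
    }
}
```

In the Queue2 example above, the same idea would supply the int Value parameter and the blob path from a single queue message.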

Queue Output

A function can enqueue messages via [QueueOutput]. It can enqueue a single message via an out parameter. If the value is not null on return, the message is serialized and queued, similar to the rules used for [QueueInput].

Here’s an example of queuing a message saying “payload” to a queue named testqueue.

        public static void OutputQueue([QueueOutput] out string testqueue)
        {
            testqueue = "payload";
        }

In this case, the function doesn’t have any triggers (no [QueueInput] or [BlobInput]), but could still be invoked directly from the host:

        host.Call(typeof(Program).GetMethod("OutputQueue"));

A function can also enqueue multiple messages via an IEnumerable<T>. This function queues 3 messages:

        public static void OutputMultiple([QueueOutput] out IEnumerable<string> testqueue)
        {
            testqueue = new string[] {
                "one",
                "two",
                "three"
            };
        }  
 

You could of course bind to multiple output queues. For example, this takes an OrderRequest object as input, logs a history to the “history” queue, and may turn around and enqueue suborders for this order.

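        public static void ProcessOrders(
            [QueueInput("orders")] OrderRequest input,
            [QueueOutput("history")] out WorkItem output, // log a history
            [QueueOutput("orders")] out IEnumerable<OrderRequest> children
            )
        {
            // C# requires every out parameter to be assigned before returning.
            // Assumes WorkItem is a POCO with a parameterless constructor.
            output = new WorkItem();           // history entry for this order
            children = new OrderRequest[0];    // suborders to enqueue; none here
        }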

Diagnostics: Navigating between Producers and Consumers

The Azure WebJobs SDK will track who queued a message so you can use the dashboard to see relationships between queue producers and consumers.

For example, consider these functions:

        public static void Producer(
            [QueueOutput("testqueue")] out Payload payload,
            string output, 
            int value            
            )
        {
            payload = new Payload { Output = output, Value = value };
        }

        public static void Consumer([QueueInput("testqueue")] Payload payload)
        {
        }

And say we invoked Producer explicitly via:

        host.Call(typeof(Program).GetMethod("Producer"), new { output = "Hello", value = 15 });

Producer() queues a message, which is then consumed by Consumer().

We can see Consumer’s execution in the dashboard:

image

And we can see that it was executed because “New queue input message on ‘testqueue’ from Producer()”. So we can click on the link to Producer to jump to that function instance:

image

And here, we can see that it was executed because of our explicit call to JobHost.Call, and that a child function of Producer() is Consumer(), so you can navigate in both directions.

Note that this only works when using non-BCL types. You’ll notice the queue payload has an extra “$AzureJobsParentId” field, which we can easily add to JSON objects.
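
To see why a JSON payload can carry that extra field while a plain string cannot, here is a small sketch using Json.NET (it needs the Newtonsoft.Json package). ParentIdSketch, Stamp and ReadParentId are hypothetical names for this example, and the id value is just an opaque string here; this is not how the SDK itself is implemented.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical helper, for illustration only.
public static class ParentIdSketch
{
    private const string ParentIdField = "$AzureJobsParentId";

    // Adds the tracking field to a JSON object payload. A message that is
    // not a JSON object (for example the raw string "payload") has no
    // place to put the field, and JObject.Parse would throw on it.
    public static string Stamp(string json, string parentId)
    {
        JObject message = JObject.Parse(json);
        message[ParentIdField] = parentId;
        return message.ToString(Formatting.None);
    }

    // Returns the parent id carried by the message, or null if none.
    public static string ReadParentId(string json)
    {
        JObject message = JObject.Parse(json);
        return (string)message[ParentIdField];
    }
}
```

Because Payload only declares Value and Output, Json.NET's default settings ignore the additional field when the consumer deserializes the message, so the user type is unaffected.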

Binding to CloudQueue SDK Types

You can also bind to the CloudQueue Storage SDK type directly. This provides a backdoor if you need full access to queue APIs that don’t naturally map to model binding.

        [NoAutomaticTrigger]
        public static void Queue1(CloudQueue testqueue)
        {
            // The second argument to AddMessage is the message's time-to-live.
            var ts = TimeSpan.FromSeconds(400);
            testqueue.AddMessage(new CloudQueueMessage("test"), ts);
        }

It’s still in Alpha, so there are some known issues we’re working through, such as:

  • Alpha 1 does not support binding to CloudQueueMessage directly.
  • Better bindings for outputs.
  • More customization around serialization for non-BCL types.
  • Better user control over the QueueInput listening.