Make Serverless Music - Orchestrate your workflow with Azure [Part 3 - Azure Functions]
At this point we've seen the declarative way to define a workflow orchestration using both Microsoft Flow and Azure Logic Apps. But hey, this is .Netitude, and here we write .NET, right? Let's get to it then!
You've likely already heard of Azure Functions (if not, it's well worth a look), Azure's serverless compute offering that lets you run thousands, even millions, of executions for mere pennies. In many cases, thanks to the monthly free grant on the Consumption plan, you can get away with a serverless implementation completely free!
With that in mind, let's implement our solution from earlier in this series in Azure Functions. As a reminder here are the requirements:
When I receive a request to my Function, I want to:
- Validate the schema of the incoming JSON request.
- Validate the content of the JSON request against business rules.
- Return a response indicating which validations failed and which ones passed.
We've defined our schema as a Person schema like so:
{
"Name" : {
"First" : "John",
"Last" : "Doe",
"Title" : "Mr"
},
"Address" : {
"Line1" : "1234 Anywhere St.",
"Line2" : null,
"City" : "Somewhere",
"State" : "OR",
"Zip" : "12345",
"Country" : "United States of America"
}
}
For our business rules, we'll enforce:
- First & Last name must be present & longer than 1 character
- If Address Line 2 is not null, Line 1 must also not be null
- City must be present
- State must be present & only a 2-letter abbreviation
- Zip must be present and 5 characters
Step 1: Create an Azure Functions project
If you haven't already, install VS 2017 (Community Edition is fine) with the Azure development workload.
Next,
- Click File | New Project
- Choose Cloud | Azure Function
- Choose Azure Functions v2, HTTP Trigger, Storage Emulator, Function authentication
Step 2: Validate the schema
Like most things, Visual Studio makes this a piece of cake with its 'Paste JSON as classes' feature. Simply copy the JSON above, go to VS 2017, create a new class file (let's call it Person.cs), click 'Edit' | 'Paste Special' | 'Paste JSON as classes' and voila:
namespace Functions
{
public class Rootobject
{
public Name Name { get; set; }
public Address Address { get; set; }
}
public class Name
{
public string First { get; set; }
public string Last { get; set; }
public string Title { get; set; }
}
public class Address
{
public string Line1 { get; set; }
public object Line2 { get; set; }
public string City { get; set; }
public string State { get; set; }
public string Zip { get; set; }
public string Country { get; set; }
}
}
For readability, rename Rootobject to Person. Because we put null in the sample for Line2, Visual Studio typed that property as object. The string checks we write later need it to be a string, so change it to:
public string Line2 { get; set; }
Back in Function1 now, we can have the incoming JSON deserialized into our model by simply telling the HttpTrigger binding that it should be of type Person instead of HttpRequest:
public static IActionResult Run([HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)]Person req, TraceWriter log)
Let's also clean this up by allowing only POST requests, not GET requests:
public static IActionResult Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]Person req, TraceWriter log)
And finally, let's name the function Validate:
[FunctionName("Validate")]
public static IActionResult Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]Person req, TraceWriter log)
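Now our function will ensure any incoming POST request to /api/Validate has a body that can be deserialized to our Person object. Keep in mind this is a loose check: properties missing from the body simply come through as null, which is why the business rules below use null-conditional (?.) operators throughout.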
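It's worth seeing what "can be deserialized" means in practice: binding to a POCO fills in whatever matches and leaves everything else null, rather than rejecting the request. The sketch below uses System.Text.Json instead of Json.NET (which the Functions v2 runtime uses) purely to stay dependency-free, so treat it as an approximation of the binding's behavior; BindingDemo is a hypothetical name.

```csharp
using System.Text.Json;

// Trimmed copies of the model classes from Person.cs (Line2 already changed to string)
public class Name
{
    public string First { get; set; }
    public string Last { get; set; }
    public string Title { get; set; }
}

public class Address
{
    public string Line1 { get; set; }
    public string Line2 { get; set; }
    public string City { get; set; }
    public string State { get; set; }
    public string Zip { get; set; }
    public string Country { get; set; }
}

public class Person
{
    public Name Name { get; set; }
    public Address Address { get; set; }
}

public static class BindingDemo
{
    // Roughly what a POCO-bound trigger does with the body: matching properties
    // are filled in, missing ones stay null, unknown ones are ignored.
    public static Person Parse(string json) => JsonSerializer.Deserialize<Person>(json);
}
```

Neither of the bodies in the test below is rejected; one simply arrives with Address set to null and the other with Name set to null, so the business rules still have to guard against nulls themselves.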
Step 3: Run the validations
"Chained" approach
Our validation results go into a list (more than one rule can fail), so let's fab that code up now and see how to return it as part of an HTTP 200 response from the Function:
var errors = new List<Error>();
return new OkObjectResult(errors);
Where we've defined Error to be (public, since later steps use it in public signatures):
public class Error
{
    public int id { get; set; }
    public string message { get; set; }
}
By default, Functions will serialize any value you hand to one of the *ObjectResult types (OkObjectResult, BadRequestObjectResult, and so on) as the JSON body of the response.
The validations we have are simple enough to write in straight .NET:
if ((req?.Name?.First?.Length > 1) == false)
{
errors.Add(new Error { id = 1, message = "First name is null or not longer than 1 character" });
}
if ((req?.Name?.Last?.Length > 1) == false)
{
errors.Add(new Error { id = 2, message = "Last name is null or not longer than 1 character" });
}
if (!string.IsNullOrEmpty(req?.Address?.Line2) && string.IsNullOrEmpty(req?.Address?.Line1))
{
errors.Add(new Error { id = 3, message = "Address line 2 is populated but line 1 is empty" });
}
if (string.IsNullOrEmpty(req?.Address?.City))
{
errors.Add(new Error { id = 4, message = "City is empty" });
}
if ((req?.Address?.State?.Length == 2) == false)
{
errors.Add(new Error { id = 5, message = "State is null or not a 2-letter abbreviation" });
}
if ((req?.Address?.Zip?.Length == 5) == false)
{
errors.Add(new Error { id = 6, message = "Zip is null or not 5 digits" });
}
and it works splendidly:
POST /api/Validate HTTP/1.1
Host: localhost:7071
Content-Type: application/json
Cache-Control: no-cache
Postman-Token: db88214a-8699-4766-bd48-d1a6d1fd495b
{
"Name" : {
"First" : "J",
"Last" : "Doe",
"Title" : "Mr"
},
"Address" : {
"Line1" : "1234 Anywhere St.",
"Line2" : null,
"City" : "Somewhere",
"State" : "OR",
"Zip" : "12345",
"Country" : "United States of America"
}
}
Returns:
Content-Type →application/json; charset=utf-8
Date →Wed, 30 May 2018 04:30:29 GMT
Server →Kestrel
Transfer-Encoding →chunked
[
{
"id": 1,
"message": "First name is null or not longer than 1 character"
}
]
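Because these rules don't depend on anything in the Functions runtime, one option is to pull them into a plain static method you can unit test without the host. A minimal sketch, assuming a hypothetical PersonRules class and trimmed copies of the model types (Title and Country left out):

```csharp
using System.Collections.Generic;

public class Name { public string First { get; set; } public string Last { get; set; } }

public class Address
{
    public string Line1 { get; set; }
    public string Line2 { get; set; }
    public string City { get; set; }
    public string State { get; set; }
    public string Zip { get; set; }
}

public class Person { public Name Name { get; set; } public Address Address { get; set; } }

public class Error { public int id { get; set; } public string message { get; set; } }

// Hypothetical helper: the same six business rules, outside the Function method
public static class PersonRules
{
    public static List<Error> Validate(Person req)
    {
        var errors = new List<Error>();
        // A null anywhere in a ?. chain makes the lifted comparison false, so missing data fails the rule
        if (!(req?.Name?.First?.Length > 1))
            errors.Add(new Error { id = 1, message = "First name is null or not longer than 1 character" });
        if (!(req?.Name?.Last?.Length > 1))
            errors.Add(new Error { id = 2, message = "Last name is null or not longer than 1 character" });
        if (!string.IsNullOrEmpty(req?.Address?.Line2) && string.IsNullOrEmpty(req?.Address?.Line1))
            errors.Add(new Error { id = 3, message = "Address line 2 is populated but line 1 is empty" });
        if (string.IsNullOrEmpty(req?.Address?.City))
            errors.Add(new Error { id = 4, message = "City is empty" });
        // null != 2 is true, so a missing State also fails
        if (req?.Address?.State?.Length != 2)
            errors.Add(new Error { id = 5, message = "State is null or not a 2-letter abbreviation" });
        if (req?.Address?.Zip?.Length != 5)
            errors.Add(new Error { id = 6, message = "Zip is null or not 5 digits" });
        return errors;
    }
}
```

With this in place, the body of Validate could shrink to return new OkObjectResult(PersonRules.Validate(req)); and the rules can be covered by ordinary unit tests.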
Fan out/in approach
The previous code roughly corresponds to the sequential Flow we set up in the first post of this series. But let's be real: in real life we'd almost certainly be doing something more complex than these operations, right? So let's make this work async, and then go on to extend it even further.
The body of our Function becomes:
// Run now has to be declared as: public static async Task<IActionResult> Run(...)
var errors = new List<Error>();
// Fan Out: start every check without awaiting it yet
var checks = new List<Task<Error>>
{
    CheckFirstName(req),
    CheckLastName(req),
    CheckAddressLines(req),
    CheckCity(req),
    CheckState(req),
    CheckZip(req)
};
// Wait for all checks to complete (Fan In); results come back in the same order as 'checks'
var results = await Task.WhenAll(checks);
// Add any non-null values to our errors list
errors.AddRange(results.Where(r => r != null));
return new OkObjectResult(errors);
In this approach we kick off all the Check* methods, collect the resulting Task<Error> objects, and wait for all of those tasks to complete. Once they do, any task with a non-null result has returned an Error object, and we add that to the list we return to the caller as the JSON body of our HTTP 200 response.
Each Check* method looks like this:
private static async Task<Error> CheckZip(Person req)
{
await Task.Delay(1000);
if ((req?.Address?.Zip?.Length == 5) == false)
{
return new Error { id = 6, message = "Zip is null or not 5 digits" };
}
return null;
}
private static async Task<Error> CheckState(Person req)
{
await Task.Delay(1100);
if ((req?.Address?.State?.Length == 2) == false)
{
return new Error { id = 5, message = "State is null or not a 2-letter abbreviation" };
}
return null;
}
private static async Task<Error> CheckCity(Person req)
{
await Task.Delay(1200);
if (string.IsNullOrEmpty(req?.Address?.City))
{
return new Error { id = 4, message = "City is empty" };
}
return null;
}
private static async Task<Error> CheckAddressLines(Person req)
{
await Task.Delay(1300);
if (!string.IsNullOrEmpty(req?.Address?.Line2) && string.IsNullOrEmpty(req?.Address?.Line1))
{
return new Error { id = 3, message = "Address line 2 is populated but line 1 is empty" };
}
return null;
}
private static async Task<Error> CheckLastName(Person req)
{
await Task.Delay(1400);
if ((req?.Name?.Last?.Length > 1) == false)
{
return new Error { id = 2, message = "Last name is null or not longer than 1 character" };
}
return null;
}
private static async Task<Error> CheckFirstName(Person req)
{
await Task.Delay(1500);
if ((req?.Name?.First?.Length > 1) == false)
{
return new Error { id = 1, message = "First name is null or not longer than 1 character" };
}
return null;
}
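Notice that I've put in Task.Delay() calls to artificially make the methods take some time, emulating real-world behavior. Because the checks run concurrently, the whole request takes roughly as long as the slowest one (1.5 seconds) rather than the 7.5 seconds the delays add up to.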
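If you want to see the fan out/in mechanics without the Functions host, here's a stripped-down sketch. Check is a hypothetical stand-in for the Check* methods, with shorter delays. The detail worth noticing is that Task.WhenAll hands back results in the order the tasks were listed, not the order they finished, so the error list stays stable from run to run.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Error { public int id { get; set; } public string message { get; set; } }

public static class FanOutDemo
{
    // Hypothetical stand-in for a Check* method: waits, then returns an Error or null
    static async Task<Error> Check(int id, bool fails, int delayMs)
    {
        await Task.Delay(delayMs);
        return fails ? new Error { id = id, message = $"check {id} failed" } : null;
    }

    public static async Task<List<Error>> RunAllAsync()
    {
        // Fan out: each call starts running immediately; nothing is awaited yet
        var checks = new List<Task<Error>>
        {
            Check(1, fails: true, delayMs: 150),
            Check(2, fails: false, delayMs: 100),
            Check(3, fails: true, delayMs: 50),
        };

        // Fan in: completes when the slowest task does (about 150 ms here, not 300 ms)
        Error[] results = await Task.WhenAll(checks);

        // Results are in input order even though check 3 finished first
        return results.Where(r => r != null).ToList();
    }
}
```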
What if our calls were other Azure Functions, though? Well, not a whole lot changes except for the signatures of our validation methods and how we call them. Let's take a look:
[FunctionName("CheckZip")]
public static async Task<IActionResult> CheckZip([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]Person req)
{
await Task.Delay(1000);
if ((req?.Address?.Zip?.Length == 5) == false)
{
return new OkObjectResult(new Error { id = 6, message = "Zip is null or not 5 digits" });
}
return new NoContentResult();
}
We mark the method as a Function, declare that it's triggered via HTTP, and return an IActionResult instead of an Error object: 200 OK with the Error when the check fails, or 204 No Content when it passes.
We also need to add an HttpClient to our Function:
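// The local Functions host listens on plain HTTP at port 7071 by default; once deployed, use your
// Function App's URL (plus the function keys AuthorizationLevel.Function endpoints require)
private static readonly HttpClient _client = new HttpClient { BaseAddress = new System.Uri(@"http://localhost:7071/api/") };
I set the BaseAddress here to make the PostAsync() calls cleaner later.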
With those changes made to all the Check methods, let's look at the body of our "orchestrator" and how it changes:
var errors = new List<Error>();
// Serialize once, but build a fresh StringContent per request: some HttpClient versions
// dispose request content after sending, so one shared instance isn't safe to reuse
var personJson = JsonConvert.SerializeObject(req);
HttpContent PersonContent() => new StringContent(personJson, Encoding.UTF8, @"application/json");
// Fan Out
var checks = new List<Task<HttpResponseMessage>>
{
    _client.PostAsync(@"CheckFirstName", PersonContent()),
    _client.PostAsync(@"CheckLastName", PersonContent()),
    _client.PostAsync(@"CheckAddressLines", PersonContent()),
    _client.PostAsync(@"CheckCity", PersonContent()),
    _client.PostAsync(@"CheckState", PersonContent()),
    _client.PostAsync(@"CheckZip", PersonContent())
};
// wait for all checks to complete (Fan In)
var responses = await Task.WhenAll(checks);
// Add any returned errors to our errors list
foreach (var response in responses)
{
    // 204 No Content means the check passed
    if (response.StatusCode == HttpStatusCode.NoContent)
    {
        continue;
    }
    // Surface real failures (4xx/5xx) instead of trying to read them as an Error
    response.EnsureSuccessStatusCode();
    var err = await response.Content.ReadAsAsync<Error>();
    if (err != null)
    {
        errors.Add(err);
    }
}
return new OkObjectResult(errors);
I wasn't able to get the PostAsJsonAsync methods on HttpClient to work successfully; the payload kept arriving at the Functions as null instead of being properly serialized/deserialized to a Person object. Hence the explicit StringContent serialization in the orchestrator.
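The fan-in loop has to tell a passing check (204 No Content) apart from a failing one (200 with an Error body). Here's a minimal sketch of that mapping, plus a helper that builds a fresh JSON body per request, pulled out so both can be tested without a running host. It swaps in System.Text.Json for ReadAsAsync/Json.NET, and CheckResponses is a hypothetical name.

```csharp
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public class Error { public int id { get; set; } public string message { get; set; } }

public static class CheckResponses
{
    // A new HttpContent per request, so no instance is ever shared between requests
    public static HttpContent JsonBody<T>(T value) =>
        new StringContent(JsonSerializer.Serialize(value), Encoding.UTF8, "application/json");

    // Maps one worker response to an Error (check failed) or null (check passed)
    public static async Task<Error> ReadErrorAsync(HttpResponseMessage response)
    {
        if (response.StatusCode == HttpStatusCode.NoContent)
        {
            return null;                          // 204: the check passed
        }
        response.EnsureSuccessStatusCode();       // 4xx/5xx: throw rather than misread the body
        string json = await response.Content.ReadAsStringAsync();
        return JsonSerializer.Deserialize<Error>(json);
    }
}
```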
Debugging the Function now, you'll see many Functions indexed by the host, with their executions all kicked off in parallel. But at the end of the day they all "fan back in", and we process their results and return them to the caller.
There are a couple of problems with this implementation, though:
- We're now distributed across Azure Functions or even disparate web services. This means latency problems, connection issues, and the like.
- Azure Functions on the Consumption plan are limited to 5 minutes per execution by default (you can raise that to 10 minutes by setting functionTimeout in host.json). What if our total orchestration takes longer than this?
How can we solve these?
Tackling it with Logic Apps
One way to handle these issues is to orchestrate our Function calls with Azure's purpose-built orchestration offering: Logic Apps. As you might expect, its integration with Functions is first-class; let's take a look.
First, deploy the Functions we wrote above (the orchestrator and all the sub-task functions) out to Azure so we can use them from our Logic App.
Next, create a new Logic App in Azure, and head to the designer surface.
The first step of our Logic App is going to be the same as it was for Flow (and the subsequent Logic Apps import): a 'Request' trigger with a JSON schema in it matching what we expect from our caller. If you did the Microsoft Flow part of this series, it should look familiar when you're done.
Next, let's orchestrate the call to the first Function. Click 'Add an action' and type 'functions' in the search area. Choose 'Azure Function - Choose an Azure Function'
Next, choose the Azure Functions instance to which you deployed all the functions, and select the first of our validation functions:
In the request body for this function, you'll pass the same body that came in to the Logic App:
Using the same actions as before, we need to create a variable, initialize it to empty, then append the responses from each function on to it and, finally, send that back as the Response from the Logic App. By the time you're done, it should look like this:
"That's great" you might be wondering, "but how does this help the retry/reliability of my orchestration?"
For a glimpse into this, click the '...' in the upper right of any of the Functions actions and choose 'Settings'.
Notice the 'Timeout' and 'Retry Policy' options. Also make note of the built-in 'Asynchronous pattern' handling. With these 3 options, you can make your orchestration considerably more resilient. I encourage you to try them out as you experiment more with serverless workflow orchestration.
Logic Apps enforces a limit of 120 seconds on any single synchronous HTTP-based action, so your orchestration steps must respond within that timeframe or the Logic App will stop waiting on them. The asynchronous pattern is the way around this for longer-running steps.
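For a feel of what a retry policy buys you, here's a generic exponential-backoff retry in C#. It only illustrates the idea behind the 'Retry Policy' setting and is not how Logic Apps implements it; Retry and WithExponentialBackoffAsync are hypothetical names, and the delays are parameters rather than Logic Apps' defaults.

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Runs 'action'; on failure, retries up to 'maxRetries' more times, waiting
    // baseDelay, 2 x baseDelay, 4 x baseDelay, ... between attempts.
    public static async Task<T> WithExponentialBackoffAsync<T>(
        Func<Task<T>> action, int maxRetries, TimeSpan baseDelay)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await action();
            }
            catch (Exception) when (attempt < maxRetries)
            {
                // Retries left: back off for baseDelay * 2^attempt, then loop around.
                // Once retries run out, the filter is false and the exception propagates.
                await Task.Delay(TimeSpan.FromTicks(baseDelay.Ticks << attempt));
            }
        }
    }
}
```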
Messaging approach
There's another way we could string our Azure Functions together, or fan out/in their execution, using Azure's Messaging offerings: Service Bus, Event Hubs, and Event Grid. For reliable orchestration, I recommend going with Service Bus as it provides queues with de-duplication, partitioning, and dead-lettering capabilities. In general, SB is recommended for "transactional" systems where delivery is critical. Since we wouldn't want any part of our orchestration to fall over, it fits here.
The usual design of this type of setup is to put "messages" for each Azure Function into a Service Bus queue and have each "worker" in the orchestration pull work off its own queue. This isn't too complex to set up, but the back side of our orchestration - fanning back in from a fan out design - can be. Consider this: how do you ensure that an execution that's part of a fan out comes back into the same execution?
However, if you're not doing parallel execution, the path is simpler: you just put things in queue A for worker A, then worker A puts them in queue B for worker B, and so on until you're done and you put the result in a queue for the "receiver", which ends the orchestration.
In both cases, you'll be making use of the "Asynchronous Pattern" shown off by Logic Apps, where you return an HTTP 202 Accepted to the caller with a 'Location' endpoint they can "ping" to get the result. You can read more about the async pattern served by HTTP 202 at the REST Cookbook.
Azure Function bindings make the "serial" orchestration really easy: the "starter" function usually binds to HttpTrigger and has a ServiceBus output binding. All subsequent functions have a ServiceBusTrigger and a ServiceBus output binding. The final function has a ServiceBusTrigger and another output binding in which to "store" the final result. Optionally, this last function could output to another endpoint (callback, Event Grid, etc.) with the final result.
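Here's an in-memory model of that serial chain, assuming each queue is just a Queue<T> and each worker is a function from one message to the next. It's not Service Bus code (SerialChain and ChainMessage are hypothetical); it only shows how the id set by the starter rides along with the payload from queue to queue until the final function picks up the result.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message: the orchestration id plus whatever each worker adds along the way
public class ChainMessage
{
    public string id { get; set; }
    public List<string> errors { get; set; } = new List<string>();
}

public static class SerialChain
{
    // Runs 'workers' in order, handing each message from one in-memory queue to the next
    public static ChainMessage Run(ChainMessage start, params Func<ChainMessage, ChainMessage>[] workers)
    {
        // queues[i] feeds workers[i]; the last queue holds what the final function would store
        var queues = new Queue<ChainMessage>[workers.Length + 1];
        for (int i = 0; i < queues.Length; i++)
        {
            queues[i] = new Queue<ChainMessage>();
        }

        queues[0].Enqueue(start);                        // the "starter" function's output binding
        for (int i = 0; i < workers.Length; i++)
        {
            ChainMessage msg = queues[i].Dequeue();      // worker i's trigger fires
            queues[i + 1].Enqueue(workers[i](msg));      // its output binding feeds the next queue
        }
        return queues[workers.Length].Dequeue();
    }
}
```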
Typically this approach is implemented as follows:
- "Start" Function receives a request
- Decorate the request with a unique ID (eg: GUID)
- For the "serial" execution path:
  - Queue the payload of the request (with new unique ID) into a Service Bus queue for Function 1
  - Function 1 does work, queues for Function 2
  - ...
  - Function N does work, drops result where /result/<id> will find it
- For the "parallel" (fan out/in) execution path:
  - Enqueue the payload of the request (with new unique ID) into Service Bus queues for Functions 1...N
  - Function X does work on the payload, drops result where /result/<id> will find it
  - Important: In this case, a call to /result/<id> only returns once it has information from Functions 1...N available
- Send back 202 with Location of /result/<id>
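The "Important" bullet is the crux of the fan-in question raised earlier: results from N workers have to be matched back to one orchestration. One way to picture it is a store keyed by the orchestration id that only reports a result once every expected worker has checked in. The sketch keeps that store in memory; FanInStore and its methods are hypothetical, and a real deployment would back it with something like Table Storage.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the store behind /result/<id>
public class FanInStore
{
    private readonly string[] _expectedWorkers;

    // orchestration id -> (worker name -> that worker's error, or null if its check passed)
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, string>> _results =
        new ConcurrentDictionary<string, ConcurrentDictionary<string, string>>();

    public FanInStore(params string[] expectedWorkers) => _expectedWorkers = expectedWorkers;

    // Called by each worker when it finishes its piece of orchestration 'id'
    public void Record(string id, string worker, string error) =>
        _results.GetOrAdd(id, _ => new ConcurrentDictionary<string, string>())[worker] = error;

    // What /result/<id> would do: report "not done" until every expected worker has reported
    public bool TryGetResult(string id, out List<string> errors)
    {
        errors = null;
        if (!_results.TryGetValue(id, out var byWorker) || !_expectedWorkers.All(byWorker.ContainsKey))
        {
            return false;
        }
        errors = _expectedWorkers.Select(w => byWorker[w]).Where(e => e != null).ToList();
        return true;
    }
}
```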
The operation of the /result/<id> endpoint is up to you. I like to have it return a 2xx code other than 200 until the result is there, then return 200 with the result once it's available. Ultimately the logic for what to do with the response from /result/<id> will be implemented by the caller that kicked off the Start function, so pick your poison here.
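On the caller's side, that usually boils down to a polling loop. A minimal sketch, assuming the convention above (200 means done, any other 2xx means not yet) and also treating 404 as not yet, since the GetResult function later in this post returns NotFound until the row exists. ResultPoller and the getResult delegate are hypothetical; taking a delegate rather than a URL keeps it testable.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical caller-side helper for the 202 Accepted + Location pattern
public static class ResultPoller
{
    public static async Task<string> PollAsync(
        Func<Task<HttpResponseMessage>> getResult, int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            using (HttpResponseMessage response = await getResult())
            {
                // 200 OK: the result is ready, hand back the body
                if (response.StatusCode == HttpStatusCode.OK)
                {
                    return await response.Content.ReadAsStringAsync();
                }
                // Other 2xx and 404 mean "not done yet"; anything else is a real error
                if (!response.IsSuccessStatusCode && response.StatusCode != HttpStatusCode.NotFound)
                {
                    response.EnsureSuccessStatusCode(); // throws HttpRequestException
                }
            }
            if (attempt < maxAttempts)
            {
                await Task.Delay(delay);
            }
        }
        throw new TimeoutException($"No result after {maxAttempts} attempts.");
    }
}
```

In practice getResult would wrap something like an HttpClient GetAsync call against the Location URL returned with the 202.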
For the sake of brevity, I'll walk through coding up a Function that receives a request and drops the message, decorated with an ID, into a Service Bus queue, and then another Function that picks it up off the queue, does its work, and drops the result into Table Storage. Let's get started!
Step 1 - Create, of course, a Service Bus instance
Use the Standard tier for now.
Step 2 - Create a queue for each worker function in our workflow
It's up to you how you want to set up your queue, but I would recommend enabling duplicate detection and dead-lettering on message expiration.
With dead-lettering on, the usual practice is to have a "watchdog" on the dead-letter queue of each Service Bus queue and do something with the messages that end up in there. You could re-queue them into the same queue, log something, or any number of things.
I've found sessions less-than-easy to work with inside Functions, so while they're something we would typically use for this approach (set the session ID on each message to the unique ID the orchestration function generates), I won't be using them in this sample.
Now we're ready for
Step 3 - Set the Orchestrator to drop messages in to the queue
To get this started, let's take the Validate Function and set up its bindings and return values for the procedure I outlined above.
First, add a [ServiceBus] output parameter like so:
// Not async: C# doesn't allow 'out' parameters on async methods, and nothing here needs to await
public static IActionResult Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]Person req,
    [ServiceBus(@"function-a", Connection = @"ServiceBusOrchestrationConnection")]out QueueMessage functionAmsg,
    TraceWriter log)
The ServiceBus connection string must not contain any queue/topic information (no EntityPath); it needs to point to the top-level Service Bus namespace or you'll get a runtime error. You can get this connection string by going to the Service Bus instance, opening Shared access policies, clicking RootManageSharedAccessKey, then copying either of the connection strings available. If you want to make your own shared access policy, that's fine too, but it needs Manage permissions so the trigger can wire up as a listener and the output binding can send. Separate keys with these permissions are also acceptable.
What you'll immediately find is that the binding for one parameter can only go to one queue. This means you either need to use multiple output parameters (eg: functionAmsg, functionBmsg, functionCmsg...) or forgo the bindings altogether and just use a Service Bus client object in your Function; I'll leave that up to you.
The other thing you'll notice is a new type, QueueMessage, that I've introduced here. I did this so I had a place to put the unique id I'll assign the messages for retrieval later. This class is simple, and looks like this:
public class QueueMessage
{
public string id { get; set; }
public Person data { get; set; }
}
Now, when a new request comes in to the trigger (orchestrator), it needs to create a new GUID, put it in the id property of a new QueueMessage object, and send that to our Service Bus instance. Thanks to our bindings, this is just a handful of code:
var id = Guid.NewGuid().ToString();
log.Info($@"New orchestration started: {id}");
functionAmsg = new QueueMessage { id = id, data = req };
and off it goes.
But we also want to give back an HTTP 202 Accepted response with a new function URL the caller can use to get the result, and
return new AcceptedResult($@"/api/GetResult?id={id}", functionAmsg);
does the trick nicely (we'll implement GetResult later).
Next, let's configure the CheckFirstName Function to get messages from the new Service Bus queue we've set up. In my configuration I called it function-a.
This changes the HttpTrigger for this function to a ServiceBusTrigger, and looks like so:
[FunctionName("CheckFirstName")]
// void: a queue-triggered function has no HTTP caller to return a result to
public static void CheckFirstName([ServiceBusTrigger(@"function-a", Connection = @"ServiceBusOrchestrationConnection")]QueueMessage msg,
    TraceWriter log)
{
    if ((msg?.data?.Name?.First?.Length > 1) == false)
    {
        // store the error result
    }
    else
    {
        // store an empty error result
    }
}
For storage, we could spin up a Cosmos DB instance and store the JSON as documents there, or we could use Table Storage in the storage account our Azure Functions already require (check out the AzureWebJobsStorage app setting). Functions makes both of these equally easy thanks to bindings, but for simplicity let's just go with the Table Storage we already have as part of our Functions.
To configure this as output for our CheckFirstName Function, change its signature to this:
public static void CheckFirstName([ServiceBusTrigger(@"function-a", Connection = @"ServiceBusOrchestrationConnection")]QueueMessage msg,
[Table(@"checkfirstnameoutput", Connection = @"AzureWebJobsStorage")]out ErrorTableEntity err,
TraceWriter log)
Notice we had to create a type that implements ITableEntity (here by inheriting from TableEntity) in order to use this. Our new ErrorTableEntity type wraps up our previous one, copying its fields into simple properties, because TableEntity only persists primitive property types (a nested Error object would be skipped):
public class ErrorTableEntity : TableEntity
{
    public ErrorTableEntity() { }
    public ErrorTableEntity(Error err, string sessionId)
    {
        this.PartitionKey = this.RowKey = sessionId;
        this.ErrorId = err.id;
        this.ErrorMessage = err.message;
    }
    public int ErrorId { get; set; }
    public string ErrorMessage { get; set; }
}
With this we can now fill out the rest of our CheckFirstName Function:
log.Info($@"Message received: {msg.id}");
if ((msg?.data?.Name?.First?.Length > 1) == false)
{
    err = new ErrorTableEntity(new Error { id = 1, message = "First name is null or not longer than 1 character" }, msg.id);
    log.Info($@" - Error logged: {err.ErrorMessage}");
}
else
{
    err = new ErrorTableEntity(new Error { id = 0 }, msg.id);
    log.Info($@" - NoError logged");
}
// No return value: the function is void, and assigning 'err' is what writes the row via the Table output binding
So at this point, with a request to our HTTP-triggered function we should see:
- The message gets decorated with a new GUID
- It's put into the pipeline for our CheckFirstName function
- CheckFirstName kicks off with that message
- ... processes its validity
- ... drops the appropriate error in Table Storage for our session
The logging messages we put in place will help trace things. If you haven't already, now's a good time to download Azure Storage Explorer so you can have a look at the table storage in the emulator (if that's what your local.settings.json is pointing to) when we test this out next!
Let's give 'er a whirl and see what happens!
Postman sends:
POST /api/Validate HTTP/1.1
Host: localhost:7071
Content-Type: application/json
Cache-Control: no-cache
Postman-Token: b995b4d7-63fb-4b12-b021-4f469089c61c
{
"Name" : {
"First" : "J",
"Last" : "Doe",
"Title" : "Mr"
},
"Address" : {
"Line1" : "1234 Anywhere St.",
"Line2" : null,
"City" : "Somewhere",
"State" : "OR",
"Zip" : "12345",
"Country" : "United States of America"
}
}
And gets back a 202 Accepted:
Content-Type →application/json; charset=utf-8
Date →Thu, 31 May 2018 04:57:30 GMT
Location →/api/GetResult?id=d6b07382-3fdb-4f06-b9a9-ab0fbdf15594
Server →Kestrel
Transfer-Encoding →chunked
{
"id": "d6b07382-3fdb-4f06-b9a9-ab0fbdf15594",
"data": {
"Name": {
"First": "J",
"Last": "Doe",
"Title": "Mr"
},
"Address": {
"Line1": "1234 Anywhere St.",
"Line2": null,
"City": "Somewhere",
"State": "OR",
"Zip": "12345",
"Country": "United States of America"
}
}
}
And our Function spits out:
[5/31/2018 5:04:50 AM] Executing 'Validate' (Reason='This function was programmatically called via the host APIs.', Id=39ff9db3-03b6-4252-8429-0bd30f4ed6a1)
[5/31/2018 5:04:50 AM] New orchestration started: d6b07382-3fdb-4f06-b9a9-ab0fbdf15594
[5/31/2018 5:04:51 AM] Executed 'Validate' (Succeeded, Id=39ff9db3-03b6-4252-8429-0bd30f4ed6a1)
[5/31/2018 5:04:51 AM] Executing 'CheckFirstName' (Reason='New ServiceBus message detected on 'function-a'.', Id=9665e73b-f945-48df-82a5-8258d45b4787)
[5/31/2018 5:04:51 AM] Message received: d6b07382-3fdb-4f06-b9a9-ab0fbdf15594
[5/31/2018 5:04:54 AM] - Error logged: First name is null or not longer than 1 character
[5/31/2018 5:04:55 AM] Executed 'CheckFirstName' (Succeeded, Id=9665e73b-f945-48df-82a5-8258d45b4787)
And if we take a look in our Storage account, there's a row for our session, with its output! Now we can write our GetResult function like this:
[FunctionName("GetResult")]
public static async Task<IActionResult> GetResultAsync([HttpTrigger(AuthorizationLevel.Function, "get", Route = null)]HttpRequest req, [Table(@"checkfirstnameoutput", Connection = @"AzureWebJobsStorage")]CloudTable firstnameoutputTable)
{
string targetId = null;
if (req.GetQueryParameterDictionary()?.TryGetValue(@"id", out targetId) == true)
{
var queryResult = await firstnameoutputTable.ExecuteAsync(TableOperation.Retrieve<ErrorTableEntity>(targetId, targetId));
if (queryResult.Result != null)
{
return new OkObjectResult(queryResult.Result);
}
return new NotFoundResult();
}
else
{
return new BadRequestObjectResult(@"'id' parameter is required");
}
}
And just like that:
GET /api/GetResult?id=d6b07382-3fdb-4f06-b9a9-ab0fbdf15594 HTTP/1.1
Host: localhost:7071
Content-Type: application/json
Cache-Control: no-cache
Postman-Token: cced7ca9-a039-43d9-ae30-6c7d14c306de
Gets back:
{
"ErrorId": 1,
"ErrorMessage": "First name is null or not longer than 1 character",
"PartitionKey": "d6b07382-3fdb-4f06-b9a9-ab0fbdf15594",
"RowKey": "d6b07382-3fdb-4f06-b9a9-ab0fbdf15594",
"Timestamp": "2018-05-31T05:04:55.297+00:00",
"ETag": "W/\"datetime'2018-05-31T05%3A04%3A55.297Z'\""
}
So we now have:
- An orchestration function delegating work to one or more workers and returning back to the caller where it can find the results
- A "lookup" Function to return results based on the session ID created by the orchestration function
- One or more worker functions each placing its ending state in the location where the lookup function can find it
If you were to do a fan out/in process using this same architecture, your lookup function would, instead of checking one spot for output, check that all applicable Functions have placed their results where they need to be, and otherwise come back with a 404 or 204 to let the caller know "not done yet".
Tune in next time for the final part in the series where I cover a handy library shipped by a group within the Azure Functions team, Durable Functions, and how it helps abstract some of this work away for you!