Workflow Foundation 4.0 Activity Model (II)
In previous post I introduced WF4's Activity model. In this section, I'm going to use 2 samples to demo 4 styles to create a custom Activity.
How to build a leaf activity – HttpGet
Today HTTP based RESTful services are getting popular, so it might be useful to perform some basic HTTP operations in workflow. Now let’s try to build one simple Activity to grab some data from a web service using HTTP GET request.
Code it up
First let’s pick a base activity class to implement.
This is a leaf level operation which could not be built by composition of other Activities, aka “leaf activity”. For leaf activities, the simplest approach is to use CodeActivity if the logic could be implemented by one block of imperative code. We know the basic HTTP GET functionality could be implemented by few lines of code using HttpWebRequest API:
WebRequest request = HttpWebRequest.Create(someUri);
using (WebResponse response = request.GetResponse())
{
//read data out of response.GetResponseStream()
}
Code 1. C# code to perform an HTTP GET operation
So CodeActivity would be a natural choice here. Let’s say we want the activity to “return” the HTTP response content as a string, so we should use CodeActivity<string> as base class.
Next step is to define input/output.
To simplify the case, let’s assume users could only specify URI for the request, so this Activity will have a string-typed input argument Uri. By using CodeActivity<string> as base class, it automatically inherits a string-typed output argument Result. So the class skeleton looks like this:
public class HttpGet : CodeActivity<string>
{
public InArgument<string> Uri
{
get;
set;
}
//inherited from CodeActivity<string>
//public OutArgument<string> Result
//{
// get;
// set;
//}
}
Code 2. Input/output of HttpGet activity
Final step is to blow in breath of life to the activity.
CodeActivity has one protected abstract method void Execute(CodeActivityContext context). Subclasses need to implement this method with activity execution logic. Similarly CodeActivity<T> has an abstract method T Execute(CodeActivityContext context). Subclasses implement this method with concrete logic and returns a result of T. CodeActivity<T> will set the return value to the Result output argument automatically. So in our case, we just need to use code 1 to implement Execute method with one change: to access value of an argument, you need to call Argument.Get method, which we will explain in a future blog.
public class HttpGet : CodeActivity<string>
{
…
protected override string Execute(CodeActivityContext context)
{
WebRequest request = HttpWebRequest.Create(this.Uri.Get(context));
using (WebResponse response = request.GetResponse())
{
//read everything response.GetResponseStream() as one string
using (StreamReader reader = new StreamReader(response.GetResponseStream()))
{
return reader.ReadToEnd();
}
}
}
}
Code 3. Implementation of HttpGet activity
With < 20 lines of code, we create a new activity which could be used in any workflow. That was easy!
To test this activity, the easiest way is to use WorkflowInvoker. Here is a quick example:
HttpGet fetchMsn = new HttpGet
{
Uri = "https://www.msn.com"
};
string msnContent = WorkflowInvoker.Invoke<string>(fetchMsn);
Console.WriteLine(msnContent);
Code 4. Test code for HttpGet activity
Unblock it
I’ve repeated time after time, workflow is meant for asynchronous services. Whenever possible, we should avoid blocking in activity’s execution logic. In general, any form of IO is potentially slow and we should try to make them non-blocking. The easiest approach to make a CodeActivity to be asynchronous is to use AsyncCodeActivity as long as the logic could be implemented by an asynchronous method call. Looking closer to HttpWebRequest, we found it has Begin/EndGetResponse method pairs, what a godsend! So let’s change HttpGet to AsyncCodeActivity.
AsyncCodeActivity is similar to CodeActivity, except it has a Begin/EndExecute abstract method pair instead of one Execute method. Following .Net Asynchronous Invoke pattern, BeginExecute starts an asynchronous operation, returns an IAsyncResult without waiting for it to complete; when the operation is done, EndExecute is invoked and returns a result. So HttpGet would be implemented this way:
class HttpGet : AsyncCodeActivity<string>
{
…
protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
WebRequest request = HttpWebRequest.Create(this.Uri.Get(context));
context.UserState = request;
return request.BeginGetResponse(callback, state);
}
protected override string EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
WebRequest request = (WebRequest)context.UserState;
using (WebResponse response = request.EndGetResponse(result))
{
using (StreamReader reader = new StreamReader(response.GetResponseStream()))
{
return reader.ReadToEnd();
}
}
}
}
Code 5. Implementation of HttpGet using AsyncCodeActivity
This is great, but you might see more room to improve: reading from a stream is also a blocking operation. We could use BeginRead to make it asynchronous. However now we hit a limitation of AsyncCodeActivity: it only supports execution with one continuation point. So if we want to make HttpGet 100% non-blocking, we should go with NativeActivity and use bookmark API, which I’m not going to demo in this blog.
How to build a composite activity - World
In workflow scenarios, developers often need to build higher level activities to model some business processes, or to integrate existing systems. Usually such an activity is built by putting other activities together in an interesting way and is called a “composite activity”.
Now let’s say we want to build an activity called “World” to simulate the universe we live in. What it does is to continuously run different events (represented by Activities) with different probabilities until doomsday. Since it needs to execute other activities, it could not be built using CodeActivity or AsyncCodeActivity. Let’s try something new here.
Describe the logic using AST
For any composite activity, I would suggest to consider using Activity class first. Because it supports fully declarative programming style, a developer could simply spell out the execution logic as a tree of Activities provided there are existing Activities for all the leaf functionalities.
As always, let’s start with defining input and output.
World needs a list of events (modeled by Activities) and their probabilities (modeled by int as percentage number). It doesn’t return anything so no output is required. Here is a skeleton for World:
class World : Activity
{
Collection<Activity> events = new Collection<Activity>();
Collection<int> probablities = new Collection<int>();
public Collection<Activity> Events
{
get { return this.events; }
}
public Collection<int> Probablities
{
get { return this.probablities; }
}
}
Code 6. Public interface of World activity
It’s intentional to separate Activities and their probabilities in 2 collections. Although this design is less OO, it saves us from some plumbing code required activity reflection model, which we need to talk sometime later.
Then it comes to meat of the activity. Let’s first write down the logic in pseudo code:
Random rand = new Random();
while (true)
{
Console.WriteLine("A new day:");
//ideally each iteration of this foreach could be run in parallel
//as they are independent of each other
foreach (event in Events, probability in Probabilities)
{
if (rand.Next(1,100) <= probability)
{
event.Run();
}
}
}
Code 7. pseudo code to demo World’s logic
The spirit of Activity class is to define this logic by an activity tree declaratively. So let’s check current activity bag so see if we have enough tools. Here are my findings:
Imperative code |
Activity shipped in WF4 to represent the same logic |
A block of sequential code:
{ Statement1; Statement2; … } |
public sealed class Sequence : NativeActivity { public Collection<Activity> Activities { get; } … }
It has a collection of activities as its children. At run time, Sequence will run activities in the collection one by one. |
Assign statement
a=b; |
public sealed class Assign<T> : CodeActivity { public OutArgument<T> To { get; set; } public InArgument<T> Value { get; set; } … }
It has one input argument Value and an output argument To. At run time, it assign value of Value to To.
|
While loop
while(condition) { Statement1; Statement2; … } |
public sealed class While : NativeActivity { public Activity<bool> Condition { get; set; } public Activity Body { get; set; } … }
It has a Condition who is an Activity returning bool. While will keep running the Body activity time after time as long as Condition returns true at beginning of each iteration. |
If-else
if (condition) { Statement1; … } else { Statement2; …. } |
public sealed class If : NativeActivity { public InArgument<bool> Condition { get; set; } public Activity Then { get; set; } public Activity Else { get; set; } … } If has an input argument Condition and two Activities. If Condition is evaluated to true at run time, it will execute Then activity, otherwise Else activity. |
A list of things to run in parallel. There is no built-in support for this construct in mainstream programing languages (OpenMp is an effort bring this feature to compilers). Parallel Computing Platform is a Microsoft effort to add this feature to .Net framework, e.g:
Parallel.For ( startIndex, endIndex, (currentIndex) => DoSomework(currentIndex))
|
public sealed class Parallel : NativeActivity { public Collection<Activity> Branches { get; } … }
Parallel will run every activity in its Branches collection at the same time. All asynchronous operations (including waiting for messages or timeout) performed by those branches could happen concurrently. |
Table 1. Some WF4.0 Activities for common imperative code pattern
It looks like we have enough ingredients, so let’s put them together to describe the logic in code 7. To implement Activity, the key is to assign its Implementation property a delegate which returns the activity tree describing its logic.
class World : Activity
{
…
public World()
{
this.Implementation = CreateBody;
}
Activity CreateBody()
{
Variable<Random> rand = new Variable<Random>();
//build a Parallel activity that each branch is:
// if (rand.Next(1,100) <= probability)
// {
// event.Run();
// }
Parallel runEventsInADay = new Parallel();
for (int i = 0; i < this.events.Count; i++)
{
int eventProbability = this.Probablities[i];
If tryToRunOneEvent = new If
{
Condition = new InArgument<bool>(ctx => rand.Get(ctx).Next(1, 100) <= eventProbability),
Then = this.events[i]
};
runEventsInADay.Branches.Add(tryToRunOneEvent);
}
// a block of code which has an assign and a while loop
return new Sequence
{
Variables = {rand},
Activities =
{
new Assign<Random>
{
To = rand,
Value = new InArgument<Random>(ctx=>new Random())
},
// a while(true) loop whose body contains the Parallel built above
new While
{
Condition = new Literal<bool>(true),
Body = new Sequence
{
Activities =
{
new WriteLine {Text = "A new day:"},
runEventsInADay
}
}
}
}
};
}
}
Code 8. Implementation of World activity
Hopefully the code is clear by itself by now. The Variable “rand” and how to use it might need further explanation but I’ll skip it in this post.
We built a world with less than 50 lines of code. You could test it out to see if it’s a happy place:
World world = new World();
//there is 90% chance for a baby to be born in a day
world.Events.Add(new WriteLine { Text = " A new baby is born" });
world.Probablities.Add(90);
//there is 50% chance for Nasdaq to go up in a day
world.Events.Add(new WriteLine { Text = " Nasdaq is going up" });
world.Probablities.Add(50);
//there is 30% chance for Seattle to rain in a day
world.Events.Add(new WriteLine { Text = " Seattle is raining" });
world.Probablities.Add(30);
//there is 5% chance for someone to win 1M lottery in a day
world.Events.Add(new WriteLine { Text = " Someone wins $1M lottery" });
world.Probablities.Add(5);
//there is 1% chance for Armageddon to happen in a day
world.Events.Add(
new Sequence
{
Activities = {
new WriteLine { Text = " Armageddon!" },
new Throw {Exception = new InArgument<Exception>(ctx=>new InvalidOperationException())}
}
}
);
world.Probablities.Add(1);
WorkflowInvoker.Invoke(world);
Code 9. Test code of World
Please note that here I’m using an unhandled exception to model Armageddon and break out the infinite loop. This is not encouraged for production code, but fun for demo. J For sharing, this is result of one run of my World:
A new day:
A new baby is born
Nasdaq is going up
Seattle is raining
A new day:
A new baby is born
Nasdaq is going up
A new day:
A new baby is born
A new day:
A new day:
A new baby is born
Nasdaq is going up
… //many days passed by
A new day:
Seattle is raining
A new day:
Nasdaq is going up
A new day:
A new baby is born
Nasdaq is going up
Armageddon! //an exception is thrown here and the World crashes
Output 1. Output of a test run for World
Describe the logic as screenplay
If you want deeper control of the logic or access to some low level runtime features, or if you are just curious how Activities is executed under the hood, NativeActivity is your friend. Using NativeActivity, you need to describe the logic as script for a play, clearly specifying who should do what and when. Now Iet’s try re-implement World using NativeActivity.
The class input remains the same so the public interface doesn’t change:
class World : NativeActivity
{
Collection<Activity> events = new Collection<Activity>();
Collection<int> probablities = new Collection<int>();
Variable<Random> rand = new Variable<Random>();
public Collection<Activity> Events
{
get { return this.events; }
}
public Collection<int> Probablities
{
get { return this.probablities; }
}
protected override void CacheMetadata(NativeActivityMetadata metadata)
{
base.CacheMetadata(metadata);
metadata.AddImplementationVariable(rand);
}
}
Code 10. NativieActivity version of World’s skeleton
One thing different than the Activity version (Code 6) is that the variable “rand” needs to be defined by World class itself and CacheMetadata method is overridden to report this variable to runtime. I would not go into details for them here.
The key to implement a NativeActivity is to implement its abstract method Execute(NativeActivityContext context). This method executes the activity logic using WF runtime’s API, provided by the “context” parameter of type NativeActivityContext.
WF runtime works on a queue of workitems. When one activity is scheduled, execution of the activity is put into the queue. Runtime pops one workitem from a queue and runs it. After one workitem is completed, another is poppd up and run. Activity could use runtime’s API to add child workitems to the queue when itself is being executed. When an Activity has no child workitems in the queue, it is considered completed. Runtime exposes a set of APIs for Activities to interact with the workitem queue. The most important one is ScheduleActivity defined in NativeActivityContext:
public class NativeActivityContext : ActivityContext
{
//There are other overloads of ScheduleActivity,I'm just using this one as example
public ActivityInstance ScheduleActivity(Activity activity, CompletionCallback onCompleted);
…
}
Code 11. ScheduleActivity method prototype
With this method, an Activity could schedule a child Activity and provide a callback. The callback will be invoked after the child Activity has completed.
Let’s first think about how to implement some simple control flow pattern with this API.
· Sequence. During execution, Sequence just needs to schedule one child Activity (pointed by an index variable), when this activity has completed (in CompletionCallback), it schedules the next child activity. This cycle is repeated until the last child is done. Here is pseudo code:
public class Sequence : NativeActivity
{
…
protected override void Execute(NativeActivityContext context)
{
//set current index to 0
…
ScheduleOneActivity(context);
}
void ScheduleOneActivity(NativeActivityContext context)
{
//grab current index value
…
if (currentIndex < this.Activities.Count)
{
context.ScheduleActivity(this.Activities[currentIndex], OnChildActivityCompleted);
//increment current index value
…
}
}
void OnChildActivityCompleted(NativeActivityContext context, ActivityInstance completedInstance)
{
ScheduleOneActivity(context);
}
}
Code 12. Pseudo code for Sequence implementation
· While. While’s execution is made of iterations. In every iteration, it evaluates the loop condition, schedule the loop body if the condition is still true, after the body completes, While starts another iteration. Here is pseudo code:
public class While : NativeActivity
{
…
protected override void Execute(NativeActivityContext context)
{
RunIteration(context)
}
private void RunIteration(NativeActivityContext context)
{
//evaluate loop condition
…
if (condition)
{
context.ScheduleActivity(this.Body, OnBodyCompleted);
}
}
void OnBodyCompleted(NativeActivityContext context, ActivityInstance completedInstance)
{
RunIteration(context);
}
}
code 13. Pseudo code for While implementation
· Parallel. The main difference between Parallel and Sequence is that instead of schedule child Activities one after another, Parallel schedule all of them at the beginning. That way, execution of child Activities are not necessarily serialized and there are chances for them to interleave. Here is pseudo code for Parallel:
public class Parallel : NativeActivity
{
…
protected override void Execute(NativeActivityContext context)
{
foreach (Activity branch in this.Branches)
{
context.ScheduleActivity(branch);
}
}
}
Code 14. Pseudo code for Parallel implementation
Please note Parallel doesn’t need to do anything after its children are done so we don’t even need a completion callback for ScheduleActivity.
With understanding on how ScheduleActivity and completion callback could be used to control the execution, it is not hard for us to implement World using this API:
class World : NativeActivity
{
…
protected override void Execute(NativeActivityContext context)
{
rand.Set(context, new Random());
ScheduleEventsForOneDay(context);
}
void ScheduleEventsForOneDay (NativeActivityContext context)
{
Random r = rand.Get(context);
Console.WriteLine("A new day:");
for (int i = 0; i < this.Events.Count; i++)
{
//schedule an event based on its probability
if (r.Next(1, 100) <= this.Probablities[i])
{
context.ScheduleActivity(this.Events[i], OnEventCompleted);
}
}
}
void OnEventCompleted(NativeActivityContext context, ActivityInstance completedInstance)
{
//if the last event in a day has completed, start another day
if (context.GetChildren().Count == 0)
{
ScheduleEventsForOneDay(context);
}
}
}
Code 15. Implement World using NativeActivity
One thing worth mentioning is the completion callback for events. The Activity needs to move on and start next day after all events scheduled in one day have completed. This is achieved by checking result of NativeActivityContext.Getchild method when every event finishes. This method returns all currently running child Activity instances scheduled by this Activity. When the list is empty, we know all events scheduled for one day are done.
Code 15 has one small problem: ScheduleEventsForOneDay could return without scheduling any event if none of them wins the odds. As we discussed before, if an Activity has no child workitem running, it will be completed by runtime. But we still want our World to run even after a boring day, so as a final version, we should update ScheduleEventsForOneDay method to this:
void ScheduleEvents(NativeActivityContext context)
{
Random r = rand.Get(context);
bool eventsHappenedForOneDay = false;
while (!eventsHappenedForOneDay)
{
Console.WriteLine("A new day:");
for (int i = 0; i < this.Events.Count; i++)
{
if (r.Next(1, 100) <= this.Probablities[i])
{
context.ScheduleActivity(this.Events[i], OnEventCompleted);
eventsHappenedForOneDay = true;
}
}
}
}
Code 16. the correct implementation of ScheduleEvents method
Now run this Activity, it should have similar behavior as the composite version in Code 8.
As a summary, NativeActivity is most powerful activity authoring style. It could achieve whatever other base classes could do plus a lot of more capabilities by directly using runtime API. On the other hand, programming with NativeActivity requires better understanding of runtime thus more learning is required. When we talk about more runtime features in future blogs, we will use NativeActivity as samples.
Yun Jin
Development Lead
https://blogs.msdn.com/yunjin