Parallel Tasks

Chapter 2 shows how you can use a parallel loop to apply a single operation to many data elements. This is data parallelism. Chapter 3 explains what happens when there are distinct asynchronous operations that can run simultaneously. In this situation, you can temporarily fork a program's flow of control with tasks that can potentially execute in parallel. This is task parallelism. The Parallel Tasks pattern is sometimes known as the Fork/Join pattern or the Master/Worker pattern.

Data parallelism and task parallelism are two ends of a spectrum. Data parallelism occurs when a single operation is applied to many inputs. Task parallelism uses multiple operations, each with its own input.

In the Microsoft® .NET Framework, tasks are implemented by the Task class in the System.Threading.Tasks namespace. The static Task.Factory property contains an instance of the TaskFactory class, which is used to create and schedule new tasks. You can wait for a task to complete by invoking the task's Wait method. It's also possible to wait for multiple tasks to complete. If you think in terms of fork/join, the StartNew method is the fork operation and the Wait method is the join operation.

Scheduling is an important aspect of parallel tasks. Unlike threads, new tasks don't necessarily begin executing immediately. Instead, they are placed in a work queue. Tasks run when their associated task scheduler removes them from the queue, usually as cores become available. The task scheduler attempts to optimize overall throughput by controlling the system's degree of concurrency. As long as there are enough tasks and the tasks are sufficiently free of serializing dependencies, the program's performance scales with the number of available cores. In this way, tasks embody the concept of potential parallelism that was introduced in Chapter 1.

Another important aspect of task-based applications is how they handle exceptions. In .NET, an unhandled exception that occurs during the execution of a task is deferred for later observation. For example, the deferred exception is automatically observed at a later time when you call one of the task class's wait methods. At that time, the exception is rethrown in the calling context of the wait method. This allows you to use the same exception handling approach in parallel programs that you use in sequential programs.

The Basics

Each task is a sequential operation; however, tasks can often run in parallel. In .NET, a task is also an object with properties and methods of its own. Here's some sequential code.

  DoLeft();
  DoRight();

Let's assume that the methods DoLeft and DoRight are independent. This means that neither method writes to memory locations or files that the other method might read or write. Because the methods are independent, you can use the Invoke method of the Parallel class to call them in parallel. This is shown in the following code.

  Parallel.Invoke(DoLeft, DoRight);

Parallel.Invoke is the simplest expression of the parallel task pattern. It creates new parallel tasks for each delegate method that is in its params array argument list. The Invoke method returns when all the tasks are finished.

Callout: Parallel.Invoke is a method in .NET that creates a list of parallel tasks and waits for them all to complete.

You can't assume that all parallel tasks will immediately run. Depending on the current work load and system configuration, tasks might be scheduled to run one after another, or they might run at the same time. For more information about how tasks are scheduled, see the section, "The Default Task Scheduler," later in this chapter.

The delegate methods of Parallel.Invoke can either complete normally or finish by throwing an exception. Any exceptions that occur during the execution of Parallel.Invoke are deferred and rethrown when all tasks finish. All exceptions are rethrown as inner exceptions of an AggregateException instance. For more information and a code example, see the section, "Handling Exceptions," later in this chapter.

Internally, Parallel.Invoke creates new tasks and waits for them. It uses methods of the Task class to do this. Here's an example.

  Task t1 = Task.Factory.StartNew(DoLeft);  
  Task t2 = Task.Factory.StartNew(DoRight);

  Task.WaitAll(t1, t2);

Note

Two tasks, t1 and t2, are needed in this example for correct exception handling. Without the second task, t2, the code would not have guaranteed that exceptions thrown during the execution of task t1 would be observed. For more information, see the section, "Handling Exceptions," later in this chapter.

The StartNew method of the TaskFactory class creates and schedules a new task that executes the delegate method that is provided as its argument. You can wait for parallel tasks to complete by calling the Task.WaitAll method. (Use the Wait method if you want to wait on a single task.)

When you use StartNew to create a task, the new task is added to a work queue for eventual execution, but it does not start to run until its task scheduler takes it out of the work queue, which can happen immediately or can occur at some point in the future.

The examples you've seen so far are simple, but they're powerful enough to handle many scenarios. For more ways to use tasks, see the section, "Variations," later in this chapter.

An Example

An example of task parallelism is an image processing application where images are created with layers. Separate images from different sources are processed independently and then combined with a process known as alpha blending. This process superimposes semitransparent layers to form a single image.

The source images that are combined are different, and different image processing operations are performed on each of them. This means that the image processing operations must be performed separately on each source image and must be complete before the images can be blended. In the example, there are only two source images, and the operations are simple: conversion to gray scale and rotation. In a more realistic example, there might be more source images and more complicated operations.

Here's the sequential code. The source code for the complete example is located at http://parallelpatterns.codeplex.com in the Chapter3\ImageBlender folder.

static int SequentialImageProcessing(Bitmap source1, Bitmap source2, 
                                    Bitmap layer1, Bitmap layer2, 
                                    Graphics blender)
{
  SetToGray(source1, layer1);
  Rotate(source2, layer2);
  Blend(layer1, layer2, blender);

  return source1.Width;
}

In this example, source1 and source2 are bitmaps that are the original source images, layer1 and layer2 are bitmaps that have been prepared with additional information needed to blend the images, and blender is a Graphics instance that performs the blending and references the bitmap with the final blended image. Internally, SetToGray, Rotate, and Blend use methods from the .NET System.Drawing namespace to perform the image processing. The last statement returns an integer that the caller uses to print a progress message.

The SetToGray and Rotate methods are entirely independent of each other. This means that you can execute them in separate tasks. If two or more cores are available, the tasks might run in parallel, and the image processing operations might complete in less elapsed time than a sequential version would.

The parallel code uses tasks explicitly.

static int ParallelTaskImageProcessing(Bitmap source1, 
   Bitmap source2, Bitmap layer1, Bitmap layer2, 
   Graphics blender)
{
  Task toGray = Task.Factory.StartNew(() => 
     SetToGray(source1, layer1));
  Task rotate = Task.Factory.StartNew(() => 
     Rotate(source2, layer2));
  Task.WaitAll(toGray, rotate);
  Blend(layer1, layer2, blender);
  return source1.Width;
}

This code calls Task.Factory.StartNew to create and run two tasks that execute SetToGray and Rotate, and calls Task.WaitAll to wait for both tasks to complete before blending the processed images.

You can also use the Parallel.Invoke method to achieve parallelism. The Parallel.Invoke method has very convenient syntax. This is shown in the following code.

static int ParallelInvokeImageProcessing(Bitmap source1, Bitmap source2, 
                                         Bitmap layer1, Bitmap layer2, 
                                         Graphics blender)
{
  Parallel.Invoke(
    () => SetToGray(source1, layer1),
    () => Rotate(source2, layer2));
  Blend(layer1, layer2, blender);
  return source1.Width;
}

Here the tasks are identified implicitly by the arguments to Parallel.Invoke. This call does not return until all of the tasks complete.

Variations

This section describes variations of the .NET implementation of the Parallel Tasks pattern.

Canceling a Task

In .NET, a cancellation request does not forcibly end a task. Instead, tasks use a cooperative cancellation model. This means that a running task must poll for the existence of a cancellation request at appropriate intervals and then shut itself down by calling back into the library.

The .NET Framework uses two separate types for cancellation. One allows a program to request cancellation, and the other checks for cancellation requests. Instances of the CancellationTokenSource class are used to request cancellation, while CancellationToken values indicate whether cancellation has been requested. Here's an example:

CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;

Task myTask = Task.Factory.StartNew(() =>
{
    for (...)
    {
        token.ThrowIfCancellationRequested();

        // Body of for loop.
    }
}, token);

// ... elsewhere ...
cts.Cancel();

The CancellationTokenSource object contains a cancellation token. This token is used in two places. It's passed as an argument to the StartNew factory method, and its ThrowIfCancellationRequested method is invoked to detect and process a cancellation request. Although the example doesn't show it, you can also read the cancellation token's IsCancellationRequested property to test for a cancellation request. For example, you would do this if you need to perform local cleanup actions for a task that's in the process of being canceled.
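The following is a minimal sketch of the IsCancellationRequested approach mentioned above, not code from the chapter's sample project. The class name CleanupOnCancelSketch, the Run method, and the cleanedUp flag are hypothetical, and Thread.Sleep stands in for the real loop body. After the local cleanup, the task still calls ThrowIfCancellationRequested so that the cancellation is reported with the same token that was passed to StartNew.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CleanupOnCancelSketch
{
    // Hypothetical helper: starts a task, cancels it, and reports its final status.
    public static TaskStatus Run(out bool cleanedUp)
    {
        bool cleaned = false;
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;
        var started = new ManualResetEventSlim(false);

        Task worker = Task.Factory.StartNew(() =>
        {
            started.Set();
            while (true)
            {
                if (token.IsCancellationRequested)
                {
                    cleaned = true;  // stands in for real cleanup work
                    // Report the cancellation with the same token.
                    token.ThrowIfCancellationRequested();
                }
                Thread.Sleep(10);    // stands in for the body of the loop
            }
        }, token);

        started.Wait();  // make sure the delegate is already running
        cts.Cancel();

        try { worker.Wait(); }
        catch (AggregateException ae)
        {
            // Outside the task, the cancellation exception can be handled.
            ae.Handle(e => e is OperationCanceledException);
        }

        started.Dispose();
        cts.Dispose();
        cleanedUp = cleaned;
        return worker.Status;
    }

    public static void Main()
    {
        bool cleanedUp;
        TaskStatus status = Run(out cleanedUp);
        Console.WriteLine("{0}, cleanup ran: {1}", status, cleanedUp);
    }
}
```

Polling the property instead of calling ThrowIfCancellationRequested directly gives the task a place to release local resources before it acknowledges the request.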

If the cancellation token indicates that a cancellation has been requested, the ThrowIfCancellationRequested method creates an OperationCanceledException instance and passes in the cancellation token. It then throws the exception. This exception is the signal that notifies the .NET Framework that the task has been canceled; therefore, the OperationCanceledException should not be handled by user code within the task (it is often handled, however, outside of the task that was canceled). If you follow the steps that have been described in this section, the task will be stopped and its Status property will be set to the enumerated value TaskStatus.Canceled.

To ensure that a running task transitions to status Canceled, a cancellation token must have been given as an argument to the StartNew method and an unhandled OperationCanceledException exception that contains that same cancellation token must be thrown by the task.

Checking for cancellation within a loop with many iterations that each perform a small amount of work can negatively affect your application's performance, but checking only infrequently for cancellation can introduce unacceptable latency in your application's response to cancellation requests. For example, in an interactive GUI-based application, checking for cancellation more than once per second is probably a good idea. An application that runs in the background could poll for cancellation much less frequently, perhaps every two to ten seconds. Profiling your application can give you performance data that you can use when determining the best places to test for cancellation requests in your code. You need to understand which code locations will allow you to poll for cancellation at evenly spaced intervals of your choosing, and profiling can help you do this.

If a cancellation token is passed to the StartNew method and cancellation is signaled on that token before the new task begins to execute, the new task's status will transition to Canceled without ever running.
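As a small illustration of this behavior, the sketch below cancels the token before the task is created. The class and method names are hypothetical. Under these assumptions, the delegate should never run and the task should end in the Canceled state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelBeforeStartSketch
{
    // Hypothetical helper: returns the final status of a task whose token
    // was already canceled when StartNew was called.
    public static TaskStatus Run(out bool delegateRan)
    {
        bool ran = false;
        var cts = new CancellationTokenSource();
        cts.Cancel();  // cancellation is signaled before the task exists

        Task t = Task.Factory.StartNew(() => { ran = true; }, cts.Token);

        try { t.Wait(); }
        catch (AggregateException ae)
        {
            // Waiting on a canceled task reports the cancellation.
            ae.Handle(e => e is OperationCanceledException);
        }

        cts.Dispose();
        delegateRan = ran;
        return t.Status;
    }

    public static void Main()
    {
        bool delegateRan;
        TaskStatus status = Run(out delegateRan);
        Console.WriteLine("{0}, delegate ran: {1}", status, delegateRan);
    }
}
```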

For a little fine print about the behavior of cancellation, see the sections, "Task Life Cycle" and "Unobserved Task Exceptions," later in this chapter.

Note

Whether or not a task gets the status Canceled can be important. If you don't follow the cancellation rules correctly, the process might be unexpectedly terminated when the improperly canceled task is garbage collected. A typical mistake is forgetting to pass the cancellation token to the StartNew method. Another typical mistake is forgetting to throw the OperationCanceledException that contains the required cancellation token.

Handling Exceptions

If the execution of a Task object's delegate throws an unhandled exception, the task terminates and its Status property is set to the enumerated value TaskStatus.Faulted. The unhandled exception is temporarily unobserved. This means that the Task Parallel Library (TPL) catches the exception and records its details in internal data structures associated with the task object. Recovering a deferred exception and embedding it in a new exception is known as "observing an unhandled task exception." In many cases, unhandled task exceptions will be observed in a different thread than the one that executed the task.

Ways to Observe an Unhandled Task Exception

There are several ways to observe an unhandled task exception.

Invoking the faulted task's Wait method causes the task's unhandled exception to be observed. The exception is also thrown in the calling context of the Wait method. The Task class's static WaitAll method allows you to observe the unhandled exceptions of more than one task with a single method invocation. The Parallel.Invoke method includes an implicit call to WaitAll. Exceptions from all of the tasks are grouped together in an AggregateException object and thrown in the calling context of the WaitAll or Wait method.

Getting the Exception property of a faulted task causes the task's unhandled exception to be observed. The property returns the aggregate exception object. Getting the value does not automatically cause the exception to be thrown; however, the exception is considered to have been observed when you get the value of the Exception property. Use the Exception property instead of the Wait method or WaitAll method when you want to examine the unhandled exception but do not want it to be rethrown in the current context.

Special handling occurs if a faulted task's unhandled exceptions are not observed by the time the task object is garbage-collected. For more information, see the section, "Unobserved Task Exceptions," later in this chapter.
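The two observation techniques described above can be sketched as follows. The class and method names are hypothetical. The second method relies on the fact that WaitAny waits without rethrowing a task's exception, so the exception can then be examined through the Exception property.

```csharp
using System;
using System.Threading.Tasks;

public static class ObserveExceptionSketch
{
    private static Task StartFaultingTask()
    {
        return Task.Factory.StartNew(() =>
        {
            throw new InvalidOperationException("left failed");
        });
    }

    // Technique 1: Wait rethrows the exception inside an AggregateException.
    public static string ObserveWithWait()
    {
        Task t = StartFaultingTask();
        try
        {
            t.Wait();
            return null;
        }
        catch (AggregateException ae)
        {
            return ae.InnerExceptions[0].Message;
        }
    }

    // Technique 2: read the Exception property; nothing is rethrown.
    public static string ObserveWithProperty()
    {
        Task t = StartFaultingTask();
        Task.WaitAny(t);  // waits for completion without rethrowing
        return t.IsFaulted ? t.Exception.InnerExceptions[0].Message : null;
    }

    public static void Main()
    {
        Console.WriteLine(ObserveWithWait());
        Console.WriteLine(ObserveWithProperty());
    }
}
```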

Note

You must take care to observe the unhandled exceptions of each task. If you don't do this, .NET's exception escalation policy can terminate your process when the task is garbage collected.

Aggregate Exceptions

In some cases, such as when you're waiting for more than one task, there can be multiple unhandled exceptions that need to be observed. For this reason, the runtime collects the unhandled task exceptions and wraps them in an AggregateException instance. TPL throws the aggregate exception in the context of the observer (that is, in the waiting thread). You can catch the aggregate exception and examine its InnerExceptions property to handle the original exceptions individually. Even if the unhandled exception from just one task is observed, it's still wrapped in an AggregateException. Throwing an aggregate exception preserves the inner exception's stack trace, which would otherwise be overwritten if the exception were rethrown in the current context.

Like all exceptions that are raised inside of a task, an OperationCanceledException instance that's created by canceling a task will become an inner exception of an AggregateException thrown by the task in the context of the thread that calls the task's Wait method. However, if you follow the cancellation rules that are described in the section, "Canceling a Task," earlier in this chapter, the task's final status will be TaskStatus.Canceled instead of TaskStatus.Faulted.

The Handle Method

The AggregateException class has several helper methods to make handling inner exceptions easier. The Handle method of the AggregateException class invokes a user-provided delegate method for each of the inner exceptions of an aggregate exception. The delegate should return true if it has handled the inner exception and false if it has not handled the inner exception. Here's an example.

try
{
  Task t = Task.Factory.StartNew( ... );
  // ... 
  t.Wait();
}
catch (AggregateException ae)
{
  ae.Handle(e =>
  {
    if (e is MyException)
    {
      // ... handle exception ...
      return true;
    }
    else
    { 
      return false;
    }
  });
}

When the Handle method invokes the user-provided delegate for each inner exception, it keeps track of the results of the invocation. After it processes all inner exceptions, it checks to see if the results for one or more of the inner exceptions were false, which indicates that they have not been handled. If there are any unhandled exceptions, the Handle method creates and throws a new aggregate exception that contains the unhandled ones as inner exceptions.

If the Handle method doesn't meet your needs, you can write your own code that iterates through the InnerExceptions property of an AggregateException object.
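A hand-written loop over InnerExceptions might look like the following sketch. The names are hypothetical, and treating ArgumentException as the "handled" kind is an arbitrary choice for illustration. Unlike Handle, this version does not rethrow anything; it returns the exceptions it did not handle so that the caller can decide what to do with them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class InnerExceptionsSketch
{
    // Waits for all tasks, counts the exceptions it handles, and returns the rest.
    public static List<Exception> WaitAndSort(Task[] tasks, out int handledCount)
    {
        var unhandled = new List<Exception>();
        handledCount = 0;
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ae)
        {
            // Flatten first so that nested aggregate exceptions are also visited.
            foreach (Exception e in ae.Flatten().InnerExceptions)
            {
                if (e is ArgumentException)
                    handledCount++;      // ... handle exception ...
                else
                    unhandled.Add(e);
            }
        }
        return unhandled;
    }

    public static void Main()
    {
        Task[] tasks =
        {
            Task.Factory.StartNew(() => { throw new ArgumentException("bad input"); }),
            Task.Factory.StartNew(() => { throw new InvalidOperationException("bad state"); }),
            Task.Factory.StartNew(() => { })
        };
        int handled;
        List<Exception> rest = WaitAndSort(tasks, out handled);
        Console.WriteLine("handled: {0}, unhandled: {1}", handled, rest.Count);
    }
}
```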

The Flatten Method

The Flatten method of the AggregateException class is useful when tasks are nested within other tasks. In this case, it's possible that an aggregate exception can contain other aggregate exceptions as inner exceptions. The Flatten method produces a new exception object that merges the inner exceptions of all nested aggregate exceptions into the inner exceptions of the top-level aggregate exception. In other words, it converts a tree of inner exceptions into a flat sequence of inner exceptions. It's typical to use the Flatten and Handle methods together. This is shown in the following example.

try
{
  Task t1 = Task.Factory.StartNew(() =>
  {
    Task t2 = Task.Factory.StartNew(() =>
    { 
       // ...
       throw new MyException();
    });
    // ...
    t2.Wait();
  });
  // ... 
  t1.Wait();
}
catch (AggregateException ae)
{
  ae.Flatten().Handle(e =>
  {
    if (e is MyException)
    {
      // ... handle exception ...
      return true;
    }
    else
    { 
      return false;
    }
  });
}

Waiting for the First Task to Complete

The Task class includes a method named Task.WaitAll that allows you to wait for a set of tasks to complete. However, you can also wait for the first task to complete by calling the Task.WaitAny method.

Here's an example.

  var taskIndex = -1;

  Task[] tasks = new Task[]
                   {
                     Task.Factory.StartNew(DoLeft),
                     Task.Factory.StartNew(DoRight),
                     Task.Factory.StartNew(DoCenter)
                   };
  Task[] allTasks = tasks;

  // Print completion notices one by one as tasks finish.
  while (tasks.Length > 0)
  {
    taskIndex = Task.WaitAny(tasks);
    Console.WriteLine("Finished task {0}.", taskIndex + 1);
    tasks = tasks.Where((t) => t != tasks[taskIndex]).ToArray();
  }

  // Observe any exceptions that might have occurred.
  try
  {
    Task.WaitAll(allTasks);
  }
  catch (AggregateException ae)
  {
    ...
  }

This example shows how you can use the WaitAny method to be notified when the first of a group of tasks completes. The code prints a line to the console output after each task finishes. The while loop exits after all the tasks finish.

You need to observe any exceptions that may have occurred, but note that exceptions are never observed by the WaitAny method. Instead, you should add a step that checks for exceptions whenever you use the WaitAny method. Although this example uses the WaitAll method to observe exceptions, you could also use the Exception property or the Wait method for this purpose. The IsFaulted property of the Task class can be used to check to see whether an unhandled exception occurred within a task.

Note

Exceptions are never observed by the WaitAny method. However, you must make sure that all exceptions are eventually observed using one of the techniques that are described in the section, "Handling Exceptions," earlier in this chapter.

Speculative Execution

Speculative execution is a variation that occurs where you perform an operation in anticipation of a particular result. For example, you might predict that the current computation being run will output the value 42, and you start the next computation that depends on this computation's result by using 42 as the input. If the first computation ends with 42, you've now gained parallelism by successfully starting the dependent operation well in advance of when you otherwise could have. If the first computation results in something other than 42, you can restart the second operation using the correct value.

Another example of speculative execution occurs when you execute more than one asynchronous operation in parallel but need just one of the operations to complete before proceeding. Imagine, for example, that you use three different search tasks to search for an item. After the fastest task finds the item, you don't need to wait for the other searches to complete. In cases like this, you wait for the first task to complete and usually cancel the remaining tasks. However, you should always observe any exceptions that might have occurred in any of the tasks.

Here's an example.

  SpeculativeInvoke(SearchLeft, SearchRight, SearchCenter);

This example executes three delegate methods in parallel. Only one of the delegates needs to complete for the operation to succeed, so the tasks that did not finish first are canceled. The SpeculativeInvoke method executes each of the delegates in the params array argument. Its implementation is shown in the following code.

public static void SpeculativeInvoke(
                     params Action<CancellationToken>[] actions)
{
  var cts = new CancellationTokenSource();
  var token = cts.Token;
  var tasks = 
    (from a in actions
     select Task.Factory.StartNew(() => a(token), token))
    .ToArray();

  // Wait for fastest task to complete.
  Task.WaitAny(tasks);

  // Cancel all of the slower tasks.
  cts.Cancel();

  // Wait for cancellation to finish and observe exceptions.
  try 
  { 
    Task.WaitAll(tasks); 
  }
  catch (AggregateException ae)
  {
    // Filter out the exception caused by cancellation itself.
    ae.Flatten().Handle(e => e is OperationCanceledException);
  }
  finally
  {
    if (cts != null) cts.Dispose();
  }
}

The WaitAny method does not observe unhandled task exceptions. This is because you need the returned index from WaitAny to know which task completed (potentially by throwing an exception). Therefore, this example also makes a call to the WaitAll method after canceling the tasks that did not finish first. This causes all unhandled task exceptions to be observed in the current thread. The code catches the aggregate exception that is thrown by WaitAll and removes the OperationCanceledException instances. If there are no remaining unhandled exceptions, the code proceeds normally. Otherwise, the remaining unhandled exceptions are rethrown as the inner exceptions of a new aggregate exception.

Creating Tasks with Custom Scheduling

You can customize the details of how tasks in .NET are scheduled and run by overriding the default task scheduler that's used by the task factory methods. For example, you can provide a custom task scheduler as an argument to one of the overloaded versions of the TaskFactory.StartNew method.

There are some cases where you might want to override the default scheduler. The most common case occurs when you want your task to run in a particular thread context. This can happen when you use objects provided by libraries, such as Windows Presentation Foundation (WPF), that impose thread affinity constraints. Other cases occur when the load-balancing heuristics of the default task scheduler don't work well for your application. For more information, see the section, "Thread Injection," later in this chapter.

Unless you specify otherwise, any new tasks will use the current task scheduler, which is the value of the static property TaskScheduler.Current. In other words, subtasks inherit the task scheduler of the enclosing task context. Unless otherwise specified, new top-level tasks (and, consequently, their subtasks) use the default task scheduler, which is the value of the static property TaskScheduler.Default. The default task scheduler is tightly integrated with the .NET thread pool and handles a wide variety of operating conditions. It's a good choice for most applications. For more information, see the section, "The Default Task Scheduler," later in this chapter.

The TaskScheduler class's FromCurrentSynchronizationContext static method returns a task scheduler object that schedules the task to run wherever the current thread's SynchronizationContext property indicates. This is sometimes the current thread, but not necessarily. This task scheduler is useful in cases of thread affinity. Not every thread has a current synchronization context, and there is no API to get a task scheduler that runs in the synchronization context of a thread other than the current thread. For example, you can't use the FromCurrentSynchronizationContext method if you're trying to schedule a task in a Windows service's main thread.
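As a rough sketch of passing a scheduler to StartNew, the code below obtains a scheduler from FromCurrentSynchronizationContext and supplies it through the StartNew overload that takes a cancellation token, creation options, and a scheduler. The class and method names are hypothetical. Because a console program normally has no synchronization context, the sketch installs a plain SynchronizationContext instance purely for demonstration; in a WPF or Windows Forms application, the UI thread already has one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerArgumentSketch
{
    // Returns true if the task observed the requested scheduler as TaskScheduler.Current.
    public static bool RunsOnRequestedScheduler()
    {
        SynchronizationContext previous = SynchronizationContext.Current;

        // Demonstration only: give this thread a synchronization context so that
        // FromCurrentSynchronizationContext has something to capture.
        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
        try
        {
            TaskScheduler scheduler =
                TaskScheduler.FromCurrentSynchronizationContext();

            Task<bool> t = Task.Factory.StartNew(
                () => TaskScheduler.Current == scheduler,
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler);

            return t.Result;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunsOnRequestedScheduler());
    }
}
```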

You can implement your own task scheduler class. For more information, see the section, "Writing a Custom Task Scheduler," later in this chapter.

Anti-Patterns

Here are some things to watch out for when you use tasks.

Variables Captured by Closures

In C#, a closure can be created with a lambda expression in the form args => body that represents an unnamed (anonymous) delegate. A unique feature of closures is that they may refer to variables defined outside their lexical scope, such as local variables that were declared in a scope that contains the closure.

The semantics of closures in C# may not be intuitive to some programmers and it's easy to make mistakes. Unless you understand the semantics, you may find that captured variables don't behave as you expect, especially in parallel programs.

Problems occur when you reference a variable without considering its scope. Here's an example.

  for (int i = 0; i < 4; i++)
  {     // WARNING: BUGGY CODE, i has unexpected value
     Task.Factory.StartNew(() => Console.WriteLine(i));
  }

Note

Capturing a loop index variable in a closure is usually a bug. Be on the lookout for this very common coding error.

You might think that this code sample would print the numbers 0, 1, 2, 3 in some arbitrary order, but it can print other values, depending on how the threads happen to run. For example, you might see 4, 4, 4, 4. The reason is that the variable i is shared by all the closures created by the steps of the for loop. By the time the tasks start, the value of the single, shared variable i will probably be different from the value of i when the task was created. All the tasks are sharing the same variable.

The solution is to introduce an additional temporary variable in the appropriate scope.

  for (int i = 0; i < 4; i++)
  {
     var tmp = i;
     Task.Factory.StartNew(() => Console.WriteLine(tmp));
  }

This version prints the numbers 0, 1, 2, 3 in an arbitrary order, but each number will be printed. The reason is that the variable tmp is declared within the block scope of the for loop's body. This causes a new variable named tmp to be instantiated with each iteration of the for loop. (In contrast, all iterations of the for loop share a single instance of the variable i.)

This bug is one reason why you should use Parallel.For instead of coding a loop yourself. It's also one of the most common mistakes made by programmers who are new to tasks.
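For comparison, here is a minimal sketch of the same loop written with Parallel.For. The class and method names are hypothetical, and a ConcurrentBag is used only so that the result can be inspected afterward. Each invocation of the loop body receives its own index parameter, so there is no shared loop variable to capture.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelForSketch
{
    // Records every index that the loop body receives and returns them sorted.
    public static int[] CollectIndexes(int count)
    {
        var seen = new ConcurrentBag<int>();

        // The index i is a parameter of the delegate, not a captured variable.
        Parallel.For(0, count, i => seen.Add(i));

        return seen.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", CollectIndexes(4)));
    }
}
```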

Disposing a Resource Needed by a Task

When you create a task, remember that you must not call the Dispose method on objects that the task still needs to do its work. Careless use of the C# using keyword is a common way to make this mistake. Here's an example.

Note

Be careful not to dispose resources needed by a pending task.

Task<string> t;
using (var file = new StringReader("text"))
{
  t = Task<string>.Factory.StartNew(() => file.ReadLine());
}
// WARNING: BUGGY CODE, file has been disposed
Console.WriteLine(t.Result);

The using keyword introduces an implicit try/finally block that invokes the Dispose method on the value of the variable when it exits its scope. In this example, the block can exit, and dispose the reader, before the task has run. Don't rely on the using keyword to dispose a variable that is captured by a task that might still be pending when the block exits. Instead, call the Dispose method when you know that the disposable object is no longer needed. In this example, Dispose can safely be called only after the task's Result property is read.
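One possible correction, shown here as a sketch with hypothetical names, is to dispose the reader explicitly after the task's result has been read. Reading Result blocks until the task has finished, so the reader is no longer in use when Dispose runs.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class DisposeAfterTaskSketch
{
    public static string ReadFirstLine(string text)
    {
        var file = new StringReader(text);
        try
        {
            Task<string> t =
                Task<string>.Factory.StartNew(() => file.ReadLine());

            // Result waits for the task, so the task is done with file here.
            return t.Result;
        }
        finally
        {
            // Runs only after the task has completed (or faulted).
            file.Dispose();
        }
    }

    public static void Main()
    {
        Console.WriteLine(ReadFirstLine("text"));
    }
}
```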

Avoid Thread Abort

Terminating tasks with the Thread.Abort method leaves the AppDomain in a potentially unusable state. Also, aborting a thread pool worker thread is never recommended. If you need to cancel a task, use the technique described in the section, "Canceling a Task," earlier in this chapter. Do not abort the task's thread.

Note

Never attempt to cancel a task by calling the Abort method of the thread that is executing the task.

Design Notes

This section describes some of the design considerations that were used to create the Task Parallel Library.

Tasks and Threads

When a task begins to run, the applicable task scheduler invokes the task's user delegate in a thread of its choosing.

The task will not migrate among threads at run time. This is a useful guarantee because it lets you use thread-affine abstractions, such as the Microsoft Windows® operating system's critical sections, without having to worry, for example, that the Monitor.Enter method will be executed in a different thread than the Monitor.Exit method.

Task Life Cycle

The Status property of a Task instance tracks its life cycle. Figure 1 shows the life cycle of the tasks that are described in this chapter:

Figure 1

Task life cycle

The TaskFactory.StartNew method creates and schedules a new task, which results in the status TaskStatus.WaitingToRun. Eventually, the TaskScheduler instance that is responsible for managing the task begins to execute the task's user delegate on a thread of its choosing. At this point, the task's status transitions to TaskStatus.Running. After the task begins to run, it has three possible outcomes. If the task's user delegate exits normally, the task's status transitions to TaskStatus.RanToCompletion. If the task's user delegate throws an unhandled exception, the task's status becomes TaskStatus.Faulted.

It's also possible for a task to end in TaskStatus.Canceled. For this to occur, you must pass a CancellationToken as an argument to the factory method that created the task. If that token signals a request for cancellation before the task begins to execute, the task won't be allowed to run. The task's Status property will transition directly to TaskStatus.Canceled without ever invoking the task's user delegate. If the token signals a cancellation request after the task begins to execute, the task's Status property will only transition to TaskStatus.Canceled if the user delegate throws an OperationCanceledException and that exception's CancellationToken property contains the token that was given when the task was created.

Later in this book, you'll see two variations of the task life cycle. Chapter 5, "Futures," introduces continuation tasks whose life cycles also depend on antecedent tasks. Chapter 6, "Dynamic Task Parallelism," describes tasks whose life cycles depend on subtasks that have been created with the AttachedToParent task creation option.

There is one more task status, TaskStatus.Created. This is the status of a task immediately after it's created by the Task class's constructor; however, it's recommended that you use a factory method to create tasks instead of the new operator.
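As a small illustration of the Created status, the sketch below constructs a task with the new operator and reads its status before and after it runs. The class name CreatedStatusDemo is a hypothetical name used only for this example.

```csharp
using System.Threading.Tasks;

// Hypothetical demo class; the name is an illustrative assumption.
public static class CreatedStatusDemo
{
    // Returns the status right after construction and the status after Wait returns.
    public static TaskStatus[] Observe()
    {
        var task = new Task(() => { });
        TaskStatus afterConstruction = task.Status;  // the task is not yet scheduled

        task.Start();   // schedules the task on the current task scheduler
        task.Wait();
        TaskStatus afterWait = task.Status;

        return new[] { afterConstruction, afterWait };
    }
}
```

A task created this way stays in the Created state until Start is called, which is one reason the factory methods are usually more convenient.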

Writing a Custom Task Scheduler

The .NET Framework includes two task scheduler implementations: the default task scheduler and a task scheduler that runs tasks on a target synchronization context. If these task schedulers don't meet your application's needs, it's possible to implement a custom task scheduler.

There are a number of advanced scenarios where creating a custom task scheduler is relevant. For example, you could implement a custom task scheduler if you want to enforce a single maximum degree of parallelism across multiple loops and tasks instead of across a single loop. You could also create a custom task scheduler if you want to implement alternative scheduling algorithms, for example, to ensure fairness among batches of work. Finally, you can implement a custom task scheduler if you want to use a specific set of threads, such as single-threaded apartment (STA) threads, instead of thread pool worker threads.

You can see how to create a custom task scheduler by looking at the ParallelExtensionsExtras project that is part of the Microsoft Parallel Samples package. For details of where to get these additional samples, see the section, "Further Reading," at the end of this chapter. This package includes an implementation of a QueuedTaskScheduler class that provides multiple global task queues that use round-robin scheduling. A custom scheduler doesn't need to use its own threads; it can simply limit the number of tasks that are allowed to run concurrently but still use the thread pool. This is the approach taken by the LimitedConcurrencyLevelTaskScheduler class in the ParallelExtensionsExtras project.
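To give a sense of what a custom scheduler involves, here is a minimal sketch of a scheduler that runs every task on one dedicated thread. It is not the QueuedTaskScheduler or LimitedConcurrencyLevelTaskScheduler from the samples package; the class name SingleThreadTaskScheduler and its ThreadId property are hypothetical. The overridden members (QueueTask, TryExecuteTaskInline, GetScheduledTasks, and MaximumConcurrencyLevel) are the extension points of the TaskScheduler base class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: runs all queued tasks, one at a time, on a dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly Thread _thread;

    public SingleThreadTaskScheduler()
    {
        _thread = new Thread(RunQueuedTasks);
        _thread.IsBackground = true;
        _thread.Start();
    }

    // Exposed only so that callers can check where their tasks ran.
    public int ThreadId { get { return _thread.ManagedThreadId; } }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Body of the dedicated thread: take tasks in FIFO order until adding is completed.
    private void RunQueuedTasks()
    {
        foreach (Task task in _queue.GetConsumingEnumerable())
        {
            TryExecuteTask(task);
        }
    }

    protected override void QueueTask(Task task)
    {
        _queue.Add(task);
    }

    // Inline only tasks that were never queued, and only on the dedicated thread,
    // so that every task is guaranteed to run on that thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (taskWasPreviouslyQueued || Thread.CurrentThread != _thread)
        {
            return false;
        }
        return TryExecuteTask(task);
    }

    // Used by debuggers to list pending tasks.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _queue.ToArray();
    }

    // Lets the dedicated thread exit after it drains the queue.
    public void Dispose()
    {
        _queue.CompleteAdding();
    }
}
```

To use a scheduler like this, pass it to a factory method overload that accepts a scheduler, for example Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, scheduler). A production scheduler would need more care around shutdown and error handling than this sketch shows.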

Unobserved Task Exceptions

If you don't give a faulted task the opportunity to propagate its exceptions (for example, by calling the Wait method), the runtime will escalate the task's unobserved exceptions according to the current .NET exception policy when the task is garbage-collected. Unobserved task exceptions will eventually be observed in the finalizer thread context. The finalizer thread is the system thread that invokes the Finalize method of objects that are ready to be garbage-collected. If an unhandled exception is thrown during the execution of a Finalize method, the runtime will, by default, terminate the current process, and no active try/finally blocks or additional finalizers will be executed, including finalizers that release handles to unmanaged resources. To prevent this from happening, you should be very careful that your application never leaks unobserved task exceptions. You can also elect to receive notification of any unobserved task exceptions by subscribing to the UnobservedTaskException event of the TaskScheduler class and choose to handle them as they propagate into the finalizer context.
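Subscribing to the event might look like the following sketch. The class name UnobservedExceptionLogger and its Log queue are hypothetical; TaskScheduler.UnobservedTaskException and the SetObserved method of UnobservedTaskExceptionEventArgs are the framework members that this section refers to. The event is raised only when a faulted task is finalized, so the timing depends on the garbage collector.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper that records unobserved task exceptions instead of
// letting them escalate.
public static class UnobservedExceptionLogger
{
    public static readonly ConcurrentQueue<string> Log = new ConcurrentQueue<string>();

    // Event handler: record each inner exception, then mark the exception as
    // observed so that it is not escalated further.
    public static void OnUnobserved(object sender, UnobservedTaskExceptionEventArgs e)
    {
        foreach (Exception inner in e.Exception.Flatten().InnerExceptions)
        {
            Log.Enqueue(inner.GetType().Name + ": " + inner.Message);
        }
        e.SetObserved();
    }

    // Call once at application startup.
    public static void Install()
    {
        TaskScheduler.UnobservedTaskException += OnUnobserved;
    }
}
```

Marking every exception as observed hides real bugs, so a handler like this is best limited to cases where the exceptions are known to be benign.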

This last technique can be useful in scenarios such as hosting untrusted plug-ins that have benign exceptions that would be cumbersome to observe. For more information, see the section, "Further Reading," at the end of this chapter.

During finalization, tasks that have a Status property of Faulted are treated differently from tasks with the status Canceled. The task's status determines how unobserved task exceptions that arise from task cancellation are treated during finalization. If the cancellation token that was passed as an argument to the StartNew method is the same token as the one embedded in the unobserved OperationCanceledException instance, the task does not propagate the operation-canceled exception to the UnobservedTaskException event or to the finalizer thread context. In other words, if you follow the cancellation protocol described in this chapter, unobserved cancellation exceptions will not be escalated into the finalizer's thread context.

Relationship Between Data Parallelism and Task Parallelism

The Parallel.Invoke method was described as creating tasks for each of the delegates passed in its params array. As a mental model, this is fine. In practice, this isn't always how it's actually executed. For example, with enough delegates in the params list, TPL may, for performance reasons, use a parallel loop to invoke all the delegates.

This highlights how task parallelism and data parallelism are related. If you represent each operation to be performed as a delegate, you can then use data parallelism to perform the same operation (invoking the delegate) on each piece of data (the delegate). Conversely, if you have a data parallel problem, you can approach it in a task-parallel manner by spinning up a task to process each individual operation.
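Both directions of this relationship can be sketched in a few lines. In the example below, the class name DelegatesAsData and its method names are hypothetical. The first method runs the same array of delegates once with Parallel.Invoke and once with a parallel loop over the delegates. The second method treats a data-parallel problem (squaring each input) as one task per element.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the names are illustrative assumptions.
public static class DelegatesAsData
{
    // Runs three operations as tasks, then again as the "data" of a parallel loop.
    public static int InvokeAsTasksThenAsData()
    {
        int total = 0;
        Action[] operations =
        {
            () => Interlocked.Add(ref total, 1),
            () => Interlocked.Add(ref total, 10),
            () => Interlocked.Add(ref total, 100)
        };

        // Task parallelism: one logical task per delegate.
        Parallel.Invoke(operations);

        // Data parallelism: the same operation (invoke) applied to each delegate.
        Parallel.ForEach(operations, operation => operation());

        return total;   // each delegate ran twice
    }

    // A data-parallel problem expressed with one task per input element.
    public static int[] SquareWithTasks(int[] inputs)
    {
        var tasks = new Task<int>[inputs.Length];
        for (int i = 0; i < inputs.Length; i++)
        {
            int value = inputs[i];   // copy so each lambda captures its own value
            tasks[i] = Task.Factory.StartNew(() => value * value);
        }
        Task.WaitAll(tasks);

        var results = new int[inputs.Length];
        for (int i = 0; i < results.Length; i++)
        {
            results[i] = tasks[i].Result;
        }
        return results;
    }
}
```

For large inputs, a parallel loop is usually the better fit for the second case because it avoids creating one task object per element.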

The Default Task Scheduler

All parallel libraries rely on a scheduler to organize and order the execution of tasks and threads, and TPL is no exception. This section is a behind-the-scenes look at the .NET Framework 4 implementation of task scheduling. The material in this section can help you understand the performance characteristics that you'll observe when using TPL and PLINQ. Be aware that the scheduling algorithms described here represent an implementation choice. They aren't constraints imposed by TPL itself, and future versions of .NET might optimize task execution differently.

Note

The behind-the-scenes behavior described in this section applies to .NET Framework 4. There's no guarantee that future releases of the runtime or the framework won't behave differently.

In .NET Framework 4, the default task scheduler is tightly integrated with the thread pool. If you use the default task scheduler, the worker threads that execute parallel tasks are managed by the .NET ThreadPool class. Generally, there are at least as many worker threads as cores on your computer. When there are more tasks than available worker threads, some tasks will be queued and wait until the thread pool provides an available worker thread.

An example of this queuing approach is the thread pool's QueueUserWorkItem method. In fact, you can think of the default task scheduler as an improved thread pool where work items return a handle that a thread can use as a wait condition, and where unhandled task exceptions are forwarded to the wait context. The handles to the work items are called tasks, and the wait condition occurs when you call the task's Wait method. In addition to these enhancements, the default task scheduler is capable of better performance than the thread pool alone as the number of cores increases. Here's how this works.

The Thread Pool

In its simplest form, a thread pool consists of a global queue of pending work items and a set of threads that process the work items, usually on a first-in, first-out (FIFO) basis. This is shown in Figure 2.

Figure 2

A thread pool

Thread pools have problems scaling to large numbers of cores. The main reason is that the thread pool has a single, global work queue. Each end of the global queue can be accessed by only one thread at a time, and this can become a bottleneck. When there are only a few, coarse-grained work items and a limited number of cores, the synchronization overhead of a global queue (that is, the cost of ensuring that only one thread at a time has access) is small. For example, the overhead is negligible when the number of cores is four or fewer and when each work item takes many thousands of processor cycles. However, as the number of cores increases and the amount of work that you want to do in each work item decreases (due to the finer-grained parallelism that is needed to exploit more of the cores), the synchronization cost of the traditional thread pool design begins to dominate.

Synchronization is an umbrella term that includes many techniques for coordinating the activities of multi-threaded applications. Locks are a familiar example of a synchronization technique. Threads use locks to make sure that they don't try to modify a location of memory at the same time as another thread. All types of synchronization have the potential of causing a thread to block (that is, to do no work) until a condition is met.

Tasks in .NET are designed to scale to large numbers of cores. They are also lightweight enough to perform very small units of work, in the hundreds or low thousands of CPU cycles. In .NET, it's possible for an application to run efficiently with millions of tasks. To handle this kind of scale, a more decentralized approach to scheduling than one that uses a single global queue is needed.

Decentralized Scheduling Techniques

The .NET Framework provides local task queues for each worker thread in the thread pool. Local task queues distribute the responsibility for queue management and avoid much of the serial access required by a global queue of work items. Giving different parts of the application their own work queues helps avoid a central bottleneck. This is shown in Figure 3.

Figure 3

Per-thread local task queues

You can see that there are as many task queues as there are worker threads, plus one global queue. All of these queues operate concurrently. The basic idea is that when a new task needs to be added, it can sometimes be added to a thread-local queue instead of to the global queue, and when a thread is ready for a new task to run, it can sometimes find one waiting in its local queue instead of having to go to the global queue. Of course, any work that comes from a thread that is not one of the thread pool's worker threads still has to be placed on the global queue, which always incurs heavier synchronization costs than adding to a local queue.

In the typical case, accessing the local queue needs only very limited synchronization. Items can be locally added and removed very quickly. The reason for this efficiency is that the local queues are implemented using a special concurrent data structure known as a work-stealing queue. A work-stealing queue is a double-ended queue that has a private end and a public end. The queue allows lock-free pushes and pops from the private end but requires costlier synchronization for operations at the public end. When the length of the queue is small, synchronization is required from both ends due to the locking strategy used by the implementation.

Moving to a distributed scheduling approach makes the order of task execution less predictable than with a single global queue. Although the global queue executes work in FIFO order, the local work-stealing queues use last-in, first-out (LIFO) order to avoid synchronization costs. However, the overall throughput is likely to be better because a thread only uses the relatively more expensive global queue when it runs out of work from its local queue.

Work stealing

What happens when a thread's local work queue is empty and the global queue is also empty? There still may be work on the local queues of other worker threads. This is where work stealing comes into play. This is shown in Figure 4.

Figure 4

Work stealing

The diagram shows that when a thread has no items in its local queue, and there are also no items in the global queue, the system "steals" work from the local queue of one of the other worker threads. To minimize the amount of synchronization, the system takes the task from the public end of the second thread's work-stealing queue. This means that, unless the queue is very short, the second thread can continue to push and pop from the private end of its local queue with minimal overhead for synchronization.

This mix of LIFO and FIFO ordering has other interesting benefits that arise from the work distribution patterns of typical applications. It turns out that LIFO order makes sense for local queues because it reduces the likelihood of a cache miss. Something that has just been placed in a work queue has a good chance of referencing objects that are still present in the system's memory caches. One way to take advantage of the cache is by prioritizing recently added tasks.

Many parallel algorithms have a divide-and-conquer approach similar to recursion, as you'll see later in Chapter 6, "Dynamic Task Parallelism." The largest chunks of work tend to get pushed onto the queue before smaller subtasks. With FIFO ordering, these larger chunks are the first to be removed by other threads. Transferring a larger task to another thread reduces the need for stealing additional tasks in the future. As one of these larger tasks executes, it pushes and pops its subtasks in the new thread's local queue. This is a very efficient way to schedule these kinds of tasks.
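The ordering behavior of a work-stealing queue can be sketched with a simple double-ended queue. The class below is an illustration only. It uses a single lock for both ends, which is an assumption made for brevity; the queues used by the .NET thread pool are not public and avoid locking on the private end. The class name WorkStealingDequeSketch and its method names are hypothetical.

```csharp
using System.Collections.Generic;

// Hypothetical, lock-based sketch of the ordering rules of a work-stealing queue.
// It shows which item each operation returns, not how to make it fast.
public sealed class WorkStealingDequeSketch<T>
{
    private readonly LinkedList<T> _items = new LinkedList<T>();
    private readonly object _gate = new object();

    // Private end: the owning thread pushes new work here.
    public void LocalPush(T item)
    {
        lock (_gate) { _items.AddLast(item); }
    }

    // Private end: the owning thread pops the most recently pushed item (LIFO).
    public bool TryLocalPop(out T item)
    {
        lock (_gate)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            item = _items.Last.Value;
            _items.RemoveLast();
            return true;
        }
    }

    // Public end: another thread steals the oldest item (FIFO).
    public bool TrySteal(out T item)
    {
        lock (_gate)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            item = _items.First.Value;
            _items.RemoveFirst();
            return true;
        }
    }
}
```

If the owning thread pushes items 1, 2, and 3, its next local pop returns 3 (the item most likely to be in cache), while a thief takes 1 (in a divide-and-conquer algorithm, typically the largest remaining chunk).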

Top-Level Tasks in the Global Queue

Tasks are placed in the global queue whenever a task factory method is invoked from a thread that is not one of the thread pool worker threads. (Of course, the factory method must be allowed to use the default task scheduler for this to be true. The information in this section applies only to tasks managed by the default task scheduler.)

You can also force the default task scheduler to place a task in the global queue by passing the task creation option PreferFairness to the factory method.

In this book, tasks in the global queue are called top-level tasks. Top-level tasks have approximately the same performance characteristics as work items that have been created with the thread pool's QueueUserWorkItem method.

Subtasks in a Local Queue

When one of the task factory methods is called from within a thread pool worker thread, the default task scheduler places the new task in that thread's local task queue. This is a faster operation than placing it in the global queue.
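The following sketch shows where the two kinds of tasks typically originate. The class name QueuePlacementDemo and its method names are hypothetical. Which queue a task lands in is an internal decision of the default task scheduler and can't be observed through the public API, so the comments describe the expected placement and not something the code verifies.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; the names are illustrative assumptions.
public static class QueuePlacementDemo
{
    public static int SumOfSquares(int[] values)
    {
        // When called from a thread that is not a pool worker (for example, the
        // main thread), this task is expected to go to the global queue.
        Task<int> topLevel = Task.Factory.StartNew(() =>
        {
            // These tasks are created from within a running task, so they are
            // expected to go to the local queue of the worker thread.
            var subtasks = new Task<int>[values.Length];
            for (int i = 0; i < values.Length; i++)
            {
                int value = values[i];
                subtasks[i] = Task.Factory.StartNew(() => value * value);
            }
            Task.WaitAll(subtasks);

            int sum = 0;
            foreach (Task<int> subtask in subtasks)
            {
                sum += subtask.Result;
            }
            return sum;
        });
        return topLevel.Result;
    }

    // Requests global-queue placement even when called from a worker thread.
    public static Task<int> StartFair(Func<int> work)
    {
        return Task.Factory.StartNew(work, TaskCreationOptions.PreferFairness);
    }
}
```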

The default task scheduler assumes that minimizing the worst-case latency for subtasks isn't important. Instead, its goal is to optimize overall system throughput. This makes sense if you believe that any time you create a task from within another task or from within a thread pool work item, you are performing an operation that is part of some larger computation (such as a top-level task). In this case, the only latency that matters is that of the top-level task. Therefore, the default task scheduler doesn't care about FIFO ordering of subtasks. Although these assumptions don't hold in all cases, understanding them will help you use the features of the default task scheduler in the most efficient way for your application.

In this book, tasks in a local queue are known as subtasks. The motivation for this term is that most tasks that end up in a local queue are created while executing the user delegate of some other task.

Inlined Execution of Subtasks

It's often the case that a task must wait for a second task to complete before the first task can continue. If the second task hasn't begun to execute, you might imagine that the thread that is executing the first task blocks until the second task is eventually allowed to run and complete. An unlucky queue position for the second task can make this an arbitrarily long wait, and in extreme cases can even result in deadlock if all other worker threads are busy.

Fortunately, the TPL can detect whether the second task has begun to execute. If the second task is not yet running, the default task scheduler can sometimes execute it immediately in the first task's thread context. This technique, known as inlined execution, enables the reuse of a thread that would otherwise be blocked. It also eliminates the possibility of deadlock due to thread starvation. A nice side effect is that inline execution can reduce overall latency by acting as a scheduling short cut for urgent tasks.

The default task scheduler in .NET Framework 4 inlines a pending subtask if Task.Wait or Task.WaitAll is called from within the worker thread whose local queue contains that subtask. Inlining also applies to tasks created by methods of the Parallel class if these methods are called from within a worker thread. In other words, a thread pool worker thread can perform inline execution of tasks that it created. Top-level tasks in the global queue are never eligible to be inlined, and tasks created with the LongRunning task creation option do not inline other tasks.

The default task scheduler's policy for inline execution was motivated by the synchronization requirements of the work-stealing queues. Removing or marking a task in another local queue as "processed" would require additional, expensive cross-thread synchronization. Also, it turns out that typical applications almost never need cross-thread inlining. The most common coding patterns result in subtasks that reside in the local queue of the thread that executes the parent task.
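A simple way to look for inlined execution is to compare thread IDs. In the sketch below, a task creates a subtask and immediately waits for it, then reports the managed thread ID of each. The class name InlineExecutionDemo is hypothetical. When the two IDs match, the subtask most likely ran inline on the waiting thread, but the result isn't guaranteed because another worker thread may steal the subtask first.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the name is an illustrative assumption.
public static class InlineExecutionDemo
{
    // Returns { parent thread ID, subtask thread ID }.
    public static int[] Run()
    {
        Task<int[]> parent = Task.Factory.StartNew(() =>
        {
            int parentThreadId = Thread.CurrentThread.ManagedThreadId;

            // Created from within a task, so this is a subtask.
            Task<int> subtask = Task.Factory.StartNew(
                () => Thread.CurrentThread.ManagedThreadId);

            // If the subtask has not started yet, the scheduler may run it
            // here, on the waiting thread, instead of blocking.
            subtask.Wait();

            return new[] { parentThreadId, subtask.Result };
        });
        return parent.Result;
    }
}
```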

Thread Injection

The .NET thread pool automatically manages the number of worker threads in the pool. It adds and removes threads according to built-in heuristics. The .NET thread pool has two main mechanisms for injecting threads: a starvation-avoidance mechanism that adds worker threads if it sees no progress being made on queued items and a hill-climbing heuristic that tries to maximize throughput while using as few threads as possible.

The goal of starvation avoidance is to prevent deadlock. This kind of deadlock can occur when a worker thread waits for a synchronization event that can only be satisfied by a work item that is still pending in the thread pool's global or local queues. If there were a fixed number of worker threads, and all of those threads were similarly blocked, the system would be unable to ever make further progress. Adding a new worker thread resolves the problem.

A goal of the hill-climbing heuristic is to improve the utilization of cores when threads are blocked by I/O or other wait conditions that stall the processor. By default, the managed thread pool has one worker thread per core. If one of these worker threads becomes blocked, there's a chance that a core might be underutilized, depending on the computer's overall workload. The thread injection logic doesn't distinguish between a thread that's blocked and a thread that's performing a lengthy, processor-intensive operation. Therefore, whenever the thread pool's global or local queues contain pending work items, active work items that take a long time to run (more than a half second) can trigger the creation of new thread pool worker threads.

The .NET thread pool has an opportunity to inject threads every time a work item completes or at 500 millisecond intervals, whichever is shorter. The thread pool uses this opportunity to try adding threads (or taking them away), guided by feedback from previous changes in the thread count. If adding threads seems to be helping throughput, the thread pool adds more; otherwise, it reduces the number of worker threads. This technique is called the hill-climbing heuristic.

Therefore, one reason to keep individual tasks short is to avoid "starvation detection," but another reason to keep them short is to give the thread pool more opportunities to improve throughput by adjusting the thread count. The shorter the duration of individual tasks, the more often the thread pool can measure throughput and adjust the thread count accordingly.

To make this concrete, consider an extreme example. Suppose that you have a complex financial simulation with 500 processor-intensive operations, each one of which takes ten minutes on average to complete. If you create top-level tasks in the global queue for each of these operations, you will find that after about five minutes the thread pool will grow to 500 worker threads. The reason is that the thread pool sees all of the tasks as blocked and begins to add new threads at the rate of approximately two threads per second.

What's wrong with 500 worker threads? In principle, nothing, if you have 500 cores for them to use and vast amounts of system memory. In fact, this is the long-term vision of parallel computing. However, if you don't have that many cores on your computer, you are in a situation where many threads are competing for time slices. This situation is known as processor oversubscription. Allowing many processor-intensive threads to compete for time on a single core adds context switching overhead that can severely reduce overall system throughput. Even if you don't run out of memory, performance in this situation can be much, much worse than in sequential computation. (Each context switch takes between 6,000 and 8,000 processor cycles.) The cost of context switching is not the only source of overhead. A managed thread in .NET consumes roughly a megabyte of stack space, whether or not that space is used for currently executing functions. It takes about 200,000 CPU cycles to create a new thread, and about 100,000 cycles to retire a thread. These are expensive operations.

As long as your tasks don't each take minutes, the thread pool's hill-climbing algorithm will eventually realize it has too many threads and cut back of its own accord. However, if you do have tasks that occupy a worker thread for many seconds or minutes or hours, that will throw off the thread pool's heuristics, and at that point you should consider an alternative.

The first option is to decompose your application into shorter tasks that complete fast enough for the thread pool to successfully control the number of threads for optimal throughput.

A second possibility is to implement your own task scheduler object that does not perform thread injection. If your tasks are of long duration, you don't need a highly optimized task scheduler because the cost of scheduling will be negligible compared to the execution time of the task. The MSDN® developer program has an example of a simple task scheduler implementation that limits the maximum degree of concurrency. For more information, see the section, "Further Reading," at the end of this chapter.

As a last resort, you can use the SetMaxThreads method to configure the ThreadPool class with an upper limit for the number of worker threads, usually equal to the number of cores (this is the Environment.ProcessorCount property). This upper limit applies for the entire process, including all AppDomains.

Note

The SetMaxThreads method can cause deadlock if thread pool worker threads are waiting for scheduled work items to run. Use it with extreme caution.
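If you do decide to cap the thread pool, a cautious approach is to record the previous limits so that they can be restored. The sketch below assumes a helper class named ThreadPoolCap (a hypothetical name). It also assumes that the cap should be the larger of the processor count and the pool's current minimum worker count, because SetMaxThreads returns false and changes nothing when the requested maximum is below either value.

```csharp
using System;
using System.Threading;

// Hypothetical helper; the names are illustrative assumptions.
public static class ThreadPoolCap
{
    // Tries to cap the number of worker threads near the number of cores.
    // Outputs the previous limits so that the caller can restore them.
    public static bool TryCapWorkerThreads(out int previousWorkerMax, out int previousIoMax)
    {
        ThreadPool.GetMaxThreads(out previousWorkerMax, out previousIoMax);

        int minWorker, minIo;
        ThreadPool.GetMinThreads(out minWorker, out minIo);

        // The maximum can't be set below the processor count or the current minimum.
        int cap = Math.Max(Environment.ProcessorCount, minWorker);

        // The I/O completion thread limit is left unchanged.
        return ThreadPool.SetMaxThreads(cap, previousIoMax);
    }

    // Restores limits previously reported by TryCapWorkerThreads.
    public static bool Restore(int workerMax, int ioMax)
    {
        return ThreadPool.SetMaxThreads(workerMax, ioMax);
    }
}
```

Because the limit is process-wide, a change like this affects every component that uses the thread pool, not just your own tasks.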

Bypassing the Thread Pool

If you don't want a task to use a worker thread from the thread pool, you can create a new thread for its dedicated use. The new thread will not be a thread pool worker thread. To do this, include the LongRunning task creation option as an argument to one of the task factory methods. The option is mostly used for tasks with long I/O-related wait conditions and for tasks that act as background helpers.

A disadvantage of bypassing the thread pool is that, unlike a worker thread that is created by the thread pool's thread injection logic, a thread created with the LongRunning option cannot use inline execution for its subtasks.
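As a brief sketch, the following method starts a task with the LongRunning option and reports whether its delegate ran on a thread pool thread. The class name LongRunningDemo is hypothetical. The example passes TaskScheduler.Default explicitly so that the result doesn't depend on which scheduler is current at the call site. With the default task scheduler, the expected answer is false, although this is implementation behavior and not a documented guarantee.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the name is an illustrative assumption.
public static class LongRunningDemo
{
    // Returns true if the delegate ran on a thread pool thread.
    public static bool RanOnThreadPoolThread()
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        return task.Result;
    }
}
```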

Exercises

  1. The image blender example in this chapter uses task parallelism: a different task processes each image layer. A typical strategy in image processing uses data parallelism: the same computation processes different portions of an image or different images. Is there a way to use data parallelism in the image blender example? If there is, what are the advantages and disadvantages, compared to the task parallelism discussed here?
  2. In the image blender sample, the image processing methods SetToGray and Rotate are void methods that do not return results, but they save their results by updating their second argument. Why don't they return their results?
  3. In the image blender sample that uses Task.Factory.StartNew, what happens if one of the parallel tasks throws an exception? Answer the same question for the sample that uses Parallel.Invoke.

Further Reading

Leijen et al. discuss design considerations, including scheduling and work stealing. Hoag provides a detailed discussion of task creation options. The Microsoft Parallel Samples package is ParExtSamples on MSDN. The how-to article on MSDN is a good resource for writing custom task schedulers. The Task Scheduler Events page on MSDN provides more information about unobserved task exceptions.

D. Leijen, W. Schulte, and S. Burckhardt. "The Design of a Task Parallel Library." S. Arora and G.T. Leavens, editors, OOPSLA 2009: Proceedings of the 24th Annual ACM SIGPLAN Conference on Object-Oriented Programming, Systems, Languages, and Applications, pages 227–242. ACM, 2009.

J. E. Hoag. "A Tour of Various TPL Options." April 2009.
https://blogs.msdn.com/b/pfxteam/archive/2010/04/19/9997552.aspx

ParExtSamples software. "Samples for Parallel Programming with the .NET Framework 4."
https://code.msdn.microsoft.com/ParExtSamples

"How to: Create a Task Scheduler That Limits the Degree of Concurrency."
https://msdn.microsoft.com/en-us/library/ee789351.aspx

.NET Framework Class Library. "TaskScheduler Events."
https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler_events.aspx