[Multithreading] Asynchronous Sleep in Distributed applications

Scenario: You have to design and implement a distributed application where a lot of clients call different operations in parallel. In certain situations, in order to make the system behave better, you need to delay some operations. Some examples where this can be useful:

- The distributed system encounters a problem and you have retry logic in place (e.g. the network goes down and you need to retry until it comes up again). To avoid hammering the system with retries, you decide to add a small delay between consecutive attempts.

- You know a critical operation is taking place and you want to wait for it to finish, either because the resources are shared or because it has a higher priority (e.g. updates are installed every night at 2 AM, or trace data is collected every 4 hours, and your client operations can wait until that is done).

To implement the delay, the threads that run these operations need to sleep for a period of time (which can be determined at runtime, e.g. by choosing a random delay so that the requests do not all execute at the same time).
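The random delay mentioned above could be computed along these lines. This is only a sketch: the RetryDelay class, its Next method and the parameter names are hypothetical and not part of the original sample.

```csharp
using System;

// Hypothetical helper (not part of the original sample): picks a randomized
// delay so that retries from many clients do not all fire at the same moment.
static class RetryDelay
{
    // Returns baseDelay plus a random extra delay of at most maxJitter.
    public static TimeSpan Next(Random random, TimeSpan baseDelay, TimeSpan maxJitter)
    {
        // NextDouble() returns a value in [0.0, 1.0), so the jitter
        // never exceeds maxJitter.
        long jitterTicks = (long)(random.NextDouble() * maxJitter.Ticks);
        return baseDelay + TimeSpan.FromTicks(jitterTicks);
    }
}
```

A caller might use RetryDelay.Next(random, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500)) to get a value to sleep for. Note that Random is not thread-safe, so parallel operations should not share a single instance without synchronization.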

Consider an app that starts a number of parallel operations that need to sleep. You can implement this in a sync or async fashion.

1. Sync sleep – Create all the threads and set the work to include sleeping

Thread[] clients = new Thread[this.ParallelOperations];
for (int i = 0; i < this.ParallelOperations; i++)
{
    clients[i] = new Thread(new ThreadStart(
        // We need a method void () target; 
        // for simplicity, inline it here
        () =>
        {
            // ... Previous operations 
            Stopwatch watch = new Stopwatch();
            watch.Start();
            Thread.Sleep(SleepTime);
            watch.Stop();
            Console.WriteLine("Elapsed time: {0} msec", 
                watch.ElapsedMilliseconds);
            // ... Subsequent operations 
        }));
}

// Start all clients
for (int i = 0; i < this.ParallelOperations; i++)
{
    clients[i].Start();
}

// Wait for all clients to finish
for (int i = 0; i < this.ParallelOperations; i++)
{
    clients[i].Join();
}

This implementation manually creates one thread per operation and calls Sleep on each of them. (If you need to sleep on a thread that has STAThreadAttribute but still want standard COM and SendMessage pumping, consider using one of the overloads of the Join method that take a timeout interval, e.g. Thread.CurrentThread.Join.)

Another option is to use the CLR thread pool instead. To wait for the work items, we can use an event that is signaled when all operations are done.

int clientsDone = 0;
ManualResetEvent waitEvent = new ManualResetEvent(false);

for (int i = 0; i < this.ParallelOperations; i++)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback(o =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        Thread.Sleep(SleepTime);
        watch.Stop();
        Console.WriteLine("Elapsed time: {0} msec", watch.ElapsedMilliseconds);
        if (Interlocked.Increment(ref clientsDone) >= this.ParallelOperations)
        {
            waitEvent.Set();
        }
    }));
}

waitEvent.WaitOne(TimeSpan.FromSeconds(WaitTime));
// dispose of the event

The sync version has one huge problem: it doesn’t scale. MSDN clearly states that “the thread will not be scheduled for execution by the operating system for the amount of time specified”. In other words, we hold on to the current thread without doing any useful work. If the system has many clients, there is a high chance that we will eventually run out of threads, because operations come in faster than we finish them.

2. Async sleep – The alternative is for the operation to start the Sleep and leave the thread to do other, useful work; when the wait is done, the app will be notified and another thread will continue the execution.

To implement the sleep async, we use a Timer – when the callback is triggered, the sleep is done.

int clientsDone = 0;
ManualResetEvent waitEvent = new ManualResetEvent(false);

// Keep a reference to every timer; a Timer that is no longer referenced
// can be garbage collected before its callback fires.
Timer[] timers = new Timer[ParallelOperations];

// The same callback serves all timers; each one gets its own Stopwatch as state
TimerCallback callback = new TimerCallback(
    o =>
    {
        Stopwatch passedWatch = (Stopwatch)o;
        passedWatch.Stop();
        Console.WriteLine("Elapsed time: {0} msec", 
            passedWatch.ElapsedMilliseconds);
        if (Interlocked.Increment(ref clientsDone) >= this.ParallelOperations)
        {
            waitEvent.Set();
        }
    });

for (int i = 0; i < ParallelOperations; i++)
{
    Stopwatch watch = new Stopwatch();
    watch.Start();
    // A period of -1 msec disables periodic signaling: the timer fires once
    timers[i] = new Timer(callback, watch, SleepTime, TimeSpan.FromMilliseconds(-1));
}

waitEvent.WaitOne(TimeSpan.FromSeconds(50));
// dispose of the timers and the event

Running this code with 10 operations and a sleep of 10 sec leads to this output:

Elapsed time: 10010 msec
Elapsed time: 10009 msec
Elapsed time: 10010 msec
Elapsed time: 10009 msec
Elapsed time: 10011 msec
Elapsed time: 10011 msec
Elapsed time: 10011 msec
Elapsed time: 10011 msec
Elapsed time: 10011 msec
Elapsed time: 10011 msec

(Notice how the actual time is not exactly 10 sec but differs slightly. If the system is overloaded, the difference will be much higher. The same is true for the sync case.)

This works, but let’s rewrite it to be more in line with the .NET Framework Asynchronous Programming Model. From MSDN:

An asynchronous operation that uses the IAsyncResult design pattern is implemented as two methods named BeginOperationName and EndOperationName that begin and end the asynchronous operation OperationName respectively. For example, the FileStream class provides the BeginRead and EndRead methods to asynchronously read bytes from a file. These methods implement the asynchronous version of the Read method.

After calling BeginOperationName, an application can continue executing instructions on the calling thread while the asynchronous operation takes place on a different thread. For each call to BeginOperationName, the application should also call EndOperationName to get the results of the operation.
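As an illustration of the quoted pattern, here is a minimal sketch that reads the beginning of a file with FileStream.BeginRead and EndRead. The ApmReadExample class and the ReadStart method are hypothetical names used only for this example, and the sketch assumes the file content is UTF-8 text.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

// Hypothetical example class (not from the original post).
static class ApmReadExample
{
    // Reads up to 'count' bytes from the start of the file with BeginRead/EndRead
    // and returns them decoded as UTF-8 text.
    public static string ReadStart(string path, int count)
    {
        byte[] buffer = new byte[count];
        int bytesRead = 0;

        using (ManualResetEvent done = new ManualResetEvent(false))
        using (FileStream stream = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
        {
            // Begin the operation; the callback runs once the read has finished.
            stream.BeginRead(buffer, 0, buffer.Length, result =>
            {
                try
                {
                    // Every Begin call is paired with exactly one End call.
                    bytesRead = stream.EndRead(result);
                }
                finally
                {
                    done.Set();
                }
            }, null);

            // The calling thread could do other work here; this demo just waits.
            done.WaitOne();
        }

        return Encoding.UTF8.GetString(buffer, 0, bytesRead);
    }
}
```

Blocking on the event right after BeginRead defeats the purpose in real code; it is done here only so the example returns a value that is easy to inspect.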

We can change our code to implement this pattern by defining an async result. To make my job easier, I took the abstract class AsyncResult.cs from one of the WCF MSDN Samples (download the samples, then go to the directory where you saved them and look for WCFWFCardSpace\WCF\Basic\Contract\Service\Asynchronous\CS\library). This class implements the IAsyncResult interface.
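If you don't have the samples at hand, the following simplified stand-in shows roughly what such a base class has to provide. It is a sketch written for this post under my own assumptions, not the actual sample code: it skips exception propagation and allocates the wait handle eagerly. NoOpAsyncResult is a hypothetical derived class added only to show how the base class is used.

```csharp
using System;
using System.Threading;

// Simplified stand-in for an AsyncResult base class (an assumption, not the
// WCF sample's implementation).
abstract class AsyncResult : IAsyncResult
{
    readonly AsyncCallback callback;
    readonly object state;
    readonly ManualResetEvent waitHandle = new ManualResetEvent(false);
    bool completedSynchronously;
    volatile bool isCompleted;
    bool endCalled;

    protected AsyncResult(AsyncCallback callback, object state)
    {
        this.callback = callback;
        this.state = state;
    }

    public object AsyncState { get { return this.state; } }
    public WaitHandle AsyncWaitHandle { get { return this.waitHandle; } }
    public bool CompletedSynchronously { get { return this.completedSynchronously; } }
    public bool IsCompleted { get { return this.isCompleted; } }

    // Called by the derived class when the operation has finished.
    protected void Complete(bool completedSynchronously)
    {
        if (this.isCompleted)
        {
            throw new InvalidOperationException("The operation was already completed.");
        }

        this.completedSynchronously = completedSynchronously;
        this.isCompleted = true;
        this.waitHandle.Set();

        // Notify the caller, who is expected to call End from the callback.
        if (this.callback != null)
        {
            this.callback(this);
        }
    }

    // Called from the derived class's End method; blocks until completion.
    protected static TAsyncResult End<TAsyncResult>(IAsyncResult result)
        where TAsyncResult : AsyncResult
    {
        if (result == null)
        {
            throw new ArgumentNullException("result");
        }

        TAsyncResult asyncResult = result as TAsyncResult;
        if (asyncResult == null)
        {
            throw new ArgumentException("Invalid async result.", "result");
        }

        if (asyncResult.endCalled)
        {
            throw new InvalidOperationException("End was already called.");
        }

        asyncResult.endCalled = true;
        asyncResult.waitHandle.WaitOne();
        return asyncResult;
    }
}

// Hypothetical derived class: an operation that completes as soon as
// Finish is called.
sealed class NoOpAsyncResult : AsyncResult
{
    public NoOpAsyncResult(AsyncCallback callback, object state)
        : base(callback, state)
    {
    }

    public void Finish()
    {
        this.Complete(true);
    }

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<NoOpAsyncResult>(result);
    }
}
```

The design point to notice is the split of responsibilities: the derived class decides when to call Complete, while the base class takes care of signaling the wait handle, invoking the callback and validating the End call.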

Using this base class, I defined SleepAsyncResult to start the timer and complete the operation in the timer callback (for simplicity, I always report the operation as completed asynchronously by passing false to Complete; in practice this flag should reflect how the operation actually completed).

class SleepAsyncResult : AsyncResult
{
    Timer t;
    readonly TimeSpan sleepTime;

    public SleepAsyncResult(TimeSpan sleepTime, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.sleepTime = sleepTime;
        this.Start();
    }

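    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        TimerCallback cb = new TimerCallback(
           o =>
           {
               watch.Stop();
               Console.WriteLine("Elapsed time: {0} msec",
                   watch.ElapsedMilliseconds);
               // The timer is one-shot and has done its job; release it
               // before running the completion callback
               t.Dispose();
               this.Complete(false);
           });

        watch.Start();
        // Create the timer disabled and arm it afterwards, so that the field t
        // is assigned before the callback can run (matters for very short sleeps)
        t = new Timer(cb, null, Timeout.Infinite, Timeout.Infinite);
        t.Change(this.sleepTime, TimeSpan.FromMilliseconds(-1));
    }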

    public static void End(IAsyncResult result)
    {
        AsyncResult.End<SleepAsyncResult>(result);
    }
}

Then we write the Begin/End methods using this async result:

int clientsDone = 0;
ManualResetEvent waitEvent = new ManualResetEvent(false);

IAsyncResult BeginSleep()
{
    return new SleepAsyncResult(SleepTime, EndSleep, null);  
}

void EndSleep(IAsyncResult result)
{
    SleepAsyncResult.End(result); 
    if (Interlocked.Increment(ref this.clientsDone) >= this.ParallelOperations)
    {
        this.waitEvent.Set();
    }
}

Implementing the Sleep becomes dead simple.

for (int i = 0; i < ParallelOperations; i++)
{
    this.BeginSleep();
}

waitEvent.WaitOne(TimeSpan.FromSeconds(50));

Et voilà! We are now sleeping asynchronously!

Note:

The code presented is just for demo purposes and I don’t recommend using it in production. Distributed code should be very careful with exception handling, multithreading issues, sharing resources, leaking resources etc.