Asynchrony

We rarely speak of parallelism when discussing Axum, because you very rarely think about it when designing or writing an Axum application. Instead, you write a bunch of components which are all single-threaded and have them send messages to each other asynchronously; efficient use of parallel hardware comes from running many Axum agents simultaneously.

In most .NET-based scenarios, asynchronous operations are relatively scarce. When you need to use one of the various async features, it can have a huge impact on the structure and design of your application, but many developers manage to avoid them altogether.

Clearly, with Axum, this is not so. Interactions between agents, whether within a domain or in different domains, are always asynchronous. Thus, we have to go beyond what is available elsewhere in the platform and provide actual language support for asynchronous programming.

We are taking two approaches to handling asynchrony: a) data-flow networks, discussed in an earlier post by Artur Laksberg, and b) control-flow-based constructs. Here, I will cover the latter.

Receive

In Axum, asynchrony centers around ‘receive,’ which collects a message from a single source, such as a channel port. Logically, receive suspends execution of the agent or network that is executing until data is available from the source.

 int x = receive ( PrimaryChannel::InputMessage );

A naive implementation of receive would block the actual thread that is in use when the operator is reached. We have this in place for situations when that is really what you want, but it places a ceiling on the scalability of the application. For example, using the standard stack size of 1MB for each thread, you will quickly run out of address space (especially on 32-bit systems) when creating lots of agents. Threads are expensive to create, delete, and hold on to, and relying on a thread pool doesn’t help one bit when you block the thread with a synchronous call.

A better approach is to perform compiler transformations on the code, similar to what the C# compiler does for iterators: local variables are moved to the heap, which enables us to give up the thread while waiting for the data to come in. This makes agents much more lightweight and allows us to have thousands and thousands of agents running concurrently in a single process. I’ve had 500,000 blocked agents on my 2GB laptop without bringing it to its knees.
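As a rough illustration of the idea, in Python rather than Axum: a generator keeps its locals in a heap object and hands control back at each `yield`, much like the iterator-style transformation described above. The names `agent` and `run_agents` are hypothetical, and this sketch says nothing about how the Axum compiler actually emits code.

```python
def agent(name, log):
    # 'total' lives inside the generator object on the heap,
    # not on a dedicated thread stack.
    total = 0
    while True:
        # Stand-in for 'receive': suspend here and give control back
        # to the caller until a message is sent in.
        message = yield
        if message is None:
            break
        total += message
    log.append((name, total))


def run_agents(agent_count, messages):
    log = []
    agents = []
    for i in range(agent_count):
        a = agent(f"agent-{i}", log)
        next(a)  # run up to the first 'receive'; the agent is now suspended
        agents.append(a)
    # Every agent is now waiting, and no thread is blocked on its behalf.
    for a in agents:
        for m in messages:
            a.send(m)
        try:
            a.send(None)  # hypothetical shutdown signal for this sketch
        except StopIteration:
            pass
    return log
```

Calling `run_agents(100_000, [1, 2, 3])` keeps one hundred thousand suspended "agents" alive at once on a single thread, which is the property that makes the transformed form so much cheaper than one blocked thread per agent.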

That kind of scale opens the model up to entirely new categories of algorithms, beyond what is possible when you can only have a couple hundred or a few thousand agents.

Alas, the compiler transformations are expensive: even when there are no receives in the code, the overhead is still there. Leaf methods in particular (those not calling other Axum methods) often do no message-passing at all, in which case the transformations are a waste of runtime performance.

Therefore, by default, methods are not transformed; we call these methods synchronous. The only category of methods that is asynchronous by default is agent constructors, since they are never leaves. The default for other methods is overridden by declaring the method ‘asynchronous’:

 asynchronous protected int foo() { ... receive (x); ...}

The compiler takes care of everything else – you do not have to do anything beyond adding that one keyword to the declaration. Asynchronous methods calling other asynchronous methods will do so via compiler transformations that give up the thread while waiting for results. In fact, receive is implemented in the runtime as an asynchronous method.

Asynchronous Programming Model

Those who have tried to do asynchronous programming in .NET will be familiar with the Asynchronous Programming Model, APM. It builds on an operation XXX being encoded in two methods, BeginXXX and EndXXX: the former starts the operation, the latter collects the results (including any exceptions). The pattern is very powerful and flexible, so much so that it is sometimes difficult to program against or implement.

Even though ‘receive’ is the real foundation of asynchrony in Axum, we chose to support the APM in the same way, by treating any APM operation as an asynchronous operation. We did this to make it easier to build applications that mix I/O operations and message-passing, a major scenario for realizing concurrency with the Axum model.

Thus, to call System.IO.Stream.Read() using the APM, you don’t have to do anything, as long as you’re invoking it from an asynchronous method. Your code will look completely synchronous, but the compiler will make sure that it is not holding onto the thread while waiting.

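 asynchronous int ReadFile(string path) 
{ 
    Stream stream = …;  
    int numRead = 0; 
    int totalRead = 0;  // running total of bytes read so far 
    // This is where things are asynchronous. 
    while ( (numRead = stream.Read(buffer, ...) ) > 0 )  
    { 
        PrimaryChannel::NextBufferRead <-- buffer; 
        totalRead += numRead; 
    }

    // Return the total; numRead is no longer positive once the loop exits. 
    return totalRead;  
}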

Interleave

Once you have the tools you need to work productively with individual asynchronous operations, how do you realize the full potential of parallel hardware? One way, as stated earlier, is to run many agents and pass messages between them. That’s why we have the support for asynchronous messages in the first place. Another is to try to do I/O in parallel. The parallelism you wind up with using Axum is different from the structured, very regular patterns of parallel for and such constructs; ideally, you will find ways of using both together.

Let’s start by considering the scenario of having more than one outstanding I/O operation. Doing I/O concurrently is very efficient on Windows because it allows the operating system to balance the amount of work based on how many processors there are. Also, you can potentially do some preliminary work in your code while the hardware devices are operating independently.

Building on the earlier example, to perform two simultaneous file read operations, we can just write this piece of code:

 asynchronous int ReadTwoFiles(string pthA, string pthB) 
{   
    int numRead = 0; 
    interleave 
    { 
        numRead += ReadFile(pthA); 
        numRead += ReadFile(pthB); 
    } 
    return numRead; 
}

What happens here is that the two statements under ‘interleave’ are coordinated so that multiple reads may be outstanding at any one point in time, but it does not introduce parallelism in the code itself: what is running concurrently are the I/O operations, not the user code.

In this specific case, the code will start reading from file A, and as soon as a read operation doesn’t immediately complete, there is an opportunity to start reading from file B. When that pauses, which means that the device is working on our behalf, the read from file A may complete and the code can go on to reading the next chunk.

If it hasn’t yet completed, the code waits for either operation to finish first, and whichever comes in first is resumed. Unlike the code in ReadFile, where the read operations are asynchronous but strictly ordered, this is unordered asynchrony. However, the interleave block is, as a block, ordered with respect to operations outside it:

 asynchronous int ReadThreeFiles(string pthA, 
                                string pthB, 
                                string pthC) 
{   
    int numRead = 0; 
    interleave 
    { 
        numRead += ReadFile(pthA); 
        numRead += ReadFile(pthB); 
    }  
    numRead += ReadFile(pthC);  
    return numRead; 
}

In the preceding example, either A or B may finish first, but reading from C isn’t started until both A and B have finished. Regardless, at most one thread of execution is active at any point in time, and we can have thousands and thousands of agents like this in a process, consuming a minimum of threads from the thread pool.

This keeps the system busy without introducing data races in our code: updating ‘numRead’ in each of the branches of the interleave is perfectly safe, because the two statements, while technically concurrent, will never be executing code in parallel: “concurrent waiting, serial execution.”
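The “concurrent waiting, serial execution” idea can be sketched in Python's asyncio, which also runs all branches on a single thread. This is only an analogy, not Axum: `read_file` and `read_two_files` are hypothetical names, in-memory chunks stand in for real files, and `asyncio.sleep(0)` stands in for a read that does not complete immediately.

```python
import asyncio


async def read_file(chunks, events, name):
    total = 0
    for chunk in chunks:
        # Stand-in for an asynchronous read: suspend this branch so that
        # another branch gets a chance to run while we "wait".
        await asyncio.sleep(0)
        events.append(name)
        total += len(chunk)
    return total


async def read_two_files(chunks_a, chunks_b):
    num_read = 0
    events = []

    async def branch(chunks, name):
        nonlocal num_read
        n = await read_file(chunks, events, name)
        # No suspension point between reading and writing num_read,
        # and only one branch executes at a time, so this update is safe.
        num_read += n

    # Both branches are outstanding at once; the statement after the
    # gather runs only when both have finished.
    await asyncio.gather(branch(chunks_a, "A"), branch(chunks_b, "B"))
    return num_read, events
```

Running `asyncio.run(read_two_files([b"ab", b"cd"], [b"efg"]))` returns a total of 7 along with a record of which branch made progress at each step, so you can see the two reads taking turns rather than running one after the other.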

There can be any number of statements under the interleave, and they may be any kind of statement, including block statements (which are probably the most useful to have there). One limitation, though, is that the number of statements has to be known at compile time.

Sometimes, though, you really want the number of statements to be determined at runtime. We’ll look at that in the context of another scenario: getting agents to do work in parallel with each other.

As agent A, I can send a message to another agent B, then do a little bit of work before calling receive and waiting for a response. If A and B perform work at the same time, we have some concurrency in our application. At some point, though, A probably needs to hear back from B in order to proceed, and they will then stop working in parallel. Likewise, if B finishes its work before A hits the receive, the two will not be working in parallel. Sooner or later, only one thing is running.

Either way, unless B also contacts C, which contacts D, etc., the maximum increase in efficiency is 100% for that period of time. What can we do about that? As Gustafson’s Law (that is John Gustafson, no relation) essentially points out: just do more! We can, for example, try to find a number of operations B, C, D, E and F, which are all independent of each other, and run them in parallel with A, all controlled from A.

For example:

 asynchronous void a_method() 
{  
    foreach (var chan in {b, c, d, e, f}) 
    { 
        chan::RequestPort <-- new Request(...); 
        // Do something that takes a little while 
        var result = receive(chan::ReplyPort);  
        // Do some more processing with the result. 
    } 
}

The only thing is, that won’t work as we hoped – it is serial. A will work concurrently first with B, then C, then D, then E, then F. Again, the maximum increase in efficiency is 100%, just sustained for a longer period of time. This lengthening is a good thing, don’t get me wrong, as efficiency measured over time is increased. However, we don’t use that efficiency to improve our time-to-completion.

To do so, we need something else, a replacement for foreach.

That replacement for ‘foreach’ looks like this, and is very similar to the interleave statement shown earlier:

 asynchronous void a_method() 
{   
    interleave (var chan in {b, c, d, e, f}) 
    { 
        chan::RequestPort <-- new Request(...); 
        // Do something that takes a little while 
        var result = receive(chan::ReplyPort);  
        // Do some more processing with the result. 
    } 
}

What this will do is interact with B, C, D, E and F in an unordered fashion. That is, the five other agents will all be able to run in parallel, but we are not introducing any parallelism into A, as only one “fork” at a time will be executing the code in A. What happens is that the first iteration starts, sends its message, does some work, then blocks at the receive expression. At that point, the second iteration may start, do the same thing, and so on. This allows some overlap between the five agents we’re talking to.
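The difference between the serial loop and the interleaved one can be made visible by counting how many requests are outstanding at once. The following is a Python asyncio analogy under stated assumptions, not Axum: `Tracker`, `request`, `serial`, and `interleaved` are hypothetical names, and a short sleep stands in for the other agent doing its work.

```python
import asyncio


class Tracker:
    """Counts outstanding requests and remembers the highest count seen."""

    def __init__(self):
        self.in_flight = 0
        self.peak = 0


async def request(tracker):
    tracker.in_flight += 1
    tracker.peak = max(tracker.peak, tracker.in_flight)
    await asyncio.sleep(0.01)  # the other agent works; we give up the thread
    tracker.in_flight -= 1


async def serial(n):
    # Like the 'foreach' version: each request completes before the next starts.
    tracker = Tracker()
    for _ in range(n):
        await request(tracker)
    return tracker.peak


async def interleaved(n):
    # Like the 'interleave (... in ...)' version: each iteration starts
    # as soon as the previous one suspends.
    tracker = Tracker()
    await asyncio.gather(*(request(tracker) for _ in range(n)))
    return tracker.peak
```

With five requests, `asyncio.run(serial(5))` never has more than one outstanding, while `asyncio.run(interleaved(5))` has all five outstanding together, even though the orchestrating code itself still runs on one thread.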

Of course, if the work between the send and the receive is significant relative to the work going on in the other agents, there’s little overlap between them, since the other agent finishes before or right after A reaches the receive. This means that the second message isn’t sent until B has already finished, which takes us back to where we started.

The code can be fixed up to account for this by inserting a call to ‘wait(0)’:

 asynchronous void a_method() 
{   
    interleave (var chan in {b, c, d, e, f}) 
    { 
        chan::RequestPort <-- new Request(...); 
        wait(0); 
        // Do something that takes a little while 
        var result = receive(chan::ReplyPort);  
        // Do some more processing with the result. 
    } 
}

This will have the effect of yielding to the second branch, which yields to the third, etc. When all have started and blocked, the first restarts, etc. This increases the overlap between the other agents that we are orchestrating from A. wait(), like receive, is an asynchronous method. wait() should always be used instead of Thread.Sleep() in Axum. Thread.Sleep will block the thread invoking it, which is the last thing we want.
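The effect of yielding right after the send can be sketched with Python's asyncio, where `asyncio.sleep(0)` plays a role similar to ‘wait(0)’. Again, this is an analogy rather than Axum: `talk_to` and `orchestrate` are hypothetical names, and log entries stand in for the real send, work, and receive steps.

```python
import asyncio


async def talk_to(name, log, yield_after_send):
    log.append(f"send {name}")
    if yield_after_send:
        # Analogous to wait(0): let the other branches send their requests
        # before this branch starts its own local work.
        await asyncio.sleep(0)
    log.append(f"work {name}")   # the work done between send and receive
    await asyncio.sleep(0)       # stand-in for receive(chan::ReplyPort)
    log.append(f"reply {name}")


async def orchestrate(names, yield_after_send):
    log = []
    await asyncio.gather(*(talk_to(n, log, yield_after_send) for n in names))
    return log
```

Without the yield, the log begins "send b", "work b", "send c", so each request goes out only after the previous branch's local work is done. With the yield, all the sends come first, which gives the other agents the most time to work in parallel.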

The two forms of interleave may be used not only with receive, but also with the APM pattern and any asynchronous method (which will, eventually, use either receive or an APM operation). A more interesting example of using it to orchestrate I/O than what we saw earlier would be a web crawler, which spawns off a separate, asynchronous, line of execution for every web site it is crawling. Since operation latency is likely to be high, using an interleave to gather data in parallel rather than serially really makes sense.

Niklas Gustafsson
Axumite
