C++ Agents

At PDC 2008, we talked a lot about agent-based programming, in particular during the native C++ talks on parallel programming. We showed how the agents library that we plan to ship in VS 2010 can be used to overlap I/O with computationally intensive processing to raise the throughput of a program. I thought I should spend a couple of posts on what that means and why it's a good thing.

Agent-based programming is a pattern that has been in use for quite some time, often referred to as actor-based programming instead. We use the term 'agent.'

https://en.wikipedia.org/wiki/Actor_model

The basic idea is to remove dependencies between the active components of a program by having them communicate solely via asynchronous message-passing. While truly avoiding shared memory is hard given current programming languages (e.g. C++), there are things we can do to get close to the benefits of avoiding shared memory (robustness and, sometimes, performance). If you follow the pattern with discipline, you should be able to avoid taking locks or worrying about data races. (There are other concurrency problems to worry about, though, such as deadlocks.)
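To make the idea concrete, here is a minimal sketch in portable standard C++ rather than the agents API. The `mailbox` class and the `sum_via_messages` function are hypothetical names invented for this illustration. Two workers share nothing but their mailboxes; the only locking is hidden inside the mailbox, so the worker code never touches a lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for a message buffer; not part of the agents library.
template <typename T>
class mailbox {
public:
    // Asynchronous send: enqueue the value and return immediately.
    void send(T value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(value));
        }
        m_ready.notify_one();
    }

    // Blocking receive: wait until a message is available, then remove it.
    T receive() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return !m_queue.empty(); });
        T value = std::move(m_queue.front());
        m_queue.pop();
        return value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<T> m_queue;
};

// A producer streams 1..count to a consumer, which replies with the sum.
int sum_via_messages(int count) {
    assert(count >= 0);
    mailbox<int> numbers;  // producer -> consumer
    mailbox<int> result;   // consumer -> caller

    std::thread producer([&numbers, count] {
        for (int i = 1; i <= count; ++i) numbers.send(i);
        numbers.send(0);  // 0 marks the end of the stream in this sketch
    });

    std::thread consumer([&numbers, &result] {
        int total = 0;
        for (int n = numbers.receive(); n != 0; n = numbers.receive()) {
            total += n;
        }
        result.send(total);
    });

    int total = result.receive();
    producer.join();
    consumer.join();
    return total;
}
```

Calling `sum_via_messages(5)` sends 1 through 5 and gets 15 back. Using 0 as an end marker is a shortcut that only works because the payload never contains 0; a real protocol would need a separate way to signal completion.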

The data-flow messaging blocks we showed at PDC may seem to have little to do with agents, but they are essential. The agent class itself, though, wasn't discussed much.

As always, let's start with "hello world". Here's what it would look like with our agents API:

#include "stdafx.h"
#include <agents.h>

using namespace ::Concurrency;

class agent1 : public agent
{
protected:
    void run()
    {
        wprintf(L"Hello World!\n");
        done(agent_done);
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    agent1 a1;
    a1.start();

    agent::wait(&a1);

    return 0;
}

The agent class is simple: it has a run() method which does all the work and then sets the agent's status to 'done'. We create the agent from _tmain(), start it and wait for it to finish. That's it. Notice that agent1 doesn't declare any public members of its own. That is fairly typical: most agents will only have a few public members, for example the constructors and accessor functions for the message-passing channels.
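For readers without the library at hand, the start/run/done/wait lifecycle can be approximated in portable standard C++. This is only a sketch under simplifying assumptions: `simple_agent`, `greeter` and `run_greeter` are hypothetical names, and the sketch gives every agent a dedicated thread, which is not a claim about how the real library schedules run().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical, simplified agent base class (not the library's agent).
class simple_agent {
public:
    virtual ~simple_agent() {
        if (m_thread.joinable()) m_thread.join();
    }

    // Begin executing run() asynchronously; may be called only once.
    void start() {
        assert(!m_thread.joinable());
        m_thread = std::thread([this] { run(); });
    }

    // Block the caller until the agent has reported that it is done.
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_finished.wait(lock, [this] { return m_done; });
    }

protected:
    virtual void run() = 0;

    // Called by run() as its last action to publish completion.
    void done() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_finished.notify_all();
    }

private:
    std::thread m_thread;
    std::mutex m_mutex;
    std::condition_variable m_finished;
    bool m_done = false;
};

class greeter : public simple_agent {
public:
    std::string message;  // written by run(), read only after wait()

protected:
    void run() override {
        message = "Hello World!";
        done();
    }
};

// Create, start and wait for the agent, then read what it produced.
std::string run_greeter() {
    greeter g;
    g.start();
    g.wait();
    return g.message;
}
```

As in the sample above, the caller must wait for the agent before letting it go out of scope; destroying a `greeter` whose run() is still executing would be a bug in this sketch.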

As cool as it was to see something printed to the screen, a single agent isn't very interesting, in all honesty. We have to have someone for it to communicate with. We'll add a second agent, and send some data between them. To do so, we have to add some form of communication channel...

class agent1 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

protected:
    void run()
    {
        wprintf(L"Hello World!\n");
        asend(m_output, 10);

        int reply = receive(m_input);
        wprintf(L"Received (1): %d\n", reply);

        done(agent_done);
    }

private:
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

class agent2 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

protected:
    void run()
    {
        int request = receive(m_input);
        wprintf(L"Received (2): %d\n", request);

        asend(m_output, request + 7);

        done(agent_done);
    }

private:
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

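int _tmain(int argc, _TCHAR* argv[])
{
    agent1 a1;
    agent2 a2;

    // Link the agents together before either of them runs.
    a1.SetInput(a2.GetOutput());
    a2.SetInput(a1.GetOutput());
    a1.start();
    a2.start();

    // Wait for both agents, so that neither is destroyed while still running.
    agent::wait(&a1);
    agent::wait(&a2);

    return 0;
}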

Which results in:

Hello World!
Received (2): 10
Received (1): 17

Note that the separate start of the agent makes more sense here, given that we need to link the two agents together before either of them should run.

Some may worry that waiting for data with a receive call must surely tie up a stack. That is a valid concern, especially if you have many, many agents. In fact, because of the 'receive' calls, this little example uses three threads. Ouch!

That is why the data-flow network block 'call' is so important -- it allows us to route data to methods (in this case, a C++ lambda) and not hold on to any stack while we're waiting.

Let's look at how that works for our agent sample. The _tmain() method remains identical, but the agents are slightly larger:

class agent1 : public agent
{
public:
    // Start with null pointers so the destructor is safe even if run() never executed.
    agent1() : m_reply(nullptr), m_input(nullptr) {}

    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

    ~agent1() { delete m_reply; }

protected:
    void run()
    {
        wprintf(L"Hello World!\n");
        asend(m_output, 10);

        // Link a heap-allocated call block instead of blocking in receive().
        m_input->link_target(m_reply = new call<int>([&] (int reply)
        {
            wprintf(L"Received (1): %d\n", reply);
            done(agent_done);
        }));
    }

private:
    call<int> *m_reply;
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

class agent2 : public agent
{
public:
    // Start with null pointers so the destructor is safe even if run() never executed.
    agent2() : m_request(nullptr), m_input(nullptr) {}

    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

    ~agent2() { delete m_request; }

protected:
    void run()
    {
        // Link a heap-allocated call block instead of blocking in receive().
        m_input->link_target(m_request = new call<int>([&] (int request)
        {
            wprintf(L"Received (2): %d\n", request);
            asend(m_output, request + 7);
            done(agent_done);
        }));
    }

private:
    call<int> *m_request;
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

Instead of executing the receive call, we link up a call block that is allocated on the heap (it can't be on the stack, since that's what we're trying to get rid of here). Once the linking is done, we return from run() and give up the stack so it may be used for other things...

Since we have it on the heap, we also need to create a destructor to delete the call block. Note: once you get your hands on this, some of you may discover through experimentation that it is actually safe to delete the call block before returning from the lambda, but that's taking advantage of an implementation detail and I strongly discourage you from doing so.
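The difference between pulling a message with a blocking receive and having it pushed to a linked handler can be sketched in portable standard C++. The `linkable_buffer` class and `demo_link` function below are hypothetical names, and the sketch is deliberately single-threaded: the handler simply runs in the sender's context, which is an assumption made for brevity and not a description of where the library runs a call block.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical buffer that queues messages until a handler is linked and
// pushes them straight to the handler afterwards. Not thread-safe.
template <typename T>
class linkable_buffer {
public:
    using handler = std::function<void(const T&)>;

    // Deliver immediately if a handler is linked; otherwise keep the message.
    void send(T value) {
        if (m_target) {
            m_target(value);
        } else {
            m_pending.push(std::move(value));
        }
    }

    // Attach a handler and flush anything that arrived before linking.
    void link_target(handler target) {
        assert(target != nullptr);
        m_target = std::move(target);
        for (; !m_pending.empty(); m_pending.pop()) {
            m_target(m_pending.front());
        }
    }

private:
    handler m_target;
    std::queue<T> m_pending;
};

// No code ever sits waiting for a message; work happens only on delivery.
std::vector<int> demo_link() {
    std::vector<int> seen;
    linkable_buffer<int> input;

    input.send(10);  // arrives before anyone is listening, so it is queued
    input.link_target([&seen](const int& v) { seen.push_back(v + 7); });
    input.send(20);  // delivered straight to the handler

    return seen;
}
```

Here `demo_link()` returns the values 17 and 27: the first message was waiting in the buffer when the handler was linked, the second was handed over as soon as it was sent. In both cases nothing was parked on a stack in between.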

That's it for right now, I think. Back to work...
