Introduction to Asynchronous Agents Library

One of the native concurrency components coming in Visual Studio 2010 is the Asynchronous Agents Library. The goal of the Agents Library is to improve developer productivity and let developers take advantage of concurrency through an agent-based, message-passing model for building isolated and composable components. In this post, I will introduce the features available in the Asynchronous Agents Library and go into more detail about the agent class itself with an example program.

An Agent Based Model

Programming with an agent-based model allows concurrency to be baked into the program from the start. A program that is separated into composable, reusable pieces, which communicate by message passing across well-defined boundaries, can tolerate latency and use parallel resources effectively. By avoiding shared memory where possible and focusing on data dependencies, performance can scale through higher-level abstractions for parallelism, such as agents. For more information on agent-based models, take a look here:

https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts

Overview of Agents Library Features

The Agents Library can be broken down into three basic parts:

The Agent Class – The agent class itself is intended for coarse-grained parallelism: components that handle larger, computationally intensive tasks or collections of smaller tasks. Fundamentally, agents are tasks that have an observable lifecycle and communicate with other agents by message passing. Agents are NOT intended to be used for fine-grained parallelism; for that, the patterns and constructs in the Parallel Patterns Library are better suited.

Message Blocks – To enable mechanisms for communicating between asynchronous agents and isolated program components, we provide a collection of various message blocks:

  • unbounded_buffer
  • overwrite_buffer
  • single_assignment
  • call
  • transformer
  • timer
  • join
  • multitype_join
  • choice

For example, an unbounded_buffer works much like a queue. It collects messages from each of its source blocks and offers them, in order, to its target blocks.
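To make the queue analogy concrete, here is a small portable C++ sketch of the storage behavior only. It is not the library's implementation and it leaves out sources, targets, and scheduling. The class names QueueLikeBuffer and LatestValueBuffer are hypothetical stand-ins: the first keeps every message in arrival order, the way I described unbounded_buffer above, and the second keeps only the newest message, which is my reading of how an overwrite-style block such as overwrite_buffer behaves.

```cpp
#include <cassert>
#include <queue>
#include <utility>

// Hypothetical stand-in for a queue-like block: every message is kept
// and handed out exactly once, in arrival order.
template <typename T>
class QueueLikeBuffer {
public:
    void enqueue(T value) { items_.push(std::move(value)); }

    // Moves the oldest message into 'out' and returns true,
    // or returns false and leaves 'out' untouched when empty.
    bool dequeue(T& out) {
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

private:
    std::queue<T> items_;
};

// Hypothetical stand-in for an overwrite-style block: only the newest
// message is kept, and reading it does not remove it.
template <typename T>
class LatestValueBuffer {
public:
    void store(T value) {
        latest_ = std::move(value);
        has_value_ = true;
    }

    bool has_value() const { return has_value_; }

    const T& peek() const {
        assert(has_value_ && "peek() requires a stored value");
        return latest_;
    }

private:
    T latest_{};
    bool has_value_ = false;
};
```

Enqueuing 1, 2, 3 into the queue-like buffer hands them back as 1, 2, 3, while storing 1 and then 2 into the latest-value buffer leaves only 2, no matter how often it is read.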

In addition, infrastructure for easily creating your own message blocks is included.

Coordination Primitives – For inserting and removing messages from a group of messaging blocks (a messaging network), we include a set of APIs:

  • send
  • asend
  • receive
  • try_receive

In later posts, I will dive into the specifics of each message block and coordination primitive, explaining how and when to use them.
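Until then, the difference between a blocking receive and a non-blocking try_receive can be sketched in portable C++. This is only an illustration under my own assumptions: Mailbox and post are hypothetical names, the blocking here is an ordinary thread block rather than the cooperative blocking the Concurrency Runtime provides, and the sketch does not model the difference between send and asend.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical mailbox that contrasts the two receive styles.
template <typename T>
class Mailbox {
public:
    // Adds a message and wakes one waiting receiver, if any.
    void post(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        ready_.notify_one();
    }

    // Blocking style, like receive: waits until a message is present.
    T receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        return take_front();
    }

    // Non-blocking style, like try_receive: returns false right away
    // when nothing is waiting, and leaves 'out' untouched.
    bool try_receive(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = take_front();
        return true;
    }

private:
    // Caller must hold mutex_ and the queue must not be empty.
    T take_front() {
        assert(!items_.empty());
        T front = std::move(items_.front());
        items_.pop();
        return front;
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};
```

The blocking form suits an agent that has nothing else to do until input arrives; the non-blocking form suits one that wants to poll and carry on with other work.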

Agent Class in More Detail

Essentially, an agent is a lightweight task (LWT) with observable state and cancellation. The lifecycle of an agent is as follows:

[Figure: AgentStateDiagram]

To see how an agent passes through the states, let’s look at the simplest case: creating, starting, and waiting on an agent.

     MyAgent myagent;

myagent is now in the ‘agent_created’ state.

     myagent.start();

Calling start() moves the agent to ‘agent_runnable’.

     agent::wait(&myagent);

wait() blocks until myagent reaches one of the final states (agent_canceled, agent_done, or agent_faulted).

The agent class contains a protected pure virtual run() method which is called when the agent is executed:

     virtual void run() = 0;

When start() is called, a lightweight task is scheduled and the function returns immediately. Once resources are available, the scheduler executes the agent task, which calls the run() method. When an agent is finished, it must call the done() function to signal that its work is complete. Commonly, done() is called at the end of run(); however, in some circumstances it may be desirable to decouple the lifetime of the agent from the execution of run(), for example when the agent does its work asynchronously.
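The start/run/done handshake can be imitated in portable C++ to show the shape of the contract. This is a sketch under my own assumptions, not how the Agents Library is implemented: SketchAgent and SquareAgent are hypothetical classes, a plain std::thread stands in for the scheduler's lightweight task, and only one thread may wait on an agent.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical base class that mimics the start()/run()/done() contract.
class SketchAgent {
public:
    SketchAgent() = default;
    SketchAgent(const SketchAgent&) = delete;
    SketchAgent& operator=(const SketchAgent&) = delete;

    // Call wait() before a derived agent goes out of scope; this join
    // is only a safety net for the base class itself.
    virtual ~SketchAgent() {
        if (worker_.joinable()) worker_.join();
    }

    // Hands run() to another thread and returns immediately.
    void start() {
        assert(!worker_.joinable() && "start() may be called only once");
        worker_ = std::thread([this] { run(); });
    }

    // Blocks until done() has been called, then reclaims the thread.
    void wait() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            finished_cv_.wait(lock, [this] { return finished_; });
        }
        if (worker_.joinable()) worker_.join();
    }

protected:
    virtual void run() = 0;

    // Signals completion to anyone blocked in wait().
    void done() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        finished_cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable finished_cv_;
    bool finished_ = false;
    std::thread worker_;
};

// Example agent: computes a square, then reports completion.
class SquareAgent : public SketchAgent {
public:
    explicit SquareAgent(int input) : input_(input) {}
    int result() const { return result_; }

protected:
    void run() override {
        result_ = input_ * input_;
        done();  // without this call, wait() would never return
    }

private:
    int input_;
    int result_ = 0;
};
```

A caller constructs a SquareAgent, calls start(), calls wait(), and only then reads result(). Forgetting done() in run() leaves the waiter blocked forever, which is the reason the text above insists on it.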

A single agent can be waited on by using the wait function, and multiple agents by using the wait_for_one and wait_for_all functions on an array of agents. Waiting on an agent is one way of observing an agent's state or status. The following two methods allow you to query the status directly or to obtain a message block from which the status can be received:

     agent_status status(); 
    ISource<agent_status> * status_port();

Agents also can be canceled:

     bool cancel();

If cancel is called on an agent that has not started yet, the agent will not be run and its status becomes canceled. If the agent has already started, calling cancel has no effect: agents that have already started can’t be stopped.
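The cancellation rule is easiest to see as a tiny state machine. The sketch below is a portable C++ model with hypothetical names (SketchStatus, Lifecycle, begin_run, finish); it assumes that "not started" covers both a freshly created agent and one that is scheduled but not yet running, and it models only the states needed to show cancellation.

```cpp
#include <cassert>

// Hypothetical mirror of the lifecycle states discussed above.
enum class SketchStatus { created, runnable, started, done, canceled };

class Lifecycle {
public:
    SketchStatus status() const { return status_; }

    // created -> runnable: the agent has been handed to the scheduler.
    bool start() {
        return transition(SketchStatus::created, SketchStatus::runnable);
    }

    // runnable -> started: the scheduler begins executing run().
    bool begin_run() {
        return transition(SketchStatus::runnable, SketchStatus::started);
    }

    // started -> done: the agent has signaled completion.
    bool finish() {
        return transition(SketchStatus::started, SketchStatus::done);
    }

    // Succeeds only before the agent is running; afterwards it is a
    // no-op that returns false.
    bool cancel() {
        if (status_ == SketchStatus::created ||
            status_ == SketchStatus::runnable) {
            status_ = SketchStatus::canceled;
            return true;
        }
        return false;
    }

private:
    // Moves to 'to' only when currently in 'from'.
    bool transition(SketchStatus from, SketchStatus to) {
        if (status_ != from) return false;
        status_ = to;
        return true;
    }

    SketchStatus status_ = SketchStatus::created;
};
```

In this model, canceling a runnable agent also prevents begin_run() from succeeding, so its work never executes, while canceling a started agent changes nothing.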

Agents Sample:

To better illustrate agents, consider this implementation of a find-string utility. The program recursively searches a directory for files matching an expression and then searches each file for a given string, much like ‘findstr’. The program is separated into several agents with specific jobs: one agent locates matching files, several parse the files looking for the specified string, and one writes the matches it receives to the console. This sample also uses unbounded_buffer, asend, and receive.

[Figure: FindStringDiagram]

FileFinder.h:

This agent takes a string containing the type of files to search for. Each matching file found is sent to an unbounded_buffer m_pFileNames which the FileReader agents receive from.

Here is the run() method. It searches each directory recursively for matching files. A special end message is also used at the end to signal there are no more files. Notice at the end of run() the done() function is called.

 void run()
{
    wchar_t currentDirectory[MAX_FILE_NAME] = L".\\";
    FindFilesRecursively(currentDirectory);

    // Send END_MSG to signal done processing.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}

Here is the code where matching file names are sent to the unbounded_buffer:

 while(moreFiles)
{
    // Copy file name, receiver will be responsible for cleanup.
    pFileName = new wchar_t[MAX_FILE_NAME];
    wcscpy_s(pFileName, MAX_FILE_NAME, pDirectory);
    wcscat_s(pFileName, MAX_FILE_NAME, findFileData.cFileName);

    //
    // Send the filename to the unbounded_buffer.
    //
    asend(m_pFileNames, pFileName);
    moreFiles = FindNextFile(hFind, &findFileData);
}

FileReader.h:

The FileReader agent keeps receiving file names to parse from an input unbounded_buffer and sends any matches to another unbounded_buffer which the ConsoleWriter will receive from.

Here is the run() method. Notice the while loop that keeps receiving messages, until the end message is reached, from the same unbounded_buffer the FileFinder agent was using.

 void run()
{
    wchar_t *pFileName;

    //
    // Repeatedly pull filename messages out of the 
    // unbounded_buffer to parse until the END_MSG is 
    // reached. Note that receive blocks in a ConcRT-aware
    // way until a message is available in the buffer.
    //
    size_t currentLineNum;
    const size_t lineSize = 2000;
    wchar_t line[lineSize];
    // Compare as pointers; casting the pointer to int would truncate
    // it on 64-bit builds.
    while((pFileName = receive(m_pFileNames)) != (wchar_t *)END_MSG)
    {
        currentLineNum = 1;
        wifstream inputFile;
        inputFile.open(pFileName, ifstream::in);
        while(inputFile.good() 
            && inputFile.getline(line, lineSize))
        {
            if(wcsstr(line, m_pSearchString) != NULL)
            {
                //
                // Create a new message payload and send it to 
                // the unbounded_buffer for the ConsoleWriter 
                // to receive from.
                //
                asend(m_pFoundBuffer, new Payload(pFileName, 
                    wcsnlen(pFileName, MAX_FILE_NAME)+1,
                    currentLineNum,
                    line,
                    wcsnlen(line, lineSize)+1));
            }

            ++currentLineNum;
            line[0] = '\0';
        }
        inputFile.close();
        // The name was allocated with new[], so release it with delete [].
        delete [] pFileName;
    }

    // Resend the END_MSG for any other FileReaders.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}
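The reason each FileReader re-sends END_MSG is that a single end message must stop several readers sharing one buffer. Here is a portable C++ sketch of that idea, using standard threads in place of agents. The names IntChannel, kEndMessage, and run_readers are hypothetical, and integers stand in for the file-name pointers used in the real sample.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical sentinel; plays the role of END_MSG. Work items in this
// sketch are non-negative, so they never collide with it.
const int kEndMessage = -1;

// Minimal blocking FIFO channel shared by all readers.
class IntChannel {
public:
    void send(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        ready_.notify_one();
    }

    int receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        int front = items_.front();
        items_.pop();
        return front;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> items_;
};

// Starts readerCount readers, feeds them itemCount work items followed
// by ONE sentinel, and returns how many work items were consumed.
int run_readers(int itemCount, int readerCount) {
    assert(itemCount >= 0 && readerCount > 0);
    IntChannel channel;
    std::atomic<int> consumed(0);
    std::vector<std::thread> readers;

    for (int r = 0; r < readerCount; ++r) {
        readers.emplace_back([&channel, &consumed] {
            while (channel.receive() != kEndMessage) {
                ++consumed;
            }
            // Put the sentinel back so the next reader also stops.
            channel.send(kEndMessage);
        });
    }

    for (int i = 0; i < itemCount; ++i) {
        channel.send(i);
    }
    channel.send(kEndMessage);

    for (auto& reader : readers) {
        reader.join();
    }
    return consumed.load();
}
```

Without the re-send, only the first reader to see the sentinel would exit and the rest would block forever. One side effect, which also applies to the sample, is that a single end message is left in the buffer after the last reader finishes.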

ConsoleWriter.h:

This agent receives messages containing information about found matches and prints the information to the console.

Here is a snippet from its run method. It loops receiving messages from the unbounded_buffer the FileReaders send to.

 //
// Keep receiving messages to print to the console until
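// the END_MSG is received. Note that receive blocks in a
// ConcRT-aware way until a message is available in the buffer.
//
// Compare as pointers; casting the pointer to int would truncate
// it on 64-bit builds.
while((pPayload = receive(m_pFoundBuffer)) != (Payload *)END_MSG)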
{
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE 
        | FOREGROUND_GREEN | FOREGROUND_RED | FOREGROUND_INTENSITY); 
    printf("%ls:%lu:", pPayload->m_pFileName, 
        (unsigned long)pPayload->m_lineNumber);
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE 
        | FOREGROUND_GREEN | FOREGROUND_RED);
    printf(":%ls\n", pPayload->m_pLine);
    delete pPayload;
}

Payload.h:

Defines a class used to pass messages between FileReaders and the ConsoleWriter. It holds the file name a match was found in, the line number, and the line itself.
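Payload.h itself is not listed in this post, so the following is only a guess at its shape, reconstructed from how FileReader constructs a Payload and how ConsoleWriter reads it. The member names m_pFileName, m_lineNumber, and m_pLine come from the snippets above; the constructor parameter names, the copy_string helper, and the choice to make deep copies are my assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cwchar>

// Hypothetical reconstruction of the message passed from the
// FileReaders to the ConsoleWriter.
class Payload {
public:
    // Each size counts characters INCLUDING the terminating null,
    // matching the wcsnlen(...) + 1 arguments used by FileReader.
    Payload(const wchar_t* pFileName, std::size_t fileNameSize,
            std::size_t lineNumber,
            const wchar_t* pLine, std::size_t lineSize)
        : m_pFileName(copy_string(pFileName, fileNameSize)),
          m_lineNumber(lineNumber),
          m_pLine(copy_string(pLine, lineSize)) {}

    ~Payload() {
        delete [] m_pFileName;
        delete [] m_pLine;
    }

    // The payload owns raw buffers, so copying is disabled.
    Payload(const Payload&) = delete;
    Payload& operator=(const Payload&) = delete;

    wchar_t* m_pFileName;      // file the match was found in
    std::size_t m_lineNumber;  // 1-based line number of the match
    wchar_t* m_pLine;          // text of the matching line

private:
    // Returns a freshly allocated, null-terminated copy of 'source'.
    static wchar_t* copy_string(const wchar_t* source, std::size_t size) {
        assert(source != nullptr && size > 0);
        wchar_t* copy = new wchar_t[size];
        std::wcsncpy(copy, source, size - 1);
        copy[size - 1] = L'\0';
        return copy;
    }
};
```

Deep copies matter here because the FileReader frees the file name and reuses its line buffer right after sending, so the message must not point back into the sender's memory. A production version would probably hold std::wstring members instead of raw buffers.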

FindString.cpp

The driver of the program creates, starts, and then waits on all the agents.

Here is the code to create the unbounded_buffers:

     unbounded_buffer<wchar_t *> fileBuffer;
    unbounded_buffer<Payload *> foundBuffer;

Creation of the agents:

 // Agent to find files.
FileFinder fileFinder(pFilePattern, &fileBuffer);
fileFinder.start();

// Agents to handle parsing files.
FileReader **ppFileReaderAgents = new FileReader*[numAgents];
for(unsigned int i = 0; i < numAgents; ++i)
{
    ppFileReaderAgents[i] = new FileReader(&fileBuffer, 
        pStringPattern, &foundBuffer);
    ppFileReaderAgents[i]->start();
}

// Agent to handle writing to the console.
ConsoleWriter consoleWriter(&foundBuffer);
consoleWriter.start();

Waiting on the agents to finish:

 // Wait for agents to finish.
agent::wait(&fileFinder);
for(unsigned int i = 0; i < numAgents; ++i)
{
    agent::wait(ppFileReaderAgents[i]);
    delete ppFileReaderAgents[i];
}
delete [] ppFileReaderAgents;

// Now that all other agents have finished signal the ConsoleWriter
// it can finish.
send(foundBuffer, (Payload *)END_MSG);
agent::wait(&consoleWriter);

In this program, I created agents that each handle a large chunk of isolated work and I/O. Notice how parallelism is built into the program from the beginning by using higher levels of abstraction. I created a pipeline of agents that work concurrently with almost no knowledge of each other. Each agent takes in messages, performs some action, and then sends a message accordingly. Since the agents can all run at the same time, the program can exploit any available cores on the system; because there are many FileReader agents, it parses multiple files concurrently. By designing my program in this fashion, I can easily add, remove, or modify individual agents without needing to restructure anything else. I also avoided having to deal explicitly with locks and other difficult forms of synchronization.

To see all the code and play around with this sample, download the project and source files here:

Code Samples

Example command line execution of this program is:

FindString.exe agents *.txt

In future posts, I will go into more detail about each message block the Asynchronous Agents Library provides, how to use them, the coordination primitives, and how to take advantage of our infrastructure and easily construct your own message blocks.

Comments

  • Anonymous
    August 19, 2010
    Really a good topic. I am happy that Microsoft is now taking equally interest in VC++ also. Still the UI portion is very limited, waiting to other articles on UI topics.