Asynchronous Agents Library

The Asynchronous Agents Library (or just Agents Library) provides a programming model that helps you make concurrency-enabled applications more robust. The Agents Library is a C++ template library that promotes an actor-based programming model and in-process message passing for coarse-grained dataflow and pipelining tasks. The Agents Library builds on the scheduling and resource management components of the Concurrency Runtime.

Programming Model

The Agents Library provides alternatives to shared state by letting you connect isolated components through an asynchronous communication model that is based on dataflow instead of control flow. Dataflow refers to a programming model where computations are made when all required data is available; control flow refers to a programming model where computations are made in a predetermined order.
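The difference between the two models can be sketched in a few lines of portable C++. This is only an illustration written with the standard library, not Agents Library code: sum_node, control_flow_sum, and dataflow_sum are hypothetical names introduced for this sketch. The node computes its result at the moment its last input arrives, in whichever order the inputs are supplied.

```cpp
#include <cassert>

// Hypothetical dataflow node: computes left + right as soon as both
// inputs have been supplied, regardless of the order in which they arrive.
class sum_node
{
public:
   void set_left(int value)  { _left = value;  _has_left = true;  try_compute(); }
   void set_right(int value) { _right = value; _has_right = true; try_compute(); }

   bool has_result() const { return _has_result; }

   int result() const
   {
      assert(_has_result);   // Only meaningful after both inputs arrived.
      return _result;
   }

private:
   void try_compute()
   {
      if (_has_left && _has_right)
      {
         _result = _left + _right;
         _has_result = true;
      }
   }

   int _left = 0, _right = 0, _result = 0;
   bool _has_left = false, _has_right = false, _has_result = false;
};

// Control flow: the statement order fixes when the sum is computed.
int control_flow_sum(int left, int right)
{
   return left + right;
}

// Dataflow: the right input arrives first, and the sum is computed only
// when the left input becomes available as well.
int dataflow_sum(int left, int right)
{
   sum_node node;
   node.set_right(right);
   bool computed_too_early = node.has_result();   // false: left is missing
   node.set_left(left);
   return computed_too_early ? -1 : node.result();
}
```

Calling dataflow_sum(2, 40) returns 42, the same value as control_flow_sum(2, 40); the difference is that the dataflow version is triggered by the arrival of data rather than by the position of a statement.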

The dataflow programming model is related to the concept of message passing, where independent components of a program communicate with one another by sending messages.

The Agents Library is composed of three components: asynchronous agents, asynchronous message blocks, and message-passing functions. Agents maintain state, and use message blocks and message-passing functions to communicate with one another and with external components. Message-passing functions enable agents to send and receive messages to and from the external components. Asynchronous message blocks hold messages and enable agents to communicate in a synchronized manner.
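To make these roles concrete, here is a minimal portable sketch of a message block and a pair of message-passing functions, written with the C++ standard library only. It is not how the Agents Library is implemented, and none of the names come from it: simple_buffer, send_message, receive_message, and request_response_demo are hypothetical. The sketch only mimics the behavior described above, where the buffer holds messages and a receiver waits until one is available.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a message block: holds messages in FIFO order.
template <typename T>
class simple_buffer
{
public:
   void push(T message)
   {
      {
         std::lock_guard<std::mutex> lock(_mutex);
         _messages.push(std::move(message));
      }
      _ready.notify_one();
   }

   // Blocks the caller until a message is available.
   T pop()
   {
      std::unique_lock<std::mutex> lock(_mutex);
      _ready.wait(lock, [this] { return !_messages.empty(); });
      assert(!_messages.empty());   // Guaranteed by the wait predicate.
      T message = std::move(_messages.front());
      _messages.pop();
      return message;
   }

private:
   std::mutex _mutex;
   std::condition_variable _ready;
   std::queue<T> _messages;
};

// Hypothetical stand-ins for message-passing functions.
template <typename T>
void send_message(simple_buffer<T>& target, T message)
{
   target.push(std::move(message));
}

template <typename T>
T receive_message(simple_buffer<T>& source)
{
   return source.pop();
}

// The responder thread plays the role of a second agent: it waits for a
// request and answers with the length of the request text.
int request_response_demo()
{
   simple_buffer<std::string> requests;
   simple_buffer<int> responses;

   std::thread responder([&] {
      std::string request = receive_message(requests);
      send_message(responses, static_cast<int>(request.size()));
   });

   send_message(requests, std::string("request"));
   int response = receive_message(responses);   // Waits for the reply.

   responder.join();
   return response;
}
```

In this sketch the only locking lives inside the buffer; the two communicating parties never touch a lock directly, which is the property the message-passing model aims for. request_response_demo() returns 7, the length of the string "request".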

The following illustration shows how two agents use message blocks and message-passing functions to communicate. In this illustration, agent1 sends a message to agent2 by using the Concurrency::send function and a Concurrency::unbounded_buffer object. agent2 uses the Concurrency::receive function to read the message. agent2 uses the same method to send a message to agent1. Dashed arrows represent the flow of data between agents. Solid arrows connect the agents to the message blocks that they write to or read from.

The components of the Agents Library

A code example that implements this illustration is shown later in this topic.

The agent programming model has several advantages over other concurrency and synchronization mechanisms, such as events. One advantage is that by using message passing to transmit state changes between objects, you can isolate access to shared resources and thereby improve scalability. Another advantage is that message passing ties synchronization to data instead of tying it to an external synchronization object. This simplifies data transmission among components and can help eliminate programming errors in your applications.

When to Use the Agents Library

Use the Agents Library when you have multiple operations that must communicate with one another asynchronously. Message blocks and message-passing functions let you write parallel applications without requiring synchronization mechanisms such as locks. This lets you focus on application logic.

The agent programming model is often used to create data pipelines or networks. A data pipeline is a series of components, each of which performs a specific task that contributes to a larger goal. Every component in a dataflow pipeline performs work when it receives a message from another component. The result of that work is passed to other components in the pipeline or network. The components can use more fine-grained concurrency functionality from other libraries, for example, the Parallel Patterns Library (PPL).
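A data pipeline of this kind can be sketched in portable C++ as follows. The sketch takes nothing from the Agents Library: channel, run_pipeline, and the end_of_stream marker are hypothetical names and conventions chosen for illustration. Each stage runs on its own thread and performs work only when a message arrives from the previous stage.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical blocking queue that connects two pipeline stages.
class channel
{
public:
   void send(int message)
   {
      {
         std::lock_guard<std::mutex> lock(_mutex);
         _messages.push(message);
      }
      _ready.notify_one();
   }

   // Blocks the caller until a message is available.
   int receive()
   {
      std::unique_lock<std::mutex> lock(_mutex);
      _ready.wait(lock, [this] { return !_messages.empty(); });
      int message = _messages.front();
      _messages.pop();
      return message;
   }

private:
   std::mutex _mutex;
   std::condition_variable _ready;
   std::queue<int> _messages;
};

// Assumption for this sketch: the value -1 marks the end of the stream.
const int end_of_stream = -1;

// Stage 1 squares each input and stage 2 adds one. Because each channel
// is FIFO and each stage is a single thread, results keep the input order.
std::vector<int> run_pipeline(const std::vector<int>& inputs)
{
   channel to_square, to_increment, to_output;

   std::thread square_stage([&] {
      for (int m = to_square.receive(); m != end_of_stream; m = to_square.receive())
         to_increment.send(m * m);
      to_increment.send(end_of_stream);   // Propagate shutdown downstream.
   });

   std::thread increment_stage([&] {
      for (int m = to_increment.receive(); m != end_of_stream; m = to_increment.receive())
         to_output.send(m + 1);
      to_output.send(end_of_stream);
   });

   for (int value : inputs)
   {
      assert(value >= 0);   // Negative values are reserved for end_of_stream.
      to_square.send(value);
   }
   to_square.send(end_of_stream);

   std::vector<int> results;
   for (int m = to_output.receive(); m != end_of_stream; m = to_output.receive())
      results.push_back(m);

   square_stage.join();
   increment_stage.join();
   return results;
}
```

For the inputs 1, 2, 3, run_pipeline returns 2, 5, 10. While the second stage increments one value, the first stage can already be squaring the next, which is where a pipeline gains its concurrency.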

Example

The following example implements the illustration shown earlier in this topic.

// basic-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// This agent writes a string to its target and reads an integer
// from its source.
class agent1 : public agent 
{
public:
   explicit agent1(ISource<int>& source, ITarget<wstring>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Send the request.
      wstringstream ss;
      ss << L"agent1: sending request..." << endl;
      wcout << ss.str();

      send(_target, wstring(L"request"));

      // Read the response.
      int response = receive(_source);

      ss = wstringstream();
      ss << L"agent1: received '" << response << L"'." << endl;
      wcout << ss.str();

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<int>& _source;
   ITarget<wstring>& _target;
};

// This agent reads a string from its source and then writes an integer
// to its target.
class agent2 : public agent 
{
public:
   explicit agent2(ISource<wstring>& source, ITarget<int>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Read the request.
      wstring request = receive(_source);

      wstringstream ss;
      ss << L"agent2: received '" << request << L"'." << endl;
      wcout << ss.str();

      // Send the response.
      ss = wstringstream();
      ss << L"agent2: sending response..." << endl;
      wcout << ss.str();

      send(_target, 42);

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<wstring>& _source;
   ITarget<int>& _target;
};

int wmain()
{
   // Step 1: Create two message buffers to serve as communication channels
   // between the agents.

   // The first agent writes messages to this buffer; the second
   // agent reads messages from this buffer.
   unbounded_buffer<wstring> buffer1;

   // The first agent reads messages from this buffer; the second
   // agent writes messages to this buffer.
   overwrite_buffer<int> buffer2;

   // Step 2: Create the agents.
   agent1 first_agent(buffer2, buffer1);
   agent2 second_agent(buffer1, buffer2);

   // Step 3: Start the agents. The runtime calls the run method on
   // each agent.
   first_agent.start();
   second_agent.start();

   // Step 4: Wait for both agents to finish.
   agent::wait(&first_agent);
   agent::wait(&second_agent);
}

This example produces the following output:

agent1: sending request...
agent2: received 'request'.
agent2: sending response...
agent1: received '42'.

The following topics describe the functionality used in this example.