演练:创建数据流代理

本文档演示如何创建基于代理的应用程序,这些应用程序基于数据流而不是控制流。

“控制流”是指程序中操作的执行顺序。通过使用条件语句、循环等控制结构可调节控制流。“数据流”是指在所有必需数据都可用时进行计算的编程模型。数据流编程模型与“消息传递”概念相关,在消息传递中,程序的独立组件通过发送消息与另一个程序进行通信。

异步代理支持控制流和数据流编程模型。在某些情况下可以使用控制流模型,而在其他情况下(例如,在代理接收数据并执行基于此数据负载的操作时)可以使用数据流模型。

系统必备

在开始本演练之前,请阅读下列文档:

各节内容

本演练包含以下各节:

  • 创建基本控制流代理

  • 创建基本数据流代理

  • 创建消息日志记录代理

创建基本控制流代理

请考虑以下定义了 control_flow_agent 类的示例。control_flow_agent 类作用于三个消息缓冲区:一个输入缓冲区和两个输出缓冲区。run 方法从循环中的源消息缓冲区读取数据并使用条件语句来引导程序执行流。该代理会使某个计数器按非零的负值递增,使另一个计数器按非零的正值递增。在收到零标记值后,该代理会向输出消息缓冲区发送计数器的值。negativespositives 方法使应用程序能够从代理读取负值和正值的计数。

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

虽然此示例说明了代理中控制流的基本使用方法,但它演示了基于控制流的编程的串行特性。尽管输入消息缓冲区中可能包含多条可用消息,但必须按顺序处理每条消息。通过数据流模型可以同时评估条件语句的两个分支。当数据流模型可用时,您可以通过它创建作用于数据的更复杂的消息网络。

Top

创建基本数据流代理

本节演示如何转换 control_flow_agent 类,以便使用数据流模型执行相同的任务。

数据流代理通过创建消息缓冲区的网络进行工作,而其中每个缓冲区都可提供特定服务。某些消息块根据消息的负载使用筛选功能来接受或拒绝该消息。筛选功能可确保消息块只接收特定值。

将控制流代理转换为数据流代理

  1. control_flow_agent 类的主体复制到另一个类,例如,dataflow_agent。或者,可以重命名 control_flow_agent 类。

  2. run 方法中移除调用 receive 的循环主体。

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. run 方法中,在初始化 negative_countpositive_count 变量后,添加跟踪活动操作计数的 countdown_event 对象。

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    本主题后面演示了 countdown_event 类。

  4. 创建将参与数据流网络的消息缓冲区对象。

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. 连接消息缓冲区以便形成网络。

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. 等待设置 eventcountdown event 对象。这些事件通过发送信号指示代理已经接收标记值以及所有操作均已完成。

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

下图显示了 dataflow_agent 类的完整数据流网络:

数据流网络

下表描述了网络中的成员。

成员

说明

increment_active

A concurrency::transformer 增加活动事件计数器并将输入的值传递给网络的其余部分的对象。

negatives, positives

concurrency::call 的活动事件计数器递增的数字和递减计数的对象。这些对象每次都使用筛选器接受负数或正数。

sentinel

A concurrency::call 接受的活动事件计数器的只有零和递减的 sentinel 值的对象。

connector

A concurrency::unbounded_buffer 连接到内部网络的源消息缓冲区对象。

因为在独立线程上调用了 run 方法,所以在完全连接网络以前,其他线程可以将消息发送到该网络。_source 数据成员是一个 unbounded_buffer 对象,它可以缓冲从应用程序发送到代理的所有输入。若要确保网络能够处理所有输入消息,代理首先要链接网络的内部节点,然后再开始将此网络的 connector 链接到 _source 数据成员。这可以确保在形成网络时不会处理消息。

因为此示例中的网络基于数据流而不是控制流,所以网络必须与代理进行通信,该代理已经处理完每个输入值并且标记节点已接收它的值。本示例使用countdown_event信号处理完所有的输入的值的对象和 concurrency::event 指示的 sentinel 节点已收到它的值的对象。当计数器值达到零时,countdown_event 类使用 event 对象发出信号。数据流网络的报头每次在收到值时都会递增计数器。网络的每个终端节点在处理输入值后都会递减计数器。在代理形成数据流网络以后,它会等待标记节点设置 event 对象并等待 countdown_event 对象发出其计数器已达到零的信号。

以下示例显示了 control_flow_agentdataflow_agentcountdown_event 类。wmain 函数创建一个 control_flow_agent 和一个 dataflow_agent 对象,并使用 send_values 函数将一系列随机值发送到代理。

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

此示例产生下面的示例输出:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Ff398051.collapse_all(zh-cn,VS.110).gif编译代码

将示例代码复制并将其粘贴在 Visual Studio 项目中,或将它粘贴到一个文件,名为数据流 agent.cpp ,然后在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc dataflow-agent.cpp

Top

创建消息日志记录代理

以下示例显示了 log_agent 类,它类似于 dataflow_agent 类。log_agent 类可实现将日志消息写入文件和控制台的异步日志记录代理。log_agent 类使应用程序能够将消息分类为信息、警告或错误。它还使应用程序能够指定是将每个日志类别写入文件或控制台,还是同时写入这两者中。此示例将所有日志消息写入文件并只将错误消息写入控制台。

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

此示例将下列输出写入控制台。

error: This is a sample error message.

此示例还生成 log.txt 文件,后者包含以下文本。

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Ff398051.collapse_all(zh-cn,VS.110).gif编译代码

将示例代码复制并将其粘贴在 Visual Studio 项目中,或将它粘贴到一个文件,名为日志 filter.cpp ,然后在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc log-filter.cpp

Top

请参见

其他资源

并发运行时演练