演练:创建基于代理的应用程序

本主题介绍了如何创建基本的基于代理的应用程序。 在本演练中,你可以创建一个代理,从文本文件中异步读取数据。 应用程序使用 Adler-32 校验和算法来计算该文件内容的校验和。

先决条件

必须了解以下主题才能完成本演练:

章节

本演练演示如何执行以下任务:

创建控制台应用

本部分介绍了如何创建 C++ 控制台应用程序,用于引用程序将使用的头文件。 根据所使用的 Visual Studio 版本,初始步骤会有所不同。 若要查看 Visual Studio 首选项的文档,请使用“版本”选择器控件。 它位于此页面上目录表的顶部。

在 Visual Studio 中创建 C# 控制台应用程序

  1. 在主菜单中,选择“文件”>“新建”>“项目”,打开“创建新项目”对话框

  2. 在对话框顶部,将“语言”设置为“C++”,将“平台”设置为“Windows”,并将“项目类型”设置为“控制台”。

  3. 从筛选的项目类型列表中,选择“控制台应用”,然后选择“下一步” 。 在下一页中,输入 BasicAgent 作为项目的名称,并根据需要指定项目位置。

  4. 选择“创建”按钮创建项目。

  5. 右键单击“解决方案资源管理器”中的项目节点,并选择“属性”。 在“配置属性”>“C/C++”>“预编译标头”>“预编译标头”下,选择“创建”

在 Visual Studio 2017 和更早版本中创建 C++ 控制台应用程序

  1. 在“文件”菜单上,单击“新建”,然后单击“项目”以显示“新建项目”对话框。

  2. 在“新建项目”对话框中,选择“项目类型”窗格中的“Visual C++”节点,然后在“模板”窗格中选择“Win32 控制台应用程序”。 为项目键入一个名称,例如 BasicAgent,然后单击“确定”以显示“Win32 控制台应用程序向导”

  3. 在“Win32 控制台应用程序向导”对话框中,单击“完成”

更新头文件

在 pch.h(Visual Studio 2017 及更早版本中为 stdafx.h)文件中,添加以下代码:

#include <agents.h>
#include <string>
#include <iostream>
#include <algorithm>

头文件 agents.h 包含 concurrency::agent 类的功能。

验证应用程序

最终,通过生成并运行应用程序来验证其是否已成功创建。 若要生成应用程序,请在“生成”菜单上单击“生成解决方案”。 如果应用程序已成功生成,请单击“调试”菜单上的“开始调试”来运行应用程序

[返回页首]

创建 file_reader 类

本部分介绍如何创建 file_reader 类。 运行时计划每个代理在其自己的上下文中执行工作。 因此,可以创建一个代理,该代理以同步方式执行工作,但与其他组件异步交互。 file_reader 类从给定输入文件读取数据,并将该文件中的数据发送到给定的目标组件。

创建 file_reader 类

  1. 将新的 C++ 头文件添加到项目。 为此,请右键单击“解决方案资源管理器”中的“头文件”节点,单击“添加”,然后单击“新建项”。 在“模板”窗格中,选择“头文件(.h)”。 在“添加新项”对话框中,在“名称”框中键入 file_reader.h,然后单击“添加”

  2. 在 file_reader.h 中,添加以下代码。

    #pragma once
    
  3. 在 file_reader.h 中,创建一个派生自 agent 的名为 file_reader 的类。

    class file_reader : public concurrency::agent
    {
    public:
    protected:
    private:
    };
    
  4. 将以下数据成员添加到类的 private 部分。

    std::string _file_name;
    concurrency::ITarget<std::string>& _target;
    concurrency::overwrite_buffer<std::exception> _error;
    

    _file_name 成员是代理从中读取的文件名。 _target 成员是代理向其写入文件内容的 concurrency::ITarget 对象。 _error 成员保留代理生命周期内发生的任何错误。

  5. file_reader 构造函数的以下代码添加到 file_reader 类的 public 部分。

    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target)
       : _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target,
       concurrency::Scheduler& scheduler)
       : agent(scheduler)
       , _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target,
       concurrency::ScheduleGroup& group)
       : agent(group) 
       , _file_name(file_name)
       , _target(target)
    {
    }
    

    每个构造函数重载均设置 file_reader 数据成员。 第二和第三个构造函数重载使应用程序能够将特定的计划程序与代理一起使用。 第一个重载将默认的计划程序与代理一起使用。

  6. get_error 方法添加到 file_reader 类的公共部分。

    bool get_error(std::exception& e)
    {
       return try_receive(_error, e);
    }
    

    get_error 方法检索代理生命周期内发生的任何错误。

  7. 在类的 protected 部分实现 concurrency::agent::run 方法。

    void run()
    {
       FILE* stream;
       try
       {
          // Open the file.
          if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
          {
             // Throw an exception if an error occurs.            
             throw std::exception("Failed to open input file.");
          }
       
          // Create a buffer to hold file data.
          char buf[1024];
    
          // Set the buffer size.
          setvbuf(stream, buf, _IOFBF, sizeof buf);
          
          // Read the contents of the file and send the contents
          // to the target.
          while (fgets(buf, sizeof buf, stream))
          {
             asend(_target, std::string(buf));
          }   
          
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Close the file.
          fclose(stream);
       }
       catch (const std::exception& e)
       {
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Write the exception to the error buffer.
          send(_error, e);
       }
    
       // Set the status of the agent to agent_done.
       done();
    }
    

run 方法打开文件并从中读取数据。 run 方法使用异常处理来捕获文件处理期间发生的任何错误。

每次该方法从文件读取数据时,它都会调用 concurrency::asend 函数,将该数据发送到目标缓冲区。 它将空字符串发送到其目标缓冲区,以指示处理结束。

下面的示例展示了 file_reader.h 的完整内容。

#pragma once

class file_reader : public concurrency::agent
{
public:
   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target)
      : _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target,
      concurrency::Scheduler& scheduler)
      : agent(scheduler)
      , _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target,
      concurrency::ScheduleGroup& group)
      : agent(group) 
      , _file_name(file_name)
      , _target(target)
   {
   }
   
   // Retrieves any error that occurs during the life of the agent.
   bool get_error(std::exception& e)
   {
      return try_receive(_error, e);
   }
   
protected:
   void run()
   {
      FILE* stream;
      try
      {
         // Open the file.
         if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
         {
            // Throw an exception if an error occurs.            
            throw std::exception("Failed to open input file.");
         }
      
         // Create a buffer to hold file data.
         char buf[1024];

         // Set the buffer size.
         setvbuf(stream, buf, _IOFBF, sizeof buf);
         
         // Read the contents of the file and send the contents
         // to the target.
         while (fgets(buf, sizeof buf, stream))
         {
            asend(_target, std::string(buf));
         }   
         
         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Close the file.
         fclose(stream);
      }
      catch (const std::exception& e)
      {
         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Write the exception to the error buffer.
         send(_error, e);
      }

      // Set the status of the agent to agent_done.
      done();
   }

private:
   std::string _file_name;
   concurrency::ITarget<std::string>& _target;
   concurrency::overwrite_buffer<std::exception> _error;
};

[返回页首]

在应用程序中使用 file_reader 类

本部分介绍了如何使用 file_reader 类读取文本文件的内容。 它还演示了如何创建 concurrency::call 对象,用来接收此文件数据并计算其 Adler-32 校验和。

在应用程序中使用 file_reader 类

  1. 在 BasicAgent.cpp 中,添加以下 #include 语句。

    #include "file_reader.h"
    
  2. 在 BasicAgent.cpp 中,添加以下 using 指令。

    using namespace concurrency;
    using namespace std;
    
  3. _tmain 函数中,创建 concurrency::event 对象来发送处理结束信号。

    event e;
    
  4. 创建 call 对象,该对象在接收数据时更新校验和。

    // The components of the Adler-32 sum.
    unsigned int a = 1;
    unsigned int b = 0;
    
    // A call object that updates the checksum when it receives data.
    call<string> calculate_checksum([&] (string s) {
       // If the input string is empty, set the event to signal
       // the end of processing.
       if (s.size() == 0)
          e.set();
       // Perform the Adler-32 checksum algorithm.
       for_each(begin(s), end(s), [&] (char c) {
          a = (a + c) % 65521;
          b = (b + a) % 65521;
       });
    });
    

    call 对象还会在接收空字符串时设置 event 对象,以发送处理结束信号。

  5. 创建 file_reader 对象以从文件 test.txt 读取内容并将该文件内容写入 call 对象。

    file_reader reader("test.txt", calculate_checksum);
    
  6. 启动代理,然后等待其完成。

    reader.start();
    agent::wait(&reader);
    
  7. 等待 call 对象接收所有数据并完成。

    e.wait();
    
  8. 检查文件读取器是否存在错误。 如果未发生错误,请计算最终的 Adler-32 总和并将总和打印到控制台。

    std::exception error;
    if (reader.get_error(error))
    {
       wcout << error.what() << endl;
    }   
    else
    {      
       unsigned int adler32_sum = (b << 16) | a;
       wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
    }
    

下面的示例展示了完整的 BasicAgent.cpp 文件。

// BasicAgent.cpp : Defines the entry point for the console application.
//

#include "pch.h" // Use stdafx.h in Visual Studio 2017 and earlier
#include "file_reader.h"

using namespace concurrency;
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
   // An event object that signals the end of processing.
   event e;

   // The components of the Adler-32 sum.
   unsigned int a = 1;
   unsigned int b = 0;

   // A call object that updates the checksum when it receives data.
   call<string> calculate_checksum([&] (string s) {
      // If the input string is empty, set the event to signal
      // the end of processing.
      if (s.size() == 0)
         e.set();
      // Perform the Adler-32 checksum algorithm.
      for_each(begin(s), end(s), [&] (char c) {
         a = (a + c) % 65521;
         b = (b + a) % 65521;
      });
   });

   // Create the agent.
   file_reader reader("test.txt", calculate_checksum);
   
   // Start the agent and wait for it to complete.
   reader.start();
   agent::wait(&reader);

   // Wait for the call object to receive all data and complete.
   e.wait();

   // Check the file reader for errors.
   // If no error occurred, calculate the final Adler-32 sum and print it 
   // to the console.
   std::exception error;
   if (reader.get_error(error))
   {
      wcout << error.what() << endl;
   }   
   else
   {      
      unsigned int adler32_sum = (b << 16) | a;
      wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
   }
}

[返回页首]

示例输入

这是输入文件 text.txt 的示例内容:

The quick brown fox
jumps
over the lazy dog

示例输出

随示例输入一起使用时,此程序将生成以下输出:

Adler-32 sum is fefb0d75

可靠编程

为防止对数据成员的并发访问,我们建议向类的 protectedprivate 部分添加用于执行工作的方法。 只将向代理发送消息或从代理接收消息的方法添加到类的 public 部分。

始终调用 concurrency::agent::done 方法以将代理移至已完成状态。 通常在从 run 方法返回之前调用此方法。

后续步骤

有关基于代理的应用程序的另一个示例,请参阅演练:使用联接避免死锁

另请参阅

异步代理库
异步消息块
消息传递函数
同步数据结构
演练:使用 join 避免死锁