演练:创建基于代理的应用程序

本主题描述如何创建基于代理的基本应用程序。 在本演练中,您可以创建以异步方式从文本文件读取数据的代理。 应用程序使用 Adler-32 校验和算法计算该文件的内容校验和。

系统必备

若要完成本演练,必须了解以下主题:

各节内容

本演练演示如何执行以下任务:

  • 创建控制台应用程序

  • 创建 file_reader 类

  • 在应用程序中使用 file_reader 类

创建控制台应用程序

本节演示如何创建一个引用该程序将使用的头文件的 Visual C++ 控制台应用程序。

通过使用 Win32 控制台应用程序向导创建 Visual C++ 应用程序

  1. 在**“文件”菜单上,单击“新建”,再单击“项目”以显示“新建项目”**对话框。

  2. 在**“新建项目”对话框中,选择“项目类型”窗格中的“Visual C++”节点,然后选择“模板”窗格中的“Win32 控制台应用程序”。 键入项目名称,例如 BasicAgent,然后单击“确定”以显示“Win32 控制台应用程序向导”**。

  3. 在**“Win32 控制台应用程序向导”中,单击“完成”**。

  4. 在 stdafx.h 中,添加下列代码。

    #include <agents.h>
    #include <string>
    #include <iostream>
    #include <algorithm>
    

    头文件 agents.h 包含 concurrency::agent 类的功能。

  5. 通过生成并运行应用程序来验证该应用程序已成功创建。 若要生成应用程序,请在**“生成”菜单上单击“生成解决方案”。 如果成功生成了该应用程序,请单击“调试”菜单上的“开始调试”**来运行该应用程序。

[Top]

创建 file_reader 类

本节演示如何创建 file_reader 类。 运行时安排每个代理在其自己的上下文中执行工作。 因此,您可以创建以同步方式执行工作但以异步方式与其他组件进行交互的代理。 file_reader 类读取给定输入文件中的数据,并将该文件中的数据发送到给定的目标组件。

创建 file_reader 类

  1. 将新的 C++ 头文件添加到您的项目中。 若要执行此操作,请在**“解决方案资源管理器”中右击“头文件”节点,单击“添加”,再单击“新建项”。 在“模板”窗格中选择“头文件(.h)”。 在“添加新项”对话框内的“名称”框中键入 file_reader.h,然后单击“添加”**。

  2. 在 file_reader.h 中添加以下代码。

    #pragma once
    
  3. 在 file_reader.h 中,创建名为 file_reader 的类,该类是从 agent 中派生的。

    class file_reader : public concurrency::agent
    {
    public:
    protected:
    private:
    };
    
  4. 将以下数据成员添加到您的类的 private 节中。

    std::string _file_name;
    concurrency::ITarget<std::string>& _target;
    concurrency::overwrite_buffer<std::exception> _error;
    

    _file_name 成员是代理从中进行读取的文件名。 _target 成员是代理向其写入文件内容的 concurrency::ITarget 对象。 _error 成员将保存在代理生命期内发生的任何错误。

  5. file_reader 构造函数的以下代码添加到 file_reader 类的 public 节中。

    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target)
       : _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target,
       concurrency::Scheduler& scheduler)
       : agent(scheduler)
       , _file_name(file_name)
       , _target(target)
    {
    }
    
    explicit file_reader(const std::string& file_name, 
       concurrency::ITarget<std::string>& target,
       concurrency::ScheduleGroup& group)
       : agent(group) 
       , _file_name(file_name)
       , _target(target)
    {
    }
    

    每个构造函数重载都将设置 file_reader 数据成员。 第二个和第三个构造函数重载使您的应用程序能够对您的代理使用特定的计划程序。 第一个重载对您的代理使用默认计划程序。

  6. get_error 方法添加到 file_reader 类的 public 节中。

    bool get_error(std::exception& e)
    {
       return try_receive(_error, e);
    }
    

    get_error 方法将检索在代理生命期内发生的任何错误。

  7. 在您类的 protected 部分实现 concurrency::agent::run 方法。

    void run()
    {
       FILE* stream;
       try
       {
          // Open the file.
          if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
          {
             // Throw an exception if an error occurs.            
             throw std::exception("Failed to open input file.");
          }
    
          // Create a buffer to hold file data.
          char buf[1024];
    
          // Set the buffer size.
          setvbuf(stream, buf, _IOFBF, sizeof buf);
    
          // Read the contents of the file and send the contents
          // to the target.
          while (fgets(buf, sizeof buf, stream))
          {
             asend(_target, std::string(buf));
          }   
    
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Close the file.
          fclose(stream);
       }
       catch (const std::exception& e)
       {
          // Send the empty string to the target to indicate the end of processing.
          asend(_target, std::string(""));
    
          // Write the exception to the error buffer.
          send(_error, e);
       }
    
       // Set the status of the agent to agent_done.
       done();
    }
    

    run 方法将打开该文件,并读取其中的数据。 run 方法使用异常处理功能捕获文件处理过程中发生的任何错误。

    每次该方法从文件读取数据时,它都会调用 concurrency::asend 函数以将该数据发送至目标缓冲区。 它会将空字符串发送到其目标缓冲区以指示处理结束。

下面的示例演示了 file_reader.h 的完整内容。

#pragma once

class file_reader : public concurrency::agent
{
public:
   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target)
      : _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target,
      concurrency::Scheduler& scheduler)
      : agent(scheduler)
      , _file_name(file_name)
      , _target(target)
   {
   }

   explicit file_reader(const std::string& file_name, 
      concurrency::ITarget<std::string>& target,
      concurrency::ScheduleGroup& group)
      : agent(group) 
      , _file_name(file_name)
      , _target(target)
   {
   }

   // Retrieves any error that occurs during the life of the agent.
   bool get_error(std::exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   void run()
   {
      FILE* stream;
      try
      {
         // Open the file.
         if (fopen_s(&stream, _file_name.c_str(), "r") != 0)
         {
            // Throw an exception if an error occurs.            
            throw std::exception("Failed to open input file.");
         }

         // Create a buffer to hold file data.
         char buf[1024];

         // Set the buffer size.
         setvbuf(stream, buf, _IOFBF, sizeof buf);

         // Read the contents of the file and send the contents
         // to the target.
         while (fgets(buf, sizeof buf, stream))
         {
            asend(_target, std::string(buf));
         }   

         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Close the file.
         fclose(stream);
      }
      catch (const std::exception& e)
      {
         // Send the empty string to the target to indicate the end of processing.
         asend(_target, std::string(""));

         // Write the exception to the error buffer.
         send(_error, e);
      }

      // Set the status of the agent to agent_done.
      done();
   }

private:
   std::string _file_name;
   concurrency::ITarget<std::string>& _target;
   concurrency::overwrite_buffer<std::exception> _error;
};

[Top]

在应用程序中使用 file_reader 类

本节演示如何使用 file_reader 类读取文本文件的内容。 它还演示如何创建接收该文件数据并计算其 Adler-32 校验和的 concurrency::call 对象。

在应用程序中使用 file_reader 类

  1. 在 BasicAgent.cpp 中,添加下面的 #include 语句。

    #include "file_reader.h"
    
  2. 在 BasicAgent.cpp 中,添加下面的 using 指令。

    using namespace concurrency;
    using namespace std;
    
  3. _tmain 函数中,创建发送处理结束信号的 concurrency::event 对象。

    event e;
    
  4. 创建一个在接收数据时更新校验和的 call 对象。

    // The components of the Adler-32 sum. 
    unsigned int a = 1;
    unsigned int b = 0;
    
    // A call object that updates the checksum when it receives data.
    call<string> calculate_checksum([&] (string s) {
       // If the input string is empty, set the event to signal 
       // the end of processing. 
       if (s.size() == 0)
          e.set();
       // Perform the Adler-32 checksum algorithm.
       for_each(begin(s), end(s), [&] (char c) {
          a = (a + c) % 65521;
          b = (b + a) % 65521;
       });
    });
    

    call 对象在收到空字符串时还将设置 event 对象,以通知处理结束。

  5. 创建 file_reader 对象,该对象从文件 test.txt 中进行读取并将该文件的内容写入到 call 对象。

    file_reader reader("test.txt", calculate_checksum);
    
  6. 启动代理并等待其完成。

    reader.start();
    agent::wait(&reader);
    
  7. 等待 call 对象接收所有数据并完成。

    e.wait();
    
  8. 检查文件读取器是否有错误。 如果没有发生错误,则计算最终的 Adler-32 总和并将该总和打印到控制台。

    std::exception error;
    if (reader.get_error(error))
    {
       wcout << error.what() << endl;
    }   
    else
    {      
       unsigned int adler32_sum = (b << 16) | a;
       wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
    }
    

下面的示例演示了完整的 BasicAgent.cpp 文件。

// BasicAgent.cpp : Defines the entry point for the console application. 
//

#include "stdafx.h"
#include "file_reader.h" 

using namespace concurrency;
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
   // An event object that signals the end of processing. 
   event e;

   // The components of the Adler-32 sum. 
   unsigned int a = 1;
   unsigned int b = 0;

   // A call object that updates the checksum when it receives data.
   call<string> calculate_checksum([&] (string s) {
      // If the input string is empty, set the event to signal 
      // the end of processing. 
      if (s.size() == 0)
         e.set();
      // Perform the Adler-32 checksum algorithm.
      for_each(begin(s), end(s), [&] (char c) {
         a = (a + c) % 65521;
         b = (b + a) % 65521;
      });
   });

   // Create the agent.
   file_reader reader("test.txt", calculate_checksum);

   // Start the agent and wait for it to complete.
   reader.start();
   agent::wait(&reader);

   // Wait for the call object to receive all data and complete.
   e.wait();

   // Check the file reader for errors. 
   // If no error occurred, calculate the final Adler-32 sum and print it  
   // to the console.
   std::exception error;
   if (reader.get_error(error))
   {
      wcout << error.what() << endl;
   }   
   else
   {      
      unsigned int adler32_sum = (b << 16) | a;
      wcout << L"Adler-32 sum is " << hex << adler32_sum << endl;
   }
}

[Top]

示例输入

以下是输入文件 text.txt 的示例内容:

  

示例输出

与示例输入结合使用时,此程序将生成以下输出:

  

可靠编程

为了防止同时访问数据成员,我们建议您将执行工作的方法添加到类的 protected 或 private 节中。 仅将向代理发送消息或从代理接收消息的方法添加到类的 public 节中。

始终调用 concurrency::agent::done 方法以将代理移至已完成状态。 通常在从 run 方法返回之前调用此方法。

后续步骤

有关基于代理的应用程序的另一个示例,请参见演练:使用联接避免死锁

请参见

任务

演练:使用联接避免死锁

概念

异步代理库

异步消息块

消息传递函数

同步数据结构