异步代理库

异步代理库(简称代理库)提供了一个编程模型,该模型可提高支持并发的应用程序开发的可靠性。 代理库是一个 C++ 模板库,为粗粒度数据流和管道任务提升了基于角色的编程模型和进程内消息传递。 代理库构建在并发运行时的计划和资源管理组件上。

编程模型

代理库通过一个基于数据流而不是控制流的异步通信模型来连接隔离的组件,从而提供共享状态的替代方法。 数据流是指计算是在所有必需数据可用时进行的编程模型;控制流是指以预先确定的顺序进行计算的编程模型。

数据流编程模型与消息传递这一概念相关,其中程序的独立组件通过发送消息相互通信。

代理库由以下三个组件组成:异步代理、异步消息块和消息传递函数。 代理会维护状态,使用消息块和消息传递函数相互通信,并与外部组件通信。 消息传递函数使代理能够向外部组件发送消息,以及从其接收消息。 异步消息块可保存消息,并使代理能够以同步的方式进行通信。

下图显示了两个代理如何使用消息块和消息传递函数进行通信。 在此图中,通过使用 concurrency::send 函数和 concurrency::unbounded_buffer 对象,agent1 将消息发送到 agent2agent2 使用 concurrency::receive 函数读取消息。 agent2 使用相同的方法将消息发送到 agent1。 虚线箭头表示代理之间的数据流。 实心箭头将代理连接到可供写入内容或读取内容的消息块。

The components of the Agents Library.

本主题的后面部分显示了实现此图的代码示例。

代理编程模型相对于其他并发和同步机制(例如事件)具有多项优势。 一项优势是,通过使用消息传递在对象之间传输状态更改,可以隔离对共享资源的访问,从而提高可伸缩性。 消息传递的优势是,它将同步绑定到数据,而不是将其绑定到外部同步对象。 这简化了组件之间的数据传输,可以消除应用程序中的编程错误。

何时使用代理库

如果有多个必须以异步方式相互通信的操作,请使用代理库。 消息块和消息传递函数允许你编写并行应用程序,无不要求同步机制(例如锁)。 这使你可以专注于应用程序逻辑。

代理编程模型通常用于创建数据管道或网络。 数据管道是一系列组件,每个组件执行一个有助于实现更大目标的特定任务。 数据流管道中的每个组件会在收到来自另一组件的消息时执行工作。 该工作的结果将传递到管道或网络中的其他组件。 组件可以使用其他库的更精细的并发功能,例如并行模式库 (PPL)

示例

以下示例实现本主题前面所示的插图。

// basic-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// This agent writes a string to its target and reads an integer
// from its source.
class agent1 : public agent 
{
public:
   explicit agent1(ISource<int>& source, ITarget<wstring>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Send the request.
      wstringstream ss;
      ss << L"agent1: sending request..." << endl;
      wcout << ss.str();

      send(_target, wstring(L"request"));

      // Read the response.
      int response = receive(_source);

      ss = wstringstream();
      ss << L"agent1: received '" << response << L"'." << endl;
      wcout << ss.str();

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<int>& _source;
   ITarget<wstring>& _target;
};

// This agent reads a string to its source and then writes an integer
// to its target.
class agent2 : public agent 
{
public:
   explicit agent2(ISource<wstring>& source, ITarget<int>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Read the request.
      wstring request = receive(_source);

      wstringstream ss;
      ss << L"agent2: received '" << request << L"'." << endl;
      wcout << ss.str();

      // Send the response.
      ss = wstringstream();
      ss << L"agent2: sending response..." << endl;
      wcout << ss.str();

      send(_target, 42);

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<wstring>& _source;
   ITarget<int>& _target;
};

int wmain()
{
   // Step 1: Create two message buffers to serve as communication channels
   // between the agents.
   
   // The first agent writes messages to this buffer; the second
   // agents reads messages from this buffer.
   unbounded_buffer<wstring> buffer1;

   // The first agent reads messages from this buffer; the second
   // agents writes messages to this buffer.
   overwrite_buffer<int> buffer2;

   // Step 2: Create the agents.
   agent1 first_agent(buffer2, buffer1);
   agent2 second_agent(buffer1, buffer2);

   // Step 3: Start the agents. The runtime calls the run method on
   // each agent.
   first_agent.start();
   second_agent.start();

   // Step 4: Wait for both agents to finish.
   agent::wait(&first_agent);
   agent::wait(&second_agent);
}

该示例产生下面的输出:

agent1: sending request...
agent2: received 'request'.
agent2: sending response...
agent1: received '42'.

以下主题介绍此示例中使用的功能。

异步代理
描述异步代理在解决较大计算任务时的作用。

异步消息块
介绍代理库提供的各种消息块类型。

消息传递函数
介绍代理库提供的各种消息传递例程。

如何:实现各种制造者-使用者模式
介绍如何在应用程序中实现生产者-使用者模式。

如何:为 call 和 transformer 类提供工作函数
介绍几种用于向 concurrency::callconcurrency::transformer 类提供工作函数的方式。

如何:在数据管道中使用转换器
演示如何在数据管道中使用 concurrency::transformer 类。

如何:在已完成的任务之间选择
演示如何使用 concurrency::choiceconcurrency::join 类选择用于完成搜索算法的第一个任务。

如何:定期发送消息
演示如何使用 concurrency::timer 类定期发送消息。

如何:使用消息块筛选器
演示如何使用筛选器启用异步消息块来接受或拒绝消息。

并行模式库 (PPL)
介绍如何在应用程序中使用各种并行模式(例如并行算法)。

并发运行时
描述可以简化并发编程并包含相关主题链接的并发运行时。