异步代理库

异步代理库(或简称为“代理库”)提供了一个编程模型,通过它可以增加启用并发的应用程序开发的可靠性。 代理库是一个 C++ 模板库,它可以提升基于参与者的编程模型和进程内消息传递,以执行细化数据流任务和流水线操作任务。 代理库基于并发运行时的计划和资源管理组件生成。

编程模型

代理库可提供共享状态的备选方案,方法是让您通过基于数据流(而不是控制流)的异步通信模型来连接独立的组件。 “数据流”指的是在所有必需的数据都可用时进行计算的编程模型;“控件流”指的是按预先确定的顺序进行计算的编程模型。

数据流编程模型与“消息传递”概念相关,在消息传递中,某个程序的独立组件通过发送消息与另一个程序进行通信。

代理库由三个组件组成:“异步代理”、“异步消息块”和“消息传递函数”。 代理维护状态,并使用消息块和消息传递函数与另一个组件和外部组件进行通信。 通过消息传递函数,代理可以将消息发送到外部组件和从外部组件接收消息。 异步消息块存放消息,并使通信能够以同步的方式进行通信。

下图显示了两个代理如何使用消息块和消息传递函数进行通信。 在此图中,通过使用 Concurrency::send 函数和 Concurrency::unbounded_buffer 对象,agent1 可将消息发送到 agent2agent2 使用 Concurrency::receive 函数读取消息。 agent2 使用相同的方法向 agent1 发送消息。 虚线箭头表示代理之间的数据流。 实线箭头将代理连接到它们在其中写入或读取的消息块。

代理库的组件

本主题后面将显示实现此图的代码示例。

代理编程模型具有其他并发和同步机制(如事件)所没有的一些优点。 一个优点是:通过使用消息传递在对象之间传输状态更改,您可以隔离对共享资源的访问,从而提高可伸缩性。 消息传递的优点是它将同步绑定到数据,而不是绑定到外部的同步对象。 这简化了组件之间的数据传输,并可以消除应用程序中的编程错误。

何时使用代理库

当您有多个必须彼此异步通信的操作时,使用代理库。 使用消息块和消息传递函数,无需同步机制(如锁)即可编写并行应用程序。 这使您可以将精力集中在应用程序逻辑上。

代理编程模型通常用于创建数据管道或网络。 数据管道是一系列组件,每个组件执行一个参与更大目标的特定任务。 数据流管道中的每个组件从另一个组件接收消息时执行工作。 该工作的结果传递到管道或网络中的其他组件。 这些组件可以使用其他库(如并行模式库 (PPL))中的更细化的并发功能。

示例

下面的示例实现本主题前面所示的图示。

// basic-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// This agent writes a string to its target and reads an integer
// from its source.
class agent1 : public agent 
{
public:
   explicit agent1(ISource<int>& source, ITarget<wstring>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Send the request.
      wstringstream ss;
      ss << L"agent1: sending request..." << endl;
      wcout << ss.str();

      send(_target, wstring(L"request"));

      // Read the response.
      int response = receive(_source);

      ss = wstringstream();
      ss << L"agent1: received '" << response << L"'." << endl;
      wcout << ss.str();

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<int>& _source;
   ITarget<wstring>& _target;
};

// This agent reads a string to its source and then writes an integer
// to its target.
class agent2 : public agent 
{
public:
   explicit agent2(ISource<wstring>& source, ITarget<int>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Read the request.
      wstring request = receive(_source);

      wstringstream ss;
      ss << L"agent2: received '" << request << L"'." << endl;
      wcout << ss.str();

      // Send the response.
      ss = wstringstream();
      ss << L"agent2: sending response..." << endl;
      wcout << ss.str();

      send(_target, 42);

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<wstring>& _source;
   ITarget<int>& _target;
};

int wmain()
{
   // Step 1: Create two message buffers to serve as communication channels
   // between the agents.

   // The first agent writes messages to this buffer; the second
   // agents reads messages from this buffer.
   unbounded_buffer<wstring> buffer1;

   // The first agent reads messages from this buffer; the second
   // agents writes messages to this buffer.
   overwrite_buffer<int> buffer2;

   // Step 2: Create the agents.
   agent1 first_agent(buffer2, buffer1);
   agent2 second_agent(buffer1, buffer2);

   // Step 3: Start the agents. The runtime calls the run method on
   // each agent.
   first_agent.start();
   second_agent.start();

   // Step 4: Wait for both agents to finish.
   agent::wait(&first_agent);
   agent::wait(&second_agent);
}

该示例产生下面的输出:

agent1: sending request...
agent2: received 'request'.
agent2: sending response...
agent1: received '42'.

下面的主题描述该示例中使用的功能。

相关主题