チュートリアル: データフロー エージェントの作成

ここでは、制御フローではなくデータ フローに基づくエージェント ベースのアプリケーションを作成する方法について説明します。

制御フローとは、プログラムでの操作の実行順序のことです。 制御フローは、条件付きステートメントやループなどの制御構造体を使用して調整されます。 一方、データ フローとは、必要なすべてのデータが使用可能な場合にのみ計算が行われるプログラミング モデルのことです。 データ フロー プログラミング モデルは、プログラムの独立したコンポーネントどうしがメッセージを送信することによって通信する、メッセージ パッシングの概念に関連しています。

非同期エージェントでは、制御フロー モデルとデータ フロー プログラミング モデルの両方がサポートされています。 制御フロー モデルはほとんどの状況に適していますが、エージェントがデータを受信し、そのデータのペイロードに基づいてアクションを実行するなどの状況ではデータ フロー モデルの方が適しています。

必須コンポーネント

このチュートリアルを開始する前に、次のドキュメントを参照してください。

セクション

このチュートリアルは、次のセクションで構成されています。

  • 基本的な制御フロー エージェントの作成

  • 基本的なデータ フロー エージェントの作成

  • メッセージ ログ エージェントの作成

基本的な制御フロー エージェントの作成

control_flow_agent クラスを定義する次の例について考えます。 control_flow_agent クラスは、1 つの入力バッファーと 2 つの出力バッファーの合計 3 つのメッセージ バッファーに対して作用します。 run メソッドは、ループ内でソース メッセージ バッファーから読み取り、条件付きステートメントを使用してプログラムの実行フローを調整します。 エージェントは、ゼロ以外の負の値に対してカウンターの 1 つをインクリメントし、ゼロ以外の正の値に対して別のカウンターをインクリメントします。 エージェントは、sentinel 値のゼロを受信すると、カウンターの値を出力メッセージ バッファーに送信します。 negatives メソッドと positives メソッドは、アプリケーションがエージェントから負の値と正の値のカウントを読み取ることができるようにします。

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

この例では、エージェントで制御フローを使用する基本的な方法しか示していませんが、制御フロー ベースのプログラミングの逐次的性質は明らかです。 入力メッセージ バッファーに複数のメッセージが存在していても、各メッセージは逐次的に処理される必要があります。 データ フロー モデルでは、条件付きステートメントの両方の分岐を同時に評価できます。 データ フロー モデルでは、データが使用可能になったときにそのデータに作用する複雑なメッセージング ネットワークを作成することもできます。

[ページのトップへ]

基本的なデータ フロー エージェントの作成

ここでは、control_flow_agent クラスからデータ フロー モデルに切り替えて、同じタスクを実行する方法について説明します。

データ フロー エージェントを使用するには、メッセージ バッファーのネットワークを作成し、それぞれのメッセージ バッファーが特定の用途で機能するようにします。 一部のメッセージ ブロックでは、フィルター関数を使用して、メッセージの受け入れまたは拒否をメッセージ ペイロードに基づいて行います。 フィルター関数を使用すると、メッセージ ブロックが特定の値のみを受信するようになります。

制御フロー エージェントからデータ フロー エージェントに切り替えるには

  1. control_flow_agent クラスの本体を別のクラス (dataflow_agent など) にコピーします。 代わりに、control_flow_agent クラスの名前を変更してもかまいません。

  2. receive を呼び出すループの本体を run メソッドから削除します。

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. run メソッド内の negative_count および positive_count 変数の初期化コードの後に、アクティブな操作のカウントを追跡する countdown_event オブジェクトを追加します。

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    countdown_event クラスについては、このトピックの後半で説明します。

  4. データ フロー ネットワークに参加するメッセージ バッファー オブジェクトを作成します。

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. メッセージ バッファーを接続してネットワークを形成します。

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. event オブジェクトと countdown event オブジェクトが設定されるまで待ちます。 これらのイベントは、エージェントが sentinel 値を受信したこと、およびすべての操作が終了したことを通知します。

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

次の図は、dataflow_agent クラスの完全なデータ フロー ネットワークを示しています。

データ フロー ネットワーク

ネットワークのメンバーを次の表に示します。

メンバー

説明

increment_active

アクティブなイベント カウンターをインクリメントし、入力値をネットワークの残りのメンバーに渡す Concurrency::transformer オブジェクト。

negatives, positives

数値のカウントをインクリメントし、アクティブなイベント カウンターをデクリメントする Concurrency::call オブジェクト。 各オブジェクトは、フィルターを使用して、負の数または正の数を受け入れます。

sentinel

sentinel 値のゼロのみを受け入れ、アクティブなイベント カウンターをデクリメントする Concurrency::call オブジェクト。

connector

ソース メッセージ バッファーを内部ネットワークに接続する Concurrency::unbounded_buffer オブジェクト。

run メソッドは別個のスレッドで呼び出されるため、ネットワークが完全に接続される前に、他のスレッドからネットワークにメッセージが送信されることがあります。 _source データ メンバーは、アプリケーションからエージェントに送信されるすべての入力をバッファーに格納する unbounded_buffer オブジェクトです。 ネットワークですべての入力メッセージが処理されるように、エージェントは最初にネットワークの内部ノードをリンクしてから、そのネットワークの始端 connector_source データ メンバーにリンクします。 これにより、ネットワークの形成中にメッセージが処理されることがなくなります。

この例のネットワークは制御フローではなくデータ フローに基づいているため、各入力値の処理が終了したこと、および sentinel ノードがその値を受信したことがエージェントに通知される必要があります。 この例では、countdown_event オブジェクトを使用して各入力値が処理されたことを通知し、Concurrency::event オブジェクトを使用して sentinel ノードがその値を受信したことを示しています。 countdown_event クラスは、event オブジェクトを使用して、カウンター値がゼロになったときに通知します。 データ フロー ネットワークのヘッド ノードは、値を受信するたびにカウンターをインクリメントします。 ネットワークの各ターミナル ノードは、入力値を処理した後、カウンターをデクリメントします。 エージェントは、データ フロー ネットワークを形成した後、sentinel ノードが event オブジェクトを設定し、カウンターがゼロになったことが countdown_event オブジェクトから通知されるまで待機します。

次の例では、control_flow_agentdataflow_agent、および countdown_event クラスを示します。 wmain 関数は、control_flow_agent オブジェクトと dataflow_agent オブジェクトを作成し、send_values 関数を使用して一連のランダム値をエージェントに送信します。

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

この例では、次のサンプル出力が生成されます。

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

コードのコンパイル

プログラム例をコピーし、Visual Studio プロジェクトに貼り付けるか、dataflow-agent.cpp という名前のファイルに貼り付け、Visual Studio 2010 のコマンド プロンプト ウィンドウで次のコマンドを実行します。

cl.exe /EHsc dataflow-agent.cpp

[ページのトップへ]

メッセージ ログ エージェントの作成

次の例では、dataflow_agent クラスに似た log_agent クラスを示します。 log_agent クラスは、ログ メッセージをファイルおよびコンソールに書き込む非同期ログ エージェントを実装します。 log_agent クラスは、アプリケーションがメッセージを情報、警告、またはエラーとして分類できるようにします。 また、アプリケーションが各ログ カテゴリをファイルとコンソールのどちらに書き込むか、または両方に書き込むかを指定できるようにします。 この例では、ファイルにすべてのログ メッセージが書き込まれ、コンソールにはエラー メッセージのみが書き込まれます。

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

この例では、次の出力がコンソールに書き込まれます。

error: This is a sample error message.

また、次のテキストを含む log.txt ファイルが生成されます。

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

コードのコンパイル

プログラム例をコピーし、Visual Studio プロジェクトに貼り付けるか、log-filter.cpp という名前のファイルに貼り付け、Visual Studio 2010 のコマンド プロンプト ウィンドウで次のコマンドを実行します。

cl.exe /EHsc log-filter.cpp

[ページのトップへ]

参照

その他の技術情報

同時実行ランタイムのチュートリアル