共用方式為


逐步解說:建立資料流程代理程式

本文件說明如何建立以資料流程 (而不是控制流程) 為基礎的代理程式架構應用程式。

「控制流程」(Control Flow) 是指程式中作業執行的順序。 控制流程是透過條件陳述式、迴圈等控制結構來規範。 另一方面,「資料流程」(Dataflow) 是指只在所有必要資料皆可用的情況下執行計算的程式撰寫模型。 資料流程程式撰寫模型與訊息傳遞的概念有關,在此模型中,程式的獨立元件可藉由訊息傳送相互通訊。

非同步代理程式支援控制流程和資料流程程式撰寫模型。 雖然控制流程模型適合許多情況,但在某些情況下,資料流程模型更適用,例如當代理程式接收資料並根據該資料的裝載來執行動作時。

必要條件

在您開始閱讀此逐步解說前,請先參閱下列文件:

章節

此逐步解說包含下列章節:

  • 建立基本的控制流程代理程式

  • 建立基本的資料流程代理程式

  • 建立訊息記錄代理程式

建立基本的控制流程代理程式

請考慮下列會定義 control_flow_agent 類別的範例。 control_flow_agent 類別作用於三個訊息緩衝區:一個輸入緩衝區和兩個輸出緩衝區。 run 方法會在迴圈中從來源訊息緩衝區讀取,並使用條件陳述式來導向程式執行流程。 如果是非零的負值,代理程式會遞增一個計數值,如果是非零的正值,則會遞增另一個計數器。 在代理程式收到零的 Sentinel 值之後,它會將計數器的值傳送至輸出訊息緩衝區。 negativespositives 方法可讓應用程式從代理程式讀取負值和正值的計數。

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

雖然這個範例在代理程式中使用控制流程的基本用法,但是它示範控制流程架構程式設計的序列本質。 每則訊息都必須循序處理,即使在輸入訊息緩衝區中有多個訊息也一樣。 資料流程模型可讓條件陳述式的分支同時評估。 資料流程模型也可讓您建立更複雜的訊息網路,在資料可用時作用於資料。

回到頁首

建立基本的資料流程代理程式

本節說明如何將 control_flow_agent 類別轉換為使用資料流程模型,執行相同的工作。

資料流程代理程式的運作方式是建立訊息緩衝區網路,每個訊息緩衝區都會做為特定用途。 某些訊息區塊使用篩選函式,根據訊息裝載來接受或拒絕訊息。 篩選函式可確保訊息區塊只接收特定值。

若要將控制流程代理程式轉換為資料流程代理程式

  1. control_flow_agent 類別的主體複製到另一個類別,例如 dataflow_agent。 或者,您可以重新命名 control_flow_agent 類別。

  2. run 方法中移除會呼叫 receive 的迴圈主體。

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. run 方法中,於變數 negative_countpositive_count 的初始化之後,加入 countdown_event 物件,追蹤使用中作業的計數。

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    本主題稍後會說明 countdown_event 類別。

  4. 建立會參與資料流程網路的訊息緩衝區物件。

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. 連接訊息緩衝區以形成網路。

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. 等候 eventcountdown event 物件設定。 這些事件表示代理程式已收到 Sentinel 值,而且所有作業都已完成。

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

下圖顯示 dataflow_agent 類別的完整資料流程網路:

資料流程網路

下表說明網路成員。

成員

描述

increment_active

Concurrency::transformer 物件,會遞增使用中事件計數器,並將輸入值傳遞給網路的其餘部分。

negatives, positives

Concurrency::call 物件,會遞增數字計數,並遞減使用中事件計數器。 每個這些物件都會使用篩選條件以接受負數或正數。

sentinel

Concurrency::call 物件,只會接受零的 Sentinel 值,並遞減使用中事件計數器。

connector

Concurrency::unbounded_buffer 物件,會將來源訊息緩衝區連接至內部網路。

因為 run 方法是在個別執行緒上呼叫的,所以其他執行緒可以在網路完全連接之前傳送訊息給網路。 _source 資料成員是 unbounded_buffer 物件,會緩衝從應用程式傳送至代理程式的所有輸入。 為了確定網路處理所有輸入訊息,代理程式會先連結網路的內部節點,然後將該網路前端 (connector) 連結至 _source 資料成員。 這會確保不會在網路形成過程中處理訊息。

因為在這個範例中網路是以資料流程 (而不是控制流程) 為基礎,網路必須向代理程式表示它已完成處理每個輸入值,以及 Sentinel 節點已收到其值。 這個範例使用 countdown_event 物件以表示所有輸入值都已處理,使用 Concurrency::event 物件以表示 Sentinel 節點收到其值。 countdown_event 類別使用 event 物件,在計數器值達到零時發出訊號。 每次資料流程網路前端收到值時,它會遞增計數器。 網路的每個終端節點在處理輸入值之後,它會遞減計數器。 在代理程式形成資料流程網路之後,它會等候 Sentinel 節點設定 event 物件,以及等候 countdown_event 物件表示其計數器已達到零。

下列範例顯示 control_flow_agentdataflow_agentcountdown_event 類別。 wmain 函式會建立 control_flow_agentdataflow_agent 物件,並使用 send_values 函式將一系列的隨機值傳送給代理程式。

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace Concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

這個範例 (Example) 會產生下列範例 (Sample) 輸出:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 dataflow-agent.cpp 的檔案中,然後在 Visual Studio 2010 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc dataflow-agent.cpp

回到頁首

建立訊息記錄代理程式

下列範例顯示 log_agent 類別,這個類別與 dataflow_agent 類別類似。 log_agent 類別實作非同步記錄代理程式,會將記錄訊息寫入檔案和主控台。 log_agent 類別可讓應用程式將訊息分類為資訊、警告或錯誤。 它也會讓應用程式指定每個記錄分類要寫入檔案、主控台或兩者。 這個範例會將所有記錄訊息寫入檔案,而且只將錯誤訊息寫入主控台。

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
   log_info    = 0x1,
   log_warning = 0x2,
   log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
   // Holds a message string and its logging type.
   struct log_message
   {
      wstring message;
      log_message_type type;
   };

public:
   log_agent(const wstring& file_path, log_message_type file_messages, 
      log_message_type console_messages)
      : _file(file_path)
      , _file_messages(file_messages)
      , _console_messages(console_messages)    
      , _active(0)
   {
      if (_file.bad())
         throw invalid_argument("Unable to open log file.");
   }

   // Writes the provided message to the log.
   void log(const wstring& message, log_message_type type)
   {  
      // Increment the active message count.
      _active.add_count();

      // Send the message to the network.
      log_message msg = { message, type };      
      send(_log_buffer, msg);
   }

   void close()
   {
      // Signal that the agent is now closed.
      _closed.set();
   }

protected:

   void run()
   {
      //
      // Create the members of the dataflow network.
      //

      // Offers messages to the file writer and the console writer.
      overwrite_buffer<log_message> connector;

      // Writes a log message to file.
      call<log_message> file_writer(
         [this](log_message msg) {
            // Write the message to the file.
            write_to_stream(msg, _file);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool {
            // Accept only message types that are to be written to file.
            return (msg.type & _file_messages) != 0;
         });

       // Writes a log message to the console.
      call<log_message> console_writer(
         [this](log_message msg) {
            // Write the message to the console.
            write_to_stream(msg, wcout);
            // Decrement the active counter.
            _active.signal();
         },
         [this](const log_message& msg) -> bool  {
            // Accept only message types that are to be written to file.
            return (msg.type & _console_messages) != 0;
         });

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&file_writer);
      connector.link_target(&console_writer);

      // Connect _log_buffer to the internal network to begin data flow.
      _log_buffer.link_target(&connector);

      // Wait for the closed event to be signaled.
      _closed.wait();

      // Wait for all messages to be processed.
      _active.wait();

      // Close the log file and flush the console.
      _file.close();
      wcout.flush();

      // Set the agent to the completed state.
      done();
   }

private:
   // Writes a logging message to the specified output stream.
   void write_to_stream(const log_message& msg, wostream& stream)
   {
      // Write the message to the stream.
      wstringstream ss;

      switch (msg.type)
      {
      case log_info:
         ss << L"info: ";
         break;
      case log_warning:
         ss << L"warning: ";
         break;
      case log_error:
         ss << L"error: ";
      }

      ss << msg.message << endl;
      stream << ss.str();
   }

private:   
   // The file stream to write messages to.
   wofstream _file;   

   // The log message types that are written to file.
   log_message_type _file_messages;

   // The log message types that are written to the console.
   log_message_type _console_messages;

   // The head of the network. Propagates logging messages
   // to the rest of the network.
   unbounded_buffer<log_message> _log_buffer;   

   // Counts the number of active messages in the network.
   countdown_event _active;

   // Signals that the agent has been closed.
   event _closed;
};

int wmain()
{
   // Union of all log message types.
   log_message_type log_all = 
      log_message_type(log_info | log_warning  | log_error);

   // Create a logging agent that writes all log messages to file and error 
   // messages to the console.
   log_agent logger(L"log.txt", log_all, log_error);

   // Start the agent.
   logger.start();

   // Log a few messages.

   logger.log(L"===Logging started.===", log_info);

   logger.log(L"This is a sample warning message.", log_warning);
   logger.log(L"This is a sample error message.", log_error);

   logger.log(L"===Logging finished.===", log_info);

   // Close the logger and wait for the agent to finish.
   logger.close();
   agent::wait(&logger);
}

這個範例會將下列輸出寫入主控台。

error: This is a sample error message.

這個範例也會產生 log.txt 檔案,包含下列文字。

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 log-filter.cpp 的檔案中,然後在 Visual Studio 2010 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc log-filter.cpp

回到頁首

請參閱

概念

並行執行階段逐步解說