共用方式為


非同步代理程式程式庫中的最佳作法

本文件說明非同步代理程式程式庫的有效用法。 代理程式程式庫可提升以動作項目為基礎的程式撰寫模型,以及針對粗略資料流程和管線工作的同處理序訊息傳遞。

如需代理程式程式庫的詳細資訊,請參閱非同步代理程式程式庫

章節

本文件包括下列章節:

  • 使用代理程式隔離狀態

  • 使用節流機制來限制資料管線中的訊息數目

  • 不要在資料管線中執行精細工作

  • 不要以傳值方式傳遞大型訊息裝載

  • 當擁有權未定義時,在資料網路中使用 shared_ptr

使用代理程式隔離狀態

代理程式程式庫可讓您透過非同步訊息傳遞機制來連接隔離的元件,以共用狀態以外的形式作業。 非同步代理程式在將其內部狀態與其他元件隔離時最有效。 藉由隔離狀態,多個元件通常不會作用於共用資料。 狀態隔離可讓您的應用程式延展,因為它會減少共用記憶體爭用。 狀態隔離也會減少死結和競爭條件的機率,因為元件不必同步處理對共用資料的存取。

您通常藉由將資料成員保存在代理程式類別的 private 或 protected 區段中,或藉由使用訊息緩衝區溝通狀態變更,在代理程式中隔離狀態。 下列範例示範衍生自 Concurrency::agent 類別的 basic_agent 類別。 basic_agent 類別使用兩個訊息緩衝區,與外部元件通訊。 一個訊息緩衝區保存傳入訊息,另一個訊息緩衝區則保存外寄訊息。

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

如需如何定義及使用代理程式的完整範例,請參閱逐步解說:建立代理程式架構應用程式逐步解說:建立資料流程代理程式

回到頁首

使用節流機制來限制資料管線中的訊息數目

許多訊息緩衝區類型 (例如 Concurrency::unbounded_buffer) 都會保存不限數目的訊息。 當訊息生產者傳送訊息至資料管線的速度比消費者處理這些速度更快時,應用程式可能會進入記憶體不足或記憶體用完狀態。 您可以使用節流機制 (例如信號),限制資料管線中同時使用的訊息數目。

在下列基本範例中,會示範如何使用信號來限制資料管線中的訊息數目。 資料管線使用 Concurrency::wait 函式,模擬所需時間至少為 100 毫秒的作業。 因為傳送者產生訊息的速度比消費者處理這些訊息更快,所以這個範例定義了 semaphore 類別,讓應用程式限制使用中訊息的數目。

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity);

   // Acquires access to the semaphore.
   void acquire();

   // Releases access to the semaphore.
   void release();

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L);

   // Decrements the event counter.
   void signal();

   // Increments the event counter.
   void add_count();

   // Blocks the current context until the event is set.
   void wait();

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};


int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();   
}

//
// semaphore class implementation.
//

semaphore::semaphore(LONG capacity)
   : _semaphore_count(capacity)
{
}

// Acquires access to the semaphore.
void semaphore::acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (InterlockedDecrement(&_semaphore_count) < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}

// Releases access to the semaphore.
void semaphore::release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (InterlockedIncrement(&_semaphore_count) <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      if (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

//
// countdown_event class implementation.
//

countdown_event::countdown_event(unsigned int count)
   : _current(static_cast<long>(count)) 
{
   // Set the event if the initial count is zero.
   if (_current == 0L)
      _event.set();
}

// Decrements the event counter.
void countdown_event::signal() {
   if(InterlockedDecrement(&_current) == 0L) {
      _event.set();
   }
}

// Increments the event counter.
void countdown_event::add_count() {
   if(InterlockedIncrement(&_current) == 1L) {
      _event.reset();
   }
}

// Blocks the current context until the event is set.
void countdown_event::wait() {
   _event.wait();
}

這個範例 (Example) 會產生下列範例 (Sample) 輸出:

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

semaphore 物件會限制管線每次最多處理兩則訊息。

在這個範例中,生產者會將相當少的訊息傳送給消費者。 因此,這個範例不會示範潛在的記憶體不足或記憶體用完狀況。 不過,當資料管線包含相當大量的訊息時,這個機制很實用。

如需如何建立這個範例中所用信號類別的詳細資訊,請參閱 HOW TO:使用內容類別實作合作式信號

回到頁首

不要在資料管線中執行精細工作

當資料管線所執行的工作相當粗略時,代理程式程式庫最實用。 例如,某個應用程式元件可能從檔案或網路連接中讀取資料,並偶爾將該資料傳送至另一個元件。 代理程式程式庫用來傳播訊息的通訊協定會導致訊息傳遞機制的額外負荷大於平行模式程式庫 (PPL) 所提供的工作平行建構。 因此,請確定資料管線所執行的工作所需時間夠長,足以抵銷此額外負荷。

雖然資料管線在其工作是粗略時最有效,但資料管線的每個階段都可以使用 PPL 建構,例如工作群組和平行演算法,來執行更精細的工作。 如需在每個處理階段使用精細平行處理原則之粗略資料網路的範例,請參閱逐步解說:建立影像處理網路

回到頁首

不要以傳值方式傳遞大型訊息裝載

在某些情況下,執行階段會針對它從某個訊息緩衝區傳遞至另一個訊息緩衝區的每則訊息建立複本。 例如,Concurrency::overwrite_buffer 類別會針對它所收到的每則訊息,將複本提供給每個目標。 當您使用訊息傳遞函式,例如 Concurrency::sendConcurrency::receive,對訊息緩衝區寫入訊息及讀取訊息時,執行階段也會建立訊息資料的複本。 雖然這個機制有助於排除同時寫入共用資料的風險,但是當訊息裝載相當大時,它可能會導致記憶體效能降低。

當您傳遞大型裝載的訊息時,可以使用指標或參考改善記憶體效能。 下列範例比較兩種作法:以傳值方式傳遞大型訊息,以及傳遞相同訊息類型的指標。 範例定義兩個作用於 message_data 物件的代理程式類型 producerconsumer。 範例比較生產者傳遞數個 message_data 物件至消費者的所需時間與生產者代理程式傳送數個 message_data 物件指標至消費者的所需時間。

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

這個範例 (Example) 會產生下列範例 (Sample) 輸出:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

使用指標的版本效能較佳,因為它不需要執行階段針對它從生產者傳遞至消費者的每個 message_data 物件建立完整複本。

回到頁首

當擁有權未定義時,在資料網路中使用 shared_ptr

當您透過訊息傳遞管線或網路以指標方式傳送訊息時,通常會在網路前端配置每則訊息的記憶體,而在網路末端釋放該記憶體。 雖然這個機制通常運作良好,但在某些情況下難以使用或無法使用。 例如,當資料網路包含多個結束節點的情況。 在此情況下,沒有清楚位置可釋放訊息的記憶體。

若要解決此問題,您可以使用讓多個元件擁有一個指標的機制,例如 std::shared_ptr。 當擁有資源的最終 shared_ptr 物件終結時,同時也會釋放資源。

下列範例示範如何使用 shared_ptr,在多個訊息緩衝區之間共用指標值。 範例將 Concurrency::overwrite_buffer 物件連接至三個 Concurrency::call 物件。 overwrite_buffer 類別會將訊息提供給每個目標。 因為在資料網路末端有多個資料擁有者,所以這個範例使用 shared_ptr 讓每個 call 物件共用訊息擁有權。

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

這個範例 (Example) 會產生下列範例 (Sample) 輸出:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

請參閱

工作

逐步解說:建立代理程式架構應用程式

概念

並行執行階段最佳作法

非同步代理程式程式庫

其他資源

逐步解說:建立資料流程代理程式

逐步解說:建立影像處理網路

平行模式程式庫中的最佳作法

並行執行階段中的一般最佳作法