다음을 통해 공유


방법: 다양한 공급자/소비자 패턴 구현

이 항목에서는 응용 프로그램에서 공급자/소비자 패턴을 구현하는 방법에 대해 설명합니다. 이 패턴에서 공급자는 메시지 블록에 메시지를 보내고 소비자는 해당 블록에서 메시지를 읽습니다.

이 항목에서는 두 가지 시나리오를 보여 줍니다. 첫 번째 시나리오에서 소비자는 공급자가 보내는 각 메시지를 받아야 합니다. 두 번째 시나리오에서 소비자는 주기적으로 데이터를 폴링하므로 각 메시지를 받을 필요가 없습니다.

이 항목에 나오는 두 예제에서는 모두 에이전트, 메시지 블록 및 메시지 전달 함수를 사용하여 공급자가 소비자에게 메시지를 전송합니다. 공급자 에이전트는 Concurrency::asend 함수를 사용하여 Concurrency::ITarget 개체에 메시지를 작성하고, 소비자 에이전트는 Concurrency::receive 함수를 사용하여 Concurrency::ISource 개체에서 메시지를 읽습니다. 두 에이전트 모두 처리의 끝을 조정하는 데 사용할 센티널 값을 포함합니다.

비동기 에이전트에 대한 자세한 내용은 비동기 에이전트를 참조하십시오. 메시지 블록 및 메시지 전달 함수에 대한 자세한 내용은 비동기 메시지 블록메시지 전달 함수를 참조하십시오.

예제

이 예제에서 공급자 에이전트는 일련의 숫자를 소비자 에이전트에게 보냅니다. 소비자는 이러한 각 숫자를 받아 평균 값을 계산합니다. 그러면 응용 프로그램에서 평균 값을 콘솔에 씁니다.

이 예제에서는 Concurrency::unbounded_buffer 개체를 사용하여 공급자가 메시지를 큐에 넣을 수 있도록 합니다. unbounded_buffer 클래스는 ITargetISource를 구현하여 공급자와 소비자가 공유 버퍼에 메시지를 보내거나 공유 버퍼에서 메시지를 받을 수 있도록 합니다. sendreceive 함수는 공급자에서 소비자로 데이터를 전파하는 작업을 조정합니다.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

이 예제의 결과는 다음과 같습니다.

The average is 50.

이 예제에서 공급자 에이전트는 일련의 주식 시세를 소비자 에이전트에게 보냅니다. 소비자 에이전트는 현재 시세를 주기적으로 읽고 콘솔에 출력합니다.

이 예제는 이전 예제와 유사하지만 Concurrency::overwrite_buffer 개체를 사용하여 공급자가 하나의 메시지를 소비자와 공유할 수 있도록 한다는 점이 다릅니다. 이전 예제와 같이 overwrite_buffer 클래스는 ITargetISource를 구현하여 공급자와 소비자가 공유 메시지 버퍼에서 동작할 수 있도록 합니다.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (quotes.begin(), quotes.end(), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         Concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         Concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

unbounded_buffer 개체와 달리 receive 함수는 overwrite_buffer 개체에서 메시지를 제거하지 않습니다. 공급자가 메시지를 덮어쓰기 전에 소비자가 해당 메시지 버퍼를 두 번 이상 읽으면 수신자는 매번 같은 메시지를 가져옵니다.

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트 또는 producer-consumer.cpp 파일에 붙여넣고 Visual Studio 2010 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc producer-consumer.cpp

참고 항목

개념

비동기 에이전트 라이브러리

비동기 에이전트

비동기 메시지 블록

메시지 전달 함수