다음을 통해 공유


비동기 에이전트 라이브러리의 유용한 정보

이 문서에서는 비동기 에이전트 라이브러리를 효과적으로 사용하는 방법에 대해 설명합니다. 에이전트 라이브러리는 정교하지 않은 데이터 흐름 및 파이프라인 작업을 위해 행위자 기반 프로그래밍 모델과 in-process 메시지 전달을 지원합니다.

에이전트 라이브러리에 대한 자세한 내용은 비동기 에이전트 라이브러리를 참조하십시오.

단원

이 문서에는 다음과 같은 단원이 포함되어 있습니다.

  • 에이전트를 사용하여 상태 격리

  • 스로틀 메커니즘을 사용하여 데이터 파이프라인의 메시지 수 제한

  • 데이터 파이프라인에서 세분화된 작업을 수행하지 않음

  • 큰 메시지 페이로드는 값으로 전달하지 않음

  • 소유권이 정의되어 있지 않을 경우 데이터 네트워크에서 shared_ptr 사용

에이전트를 사용하여 상태 격리

에이전트 라이브러리는 비동기 메시지 전달 메커니즘을 통해 격리된 구성 요소를 연결할 수 있도록 하여 공유 상태에 대한 대안을 제공합니다. 비동기 에이전트는 내부 상태를 다른 구성 요소에서 격리할 때 가장 효과적입니다. 상태를 격리하여 일반적으로 여러 구성 요소가 공유 데이터에서 동작하지 않습니다. 상태 격리를 통해 공유 메모리에 대한 경합이 줄어들기 때문에 상태 격리를 사용하면 응용 프로그램을 확장할 수 있습니다. 또한 상태 격리를 사용하면 구성 요소가 공유 데이터에 대한 액세스를 동기화할 필요가 없으므로 교착 및 경합 상태가 발생할 가능성이 줄어듭니다.

일반적으로 에이전트의 private 또는 protected 섹션에 데이터 멤버를 포함하고 메시지 버퍼를 통해 상태 변경을 전달하여 에이전트에서 상태를 격리합니다. 다음 예제에서는 Concurrency::agent에서 파생되는 basic_agent 클래스를 보여 줍니다. basic_agent 클래스는 두 개의 메시지 버퍼를 사용하여 외부 구성 요소와 통신합니다. 이 메시지 버퍼 중 하나는 들어오는 메시지를 포함하고 나머지 메시지 버퍼는 보내는 메시지를 포함합니다.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

에이전트를 정의하고 사용하는 방법에 대한 전체 예제는 연습: 에이전트 기반 응용 프로그램 만들기연습: 사용자 지정 데이터 흐름 에이전트 만들기를 참조하십시오.

[맨 위로 이동]

스로틀 메커니즘을 사용하여 데이터 파이프라인의 메시지 수 제한

Concurrency::unbounded_buffer와 같은 대부분의 메시지 버퍼 형식은 개수와 관계없이 메시지를 포함할 수 있습니다. 소비자가 메시지를 처리하는 속도보다 메시지 생산자가 메시지를 데이터 파이프라인에 보내는 속도가 빠를 경우 응용 프로그램이 메모리 부족 상태가 될 수 있습니다. 스로틀 메커니즘(예: 세마포)을 사용하여 데이터 파이프라인에서 동시에 활성 상태가 될 수 있는 메시지 수를 제한할 수 있습니다.

다음 기본 예제에서는 세마포를 사용하여 데이터 파이프라인의 메시지 수를 제한하는 방법을 보여 줍니다. 데이터 파이프라인은 Concurrency::wait 함수를 사용하여 최소 100초가 걸리는 작업을 시뮬레이션합니다. 소비자가 메시지를 처리할 수 있는 속도보다 전송자가 메시지를 생성하는 속도가 빠르기 때문에 이 예제에서는 응용 프로그램이 활성 메시지 수를 제한할 수 있도록 semaphore 클래스를 정의합니다.

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity);

   // Acquires access to the semaphore.
   void acquire();

   // Releases access to the semaphore.
   void release();

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L);

   // Decrements the event counter.
   void signal();

   // Increments the event counter.
   void add_count();

   // Blocks the current context until the event is set.
   void wait();

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};


int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();   
}

//
// semaphore class implementation.
//

semaphore::semaphore(LONG capacity)
   : _semaphore_count(capacity)
{
}

// Acquires access to the semaphore.
void semaphore::acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (InterlockedDecrement(&_semaphore_count) < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}

// Releases access to the semaphore.
void semaphore::release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (InterlockedIncrement(&_semaphore_count) <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      if (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

//
// countdown_event class implementation.
//

countdown_event::countdown_event(unsigned int count)
   : _current(static_cast<long>(count)) 
{
   // Set the event if the initial count is zero.
   if (_current == 0L)
      _event.set();
}

// Decrements the event counter.
void countdown_event::signal() {
   if(InterlockedDecrement(&_current) == 0L) {
      _event.set();
   }
}

// Increments the event counter.
void countdown_event::add_count() {
   if(InterlockedIncrement(&_current) == 1L) {
      _event.reset();
   }
}

// Blocks the current context until the event is set.
void countdown_event::wait() {
   _event.wait();
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

semaphore 개체는 파이프라인에서 동시에 최대 두 개의 메시지를 처리하도록 제한합니다.

이 예제에서 생산자는 비교적 적은 수의 메시지를 소비자에게 보냅니다. 따라서 이 예제에서는 발생할 수 있는 메모리 부족 상태를 보여 주지 않습니다. 그러나 데이터 파이프라인에 비교적 많은 수의 메시지가 포함되어 있는 경우 이 메커니즘이 유용합니다.

이 예제에서 사용하는 세마포 클래스를 만드는 방법에 대한 자세한 내용은 방법: 컨텍스트 클래스를 사용하여 공동 작업 세마포 구현을 참조하십시오.

[맨 위로 이동]

데이터 파이프라인에서 세분화된 작업을 수행하지 않음

에이전트 라이브러리는 데이터 파이프라인에서 수행하는 작업이 정교하지 않은 경우 가장 유용합니다. 예를 들어 하나의 응용 프로그램 구성 요소가 파일 또는 네트워크 연결에서 데이터를 읽어 다른 구성 요소에 보내는 경우가 있습니다. 에이전트 라이브러리에서 메시지를 전파하는 데 사용하는 프로토콜을 사용할 경우 메시지 전파 메커니즘이 PPL(병렬 패턴 라이브러리)에서 제공하는 작업 병렬 구문보다 많은 오버헤드를 포함하게 됩니다. 따라서 데이터 파이프라인에서 수행하는 작업이 이러한 오버헤드를 상쇄할 만큼 충분히 긴지 확인해야 합니다.

데이터 파이프라인은 해당 작업이 정교하지 않은 경우 가장 효과적이지만 데이터 파이프라인의 각 단계는 작업 그룹 및 병렬 알고리즘과 같은 PPL 구문을 사용하여 더 세분화된 작업을 수행할 수 있습니다. 각 처리 단계에서 세분화된 병렬 처리를 사용하는 정교하지 않은 데이터 네트워크의 예제를 보려면 연습: 이미지 처리 네트워크 만들기를 참조하십시오.

[맨 위로 이동]

큰 메시지 페이로드는 값으로 전달하지 않음

런타임에서 메시지 버퍼 간에 전달하는 모든 메시지의 복사본을 만드는 경우가 있습니다. 예를 들어 Concurrency::overwrite_buffer 클래스는 받는 모든 메시지의 복사본을 각 대상에 제공합니다. 런타임에서는 Concurrency::sendConcurrency::receive와 같은 메시지 전달 함수를 사용하여 메시지 버퍼에 메시지를 쓰거나 메시지 버퍼에서 메시지를 읽을 때도 메시지 데이터의 복사본을 만듭니다. 이 메커니즘을 사용하면 공유 데이터에 동시에 쓰는 위험 부담이 없어지지만 메시지 페이로드가 비교적 클 경우 메모리 성능이 저하될 수 있습니다.

페이로드가 큰 메시지를 전달할 때는 포인터 또는 참조를 사용하여 메모리 성능을 향상시킬 수 있습니다. 다음 예제에서는 큰 메시지를 값으로 전달하는 경우와 동일한 메시지 형식에 포인터를 전달하는 경우를 비교합니다. 이 예제에서는 message_data 개체에서 작동하는 producerconsumer라는 두 가지 에이전트 형식을 정의합니다. 그리고 이 예제에서는 생산자가 message_data 개체를 소비자에게 보내는 데 필요한 시간과 생산자 에이전트가 message_data 개체에 대한 여러 포인터를 소비자에게 보내는 데 필요한 시간을 비교합니다.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

포인터를 사용하는 경우 런타임에서 생산자로부터 소비자에게 전달하는 모든 message_data 개체의 전체 복사본을 만들 필요가 없기 때문에 포인터를 사용하는 버전이 더 성능이 뛰어납니다.

[맨 위로 이동]

소유권이 정의되어 있지 않을 경우 데이터 네트워크에서 shared_ptr 사용

메시지 전달 파이프라인 또는 네트워크를 통해 포인터를 사용하여 메시지를 보낼 때 일반적으로 각 메시지에 사용할 메모리를 네트워크의 앞에 할당하고 네트워크의 끝 부분에서 해당 메모리 공간을 확보합니다. 이 메커니즘은 대부분 제대로 작동하지만 이 메커니즘을 사용할 수 없거나 힘든 경우가 있습니다. 예를 들어 데이터 네트워크에 여러 끝 노드가 포함되어 있는 경우를 가정해 봅니다. 이 경우에는 메시지에 사용할 메모리를 확보할 확실한 위치가 없습니다.

이 문제를 해결하려면 하나의 포인터를 여러 구성 요소가 소유할 수 있도록 설정하는 std::shared_ptr과 같은 메커니즘을 사용할 수 있습니다. 리소스가 속한 최종 shared_ptr 개체를 삭제하면 리소스도 해제됩니다.

다음 예제에서는 shared_ptr을 사용하여 여러 메시지 버퍼 간에 포인터 값을 공유하는 방법을 보여 줍니다. 그리고 Concurrency::overwrite_buffer 개체를 세 개의 Concurrency::call 개체에 연결합니다. overwrite_buffer 클래스는 각 대상에 메시지를 제공합니다. 데이터 네트워크의 끝 부분에 데이터의 여러 소유자가 있으므로 이 예제에서는 shared_ptr을 사용하여 각 call 개체가 메시지의 소유권을 공유할 수 있도록 설정합니다.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

참고 항목

작업

연습: 에이전트 기반 응용 프로그램 만들기

개념

동시성 런타임 유용한 정보

비동기 에이전트 라이브러리

기타 리소스

연습: 사용자 지정 데이터 흐름 에이전트 만들기

연습: 이미지 처리 네트워크 만들기

병렬 패턴 라이브러리의 유용한 정보

동시성 런타임의 유용한 일반 정보