Udostępnij za pośrednictwem


Biblioteka agentów asynchronicznych — Najlepsze praktyki

W tym dokumencie opisano sposób efektywnego korzystania z biblioteki asynchronicznych agentów. Biblioteka agentów promuje model programowania oparty na aktorach i przekazywanie komunikatów w procesie dla grubszego przepływu danych i zadań potokowych.

Aby uzyskać więcej informacji na temat biblioteki agentów, zobacz Biblioteka asynchronicznych agentów.

Sekcje

Ten dokument zawiera następujące sekcje:

Używanie agentów do izolowania stanu

Biblioteka agentów udostępnia alternatywy dla stanu udostępnionego, umożliwiając łączenie izolowanych składników za pomocą asynchronicznego mechanizmu przekazywania komunikatów. Agenci asynchroniczny są najbardziej efektywni, gdy izolują swój stan wewnętrzny od innych składników. Stan izolowania polega na tym, że wiele składników zwykle nie działa na udostępnionych danych. Izolacja stanu może umożliwić skalowanie aplikacji, ponieważ zmniejsza rywalizację o pamięć udostępnioną. Izolacja stanu zmniejsza również prawdopodobieństwo zakleszczenia i warunków wyścigu, ponieważ składniki nie muszą synchronizować dostępu do udostępnionych danych.

Zazwyczaj stan w agencie jest izolowany przez przechowywanie składowych danych w private sekcjach klasy agenta lub protected przy użyciu komunikatów w celu komunikowania zmian stanu. W poniższym przykładzie przedstawiono klasę basic_agent , która pochodzi z współbieżności::agent. Klasa basic_agent używa dwóch komunikatów do komunikowania się ze składnikami zewnętrznymi. Jeden bufor komunikatów przechowuje komunikaty przychodzące; drugi bufor komunikatów przechowuje komunikaty wychodzące.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Aby zapoznać się z kompletnymi przykładami dotyczącymi sposobu definiowania i używania agentów, zobacz Przewodnik: tworzenie aplikacji opartej na agencie i przewodnik: tworzenie agenta przepływu danych.

[Top]

Używanie mechanizmu ograniczania przepustowości w celu ograniczenia liczby komunikatów w potoku danych

Wiele typów komunikatów, takich jak współbieżność::unbounded_buffer, może zawierać nieograniczoną liczbę komunikatów. Gdy producent komunikatów wysyła komunikaty do potoku danych szybciej niż odbiorca może przetworzyć te komunikaty, aplikacja może wprowadzić stan o niskiej ilości pamięci lub braku pamięci. Można użyć mechanizmu ograniczania, na przykład semafora, aby ograniczyć liczbę komunikatów, które są współbieżnie aktywne w potoku danych.

W poniższym przykładzie podstawowym pokazano, jak za pomocą semafora ograniczyć liczbę komunikatów w potoku danych. Potok danych używa funkcji concurrency::wait , aby zasymulować operację, która zajmuje co najmniej 100 milisekund. Ponieważ nadawca generuje komunikaty szybciej niż odbiorca może przetwarzać te komunikaty, w tym przykładzie zdefiniowano semaphore klasę umożliwiającą aplikacji ograniczenie liczby aktywnych komunikatów.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Obiekt semaphore ogranicza potok do przetwarzania co najwyżej dwóch komunikatów w tym samym czasie.

Producent w tym przykładzie wysyła stosunkowo niewiele komunikatów do konsumenta. W związku z tym w tym przykładzie nie przedstawiono potencjalnego stanu braku pamięci lub braku pamięci. Jednak ten mechanizm jest przydatny, gdy potok danych zawiera stosunkowo dużą liczbę komunikatów.

Aby uzyskać więcej informacji na temat tworzenia klasy semafora używanej w tym przykładzie, zobacz How to: Use the Context Class to Implement a Cooperative Semaphore (Jak używać klasy kontekstu do implementowania semafora kooperatywnego).

[Top]

Nie wykonuj szczegółowej pracy w potoku danych

Biblioteka agentów jest najbardziej przydatna, gdy praca wykonywana przez potok danych jest dość gruba. Na przykład jeden składnik aplikacji może odczytywać dane z pliku lub połączenia sieciowego i od czasu do czasu wysyłać te dane do innego składnika. Protokół używany przez bibliotekę agentów do propagowania komunikatów powoduje, że mechanizm przekazywania komunikatów ma większe obciążenie niż równoległe konstrukcje zadań udostępniane przez bibliotekę równoległych wzorców (PPL). W związku z tym upewnij się, że praca wykonywana przez potok danych jest wystarczająco długa, aby zrównoważyć to obciążenie.

Chociaż potok danych jest najbardziej skuteczny, gdy jego zadania są gruboziarniste, każdy etap potoku danych może używać konstrukcji PPL, takich jak grupy zadań i algorytmy równoległe do wykonywania bardziej szczegółowej pracy. Aby zapoznać się z przykładem grubszej sieci danych, która używa precyzyjnego równoległości na każdym etapie przetwarzania, zobacz Przewodnik: tworzenie sieci przetwarzania obrazów.

[Top]

Nie przekazuj dużych ładunków komunikatów według wartości

W niektórych przypadkach środowisko uruchomieniowe tworzy kopię każdego komunikatu, który przekazuje z jednego buforu komunikatów do innego buforu komunikatów. Na przykład klasa concurrency::overwrite_buffer oferuje kopię każdego komunikatu odbieranego do każdego z jego obiektów docelowych. Środowisko uruchomieniowe tworzy również kopię danych komunikatów podczas korzystania z funkcji przekazywania komunikatów, takich jak współbieżność::send i współbieżność::receive , aby zapisywać komunikaty w buforze komunikatów i odczytywać je. Mimo że ten mechanizm pomaga wyeliminować ryzyko współbieżnego zapisywania danych udostępnionych, może to prowadzić do niskiej wydajności pamięci, gdy ładunek komunikatu jest stosunkowo duży.

Możesz użyć wskaźników lub odwołań, aby zwiększyć wydajność pamięci podczas przekazywania komunikatów, które mają duży ładunek. Poniższy przykład porównuje przekazywanie dużych komunikatów według wartości do przekazywania wskaźników do tego samego typu komunikatu. W przykładzie zdefiniowano dwa typy agentów i producer consumer, które działają na message_data obiektach. W przykładzie porównaliśmy czas wymagany przez producenta do wysłania kilku message_data obiektów do konsumenta do czasu wymaganego przez agenta producenta do wysłania kilku wskaźników do message_data obiektów do konsumenta.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

W tym przykładzie są generowane następujące przykładowe dane wyjściowe:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Wersja używająca wskaźników działa lepiej, ponieważ eliminuje wymaganie, aby środowisko uruchomieniowe tworzyło pełną kopię każdego message_data obiektu, który przekazuje od producenta do odbiorcy.

[Top]

Używanie shared_ptr w sieci danych, gdy własność jest niezdefiniowana

W przypadku wysyłania komunikatów przez wskaźnik za pośrednictwem potoku lub sieci z przekazywaniem komunikatów pamięć jest zwykle przydzielana dla każdego komunikatu z przodu sieci i zwalniania tej pamięci na końcu sieci. Mimo że ten mechanizm często działa dobrze, istnieją przypadki, w których jest to trudne lub niemożliwe do użycia. Rozważmy na przykład przypadek, w którym sieć danych zawiera wiele węzłów końcowych. W takim przypadku nie ma jasnej lokalizacji, aby zwolnić pamięć dla komunikatów.

Aby rozwiązać ten problem, można użyć mechanizmu, na przykład std::shared_ptr, który umożliwia własnością wskaźnika przez wiele składników. Gdy ostatni shared_ptr obiekt, który jest właścicielem zasobu, zostanie również zwolniony.

W poniższym przykładzie pokazano, jak używać shared_ptr do udostępniania wartości wskaźnika między wieloma komunikatów. Przykład łączy obiekt concurrency::overwrite_buffer z trzema obiektami współbieżności::call . Klasa overwrite_buffer oferuje komunikaty do każdego z jego celów. Ponieważ na końcu sieci danych znajduje się wiele właścicieli danych, w tym przykładzie użyto shared_ptr polecenia , aby umożliwić każdemu call obiektowi współużytkowania własności komunikatów.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

W tym przykładzie są generowane następujące przykładowe dane wyjściowe:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Zobacz też

Środowisko uruchomieniowe współbieżności — najlepsze praktyki
Biblioteki agentów asynchronicznych
Przewodnik: tworzenie aplikacji opartej o agentów
Przewodnik: tworzenie agenta przepływu danych
Przewodnik: tworzenie sieci przetwarzania obrazów
Biblioteka wzorów równoległych — najlepsze praktyki
Środowisko uruchomieniowe współbieżności — najlepsze praktyki ogólne