Sdílet prostřednictvím


Osvědčené postupy v knihovně asynchronních agentů

Tento dokument popisuje, jak efektivně používat knihovnu asynchronních agentů. Knihovna agentů podporuje programovací model založený na objektech actor a předávání zpráv v procesu pro hrubé odstupňované toky dat a kanálování úloh.

Další informace o knihovně agentů najdete v tématu Knihovna asynchronních agentů.

Oddíly

Tento dokument obsahuje následující části:

Izolace stavu pomocí agentů

Knihovna agentů poskytuje alternativy ke sdílenému stavu tím, že umožňuje připojit izolované komponenty prostřednictvím asynchronního mechanismu předávání zpráv. Asynchronní agenti jsou nejúčinnější, když izolují svůj interní stav od jiných komponent. Izolováním stavu obvykle nečiní více komponent se sdílenými daty. Izolace stavu může vaší aplikaci umožnit škálování, protože snižuje kolize ve sdílené paměti. Izolace stavu také snižuje pravděpodobnost zablokování a časování podmínek, protože komponenty nemusí synchronizovat přístup ke sdíleným datům.

Obvykle izolujete stav v agentu tím, že v oddílech třídy agenta držíte datové členy privateprotected a pomocí vyrovnávacích pamětí zpráv ke komunikaci změn stavu. Následující příklad ukazuje basic_agent třídu, která je odvozena z concurrency::agent. Třída basic_agent používá ke komunikaci s externími komponentami dvě vyrovnávací paměti zpráv. Jedna vyrovnávací paměť zpráv obsahuje příchozí zprávy; druhá vyrovnávací paměť zprávy obsahuje odchozí zprávy.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Kompletní příklady definování a používání agentů najdete v tématu Návod: Vytvoření aplikace založené na agentech a návod: Vytvoření agenta toku dat.

[Nahoře]

Omezení počtu zpráv v datovém kanálu pomocí mechanismu omezování

Mnoho typů vyrovnávací paměti zpráv, jako je souběžnost::unbounded_buffer, může obsahovat neomezený počet zpráv. Když producent zpráv odesílá zprávy do datového kanálu rychleji, než může příjemce zpracovávat tyto zprávy, může aplikace zadat nedostatek paměti nebo nedostatek paměti. K omezení počtu zpráv, které jsou současně aktivní v datovém kanálu, můžete použít mechanismus omezování, například semafor.

Následující základní příklad ukazuje, jak pomocí semaphore omezit počet zpráv v datovém kanálu. Datový kanál používá funkci concurrency::wait k simulaci operace, která trvá aspoň 100 milisekund. Vzhledem k tomu, že odesílatel vytváří zprávy rychleji, než příjemce může tyto zprávy zpracovat, tento semaphore příklad definuje třídu, která aplikaci umožní omezit počet aktivních zpráv.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Objekt semaphore omezuje kanál tak, aby zpracovával maximálně dvě zprávy současně.

Producent v tomto příkladu odesílá příjemci relativně málo zpráv. Tento příklad proto neukazuje potenciální nedostatek paměti nebo nedostatek paměti. Tento mechanismus je ale užitečný v případě, že datový kanál obsahuje relativně velký počet zpráv.

Další informace o tom, jak vytvořit semaphore třídy, která se používá v tomto příkladu, naleznete v tématu How to: Use the Context Class to Implement a Cooperative Semaphore.

[Nahoře]

Neprovádějte jemně odstupňovanou práci v datovém kanálu

Knihovna agentů je nejužitečnější v případech, kdy je práce prováděná datovým kanálem poměrně hrubě odstupňovaná. Jedna komponenta aplikace může například číst data ze souboru nebo síťového připojení a příležitostně odesílat tato data do jiné komponenty. Protokol, který knihovna agentů používá k šíření zpráv, způsobuje, že mechanismus předávání zpráv má větší režii než paralelní konstrukce úloh, které poskytuje knihovna PPL (Parallel Patterns Library ). Proto se ujistěte, že práce prováděná datovým kanálem je dostatečně dlouhá, aby se tato režie vyrovnala.

I když je datový kanál nejúčinnější, když jsou jeho úlohy hrubě odstupňované, může každá fáze datového kanálu používat konstruktory PPL, jako jsou skupiny úloh a paralelní algoritmy, k provádění jemněji odstupňované práce. Příklad hrubě odstupňované datové sítě, která v každé fázi zpracování používá jemně odstupňovaný paralelismus, najdete v části Návod: Vytvoření sítě pro zpracování obrázků.

[Nahoře]

Nepředávejte velké datové části zpráv podle hodnoty

V některých případech modul runtime vytvoří kopii každé zprávy, kterou předává z jedné vyrovnávací paměti zprávy do jiné vyrovnávací paměti zprávy. Například souběžnost::overwrite_buffer třída nabízí kopii každé zprávy, kterou obdrží do každého cíle. Modul runtime také vytvoří kopii dat zprávy při použití funkcí předávání zpráv, jako je concurrency::send a concurrency::receive pro zápis zpráv do vyrovnávací paměti zprávy a čtení zpráv. I když tento mechanismus pomáhá eliminovat riziko souběžného zápisu do sdílených dat, mohlo by to vést k nízkému výkonu paměti, když je datová část zprávy relativně velká.

Ukazatele nebo odkazy můžete použít ke zlepšení výkonu paměti při předávání zpráv, které mají velkou datovou část. Následující příklad porovnává předávání velkých zpráv podle hodnoty a předávání ukazatelů na stejný typ zprávy. Příklad definuje dva typy producer agentů a consumer, které se chovají na message_data objekty. Příklad porovnává čas potřebný k tomu, aby producent odeslal uživateli několik message_data objektů do času, který je nutný pro agenta producenta, aby odeslal několik ukazatelů na message_data objekty příjemci.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Tento příklad vytvoří následující ukázkový výstup:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Verze, která používá ukazatele, funguje lépe, protože eliminuje požadavek, aby modul runtime vytvořil úplnou kopii každého message_data objektu, který předává od producenta příjemci.

[Nahoře]

Použití shared_ptr v datové síti při nedefinované vlastnictví

Když odesíláte zprávy ukazatelem přes kanál nebo síť s předáváním zpráv, obvykle přidělíte paměť pro každou zprávu na přední straně sítě a uvolníte tuto paměť na konci sítě. I když tento mechanismus často funguje dobře, existují případy, kdy je obtížné nebo není možné ho použít. Představte si například případ, kdy datová síť obsahuje více koncových uzlů. V tomto případě není k dispozici žádné jasné umístění pro uvolnění paměti pro zprávy.

K vyřešení tohoto problému můžete použít mechanismus, například std::shared_ptr, který umožňuje, aby ukazatel byl vlastněný více komponentami. Když dojde ke zničení konečného shared_ptr objektu, který vlastní prostředek, uvolní se také prostředek.

Následující příklad ukazuje použití shared_ptr ke sdílení hodnot ukazatele mezi více vyrovnávacích pamětí zpráv. Příklad připojí objekt concurrency::overwrite_buffer ke třem objektům concurrency::call . Třída overwrite_buffer nabízí zprávy jednotlivým cílům. Vzhledem k tomu, že na konci datové sítě existuje více vlastníků dat, tento příklad umožňuje shared_ptr každému call objektu sdílet vlastnictví zpráv.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Tento příklad vytvoří následující ukázkový výstup:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Viz také

Osvědčené postupy v Concurrency Runtime
Knihovna asynchronních agentů
Návod: Vytvoření aplikace založené na agentovi
Postupy: Vytvoření agenta toku dat
Návod: Vytvoření sítě pro zpracování obrázků
Osvědčené postupy v knihovně PPL (Parallel Patterns Library)
Obecné osvědčené postupy v Concurrency Runtime