Megosztás a következőn keresztül:


Ajánlott eljárások az aszinkron ügynökök könyvtárában

Ez a dokumentum az Aszinkron ügynökkódtár hatékony használatát ismerteti. Az Ügynökkódtár támogat egy aktoralapú programozási modellt és folyamati üzenettovábbítást durva szemcsés adatfolyam- és csővezetékes feladatokhoz.

Az Ügynöki könyvtárról további információt az Aszinkron ügynöki könyvtár-ban talál.

Szakaszok

Ez a dokumentum a következő szakaszokat tartalmazza:

Használjon ügynököket az állapot elkülönítésére

Az Ügynökök tára alternatív megoldásokat kínál a megosztott állapotok helyett azáltal, hogy lehetővé teszi izolált összetevők csatlakoztatását egy aszinkron üzenetátadási mechanizmuson keresztül. Az aszinkron ügynökök akkor a leghatékonyabbak, ha elkülönítik a belső állapotukat más összetevőktől. Az állapot elkülönítésével több összetevő általában nem működik a megosztott adatokon. Az állapotelkülönítés lehetővé teszi az alkalmazás skálázását, mivel csökkenti a megosztott memóriával kapcsolatos versengést. Az állapotelkülönítés emellett csökkenti a holtpont és a versenyfeltételek esélyét, mivel az összetevőknek nem kell szinkronizálni a megosztott adatokhoz való hozzáférést.

Az ügynök állapotának elkülönítése általában úgy történik, hogy az ügynökosztály adattagjait az private vagy a protected szakaszokban tartja, és az állapotváltozások közléséhez üzenetpuffereket használ. Az alábbi példa az basic_agent osztályt mutatja be, amely a concurrency::agent-ből származik. Az basic_agent osztály két üzenetpuffert használ a külső összetevőkkel való kommunikációhoz. Egy üzenetpuffer tárolja a bejövő üzeneteket; a másik üzenetpuffer a kimenő üzeneteket tárolja.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Az ügynökök definiálására és használatára vonatkozó teljes példákért tekintse meg az útmutatót: Agent-Based-alkalmazás létrehozása és útmutató: Adatfolyam-ügynök létrehozása.

[Felső]

Szabályozási mechanizmus használata az adatfolyamban lévő üzenetek számának korlátozásához

Számos üzenetpuffertípus, például egyidejűség::unbounded_buffer korlátlan számú üzenetet tartalmazhat. Ha egy üzenetkészítő gyorsabban küld üzeneteket egy adatfolyamnak, mint amennyit a fogyasztó feldolgozhat, az alkalmazás alacsony memória- vagy memóriahiányos állapotba léphet. Egy szabályozási mechanizmus, például egy szemafor használatával korlátozhatja az adatfolyamokban egyidejűleg aktív üzenetek számát.

Az alábbi alapvető példa bemutatja, hogyan használható szemafor az adatfolyamban lévő üzenetek számának korlátozására. Az adatfolyam az egyidejűség::wait függvény használatával szimulál egy legalább 100 ezredmásodpercet igénylő műveletet. Mivel a feladó gyorsabban állítja elő az üzeneteket, mint amennyit a fogyasztó feldolgozhat, ez a példa meghatározza azt az semaphore osztályt, amely lehetővé teszi az alkalmazás számára az aktív üzenetek számának korlátozását.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Az semaphore objektum legfeljebb két üzenet feldolgozására korlátozza a folyamatot.

A példában szereplő gyártó viszonylag kevés üzenetet küld a fogyasztónak. Ezért ez a példa nem mutat be egy esetleges alacsony memóriát vagy memóriakimerülést. Ez a mechanizmus azonban akkor hasznos, ha egy adatfolyam viszonylag nagy számú üzenetet tartalmaz.

Az ebben a példában használt semaphore-osztály létrehozásával kapcsolatos további információkért lásd: How to: Use the Context Class to Implement a Cooperative Semaphore.

[Felső]

Ne végezzen Fine-Grained munkát adatfolyamatban

Az Ügynökök Könyvtára akkor a leghasznosabb, ha az adatfolyam által végzett munka meglehetősen durva szemcsés. Előfordulhat például, hogy egy alkalmazásösszetevő adatokat olvas be egy fájlból vagy egy hálózati kapcsolatból, és időnként elküldi az adatokat egy másik összetevőnek. Az Ügynökök tára által az üzenetek propagálására használt protokoll miatt az üzenetátadási mechanizmus nagyobb többletterhelést okoz, mint a párhuzamos mintatár (PPL) által biztosított feladat-párhuzamos szerkezetek . Ezért győződjön meg arról, hogy az adatfolyam által végzett munka elég hosszú ahhoz, hogy ellensúlyozza ezt a többletterhelést.

Bár az adattovábbító csatorna akkor a leghatékonyabb, ha a tevékenységek durva szemcsés, az adattovábbító csatorna minden szakasza használhat PPL-szerkezeteket, például feladatcsoportokat és párhuzamos algoritmusokat a részletesebb munka elvégzéséhez. Az egyes feldolgozási szakaszokban részletes párhuzamosságot használó durva szemcsés adathálózatra példa : Útmutató: Image-Processing hálózat létrehozása.

[Felső]

Ne adja át a nagy üzenet hasznos adatait érték szerint

Bizonyos esetekben a futtatókörnyezet létrehozza az összes üzenet másolatát, amelyet az egyik üzenetpufferből egy másik üzenetpufferbe továbbít. Például az egyidejűség::overwrite_buffer osztály minden egyes üzenetről másolatot biztosít minden célpontjának. A futtatókörnyezet is létrehoz egy másolatot az üzenetadatokról, amikor üzenetátadási függvényeket használ , például egyidejűség::küldés és egyidejűség::fogadás üzenetek írásához és üzenetek olvasásához egy üzenetpufferből. Bár ez a mechanizmus segít kiküszöbölni a megosztott adatokba való egyidejű írás kockázatát, gyenge memóriateljesítményhez vezethet, ha az üzenet hasznos adattartalma viszonylag nagy.

A nagy hasznos adatmennyiségű üzenetek átadásakor mutatókkal vagy hivatkozásokkal javíthatja a memória teljesítményét. Az alábbi példa összehasonlítja a nagy üzenetek érték szerinti átadását ugyanazon üzenettípusú mutatók átadásával. A példa két ügynöktípust határoz meg, producer és consumer, amelyek message_data objektumokra hatnak. A példa összehasonlítja azt az időt, amely ahhoz szükséges, hogy a gyártó több message_data objektumot küldjön a fogyasztónak ahhoz az időhöz, amely ahhoz szükséges, hogy a gyártóügynök több mutatót küldjön az objektumoknak message_data a fogyasztónak.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Ez a példa a következő mintakimenetet hozza létre:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

A mutatókat használó verzió azért teljesít jobban, mert kiküszöböli a futtatókörnyezet azon követelményét message_data , hogy minden olyan objektum teljes másolatát hozza létre, amelyet a gyártótól a fogyasztónak továbbít.

[Felső]

Shared_ptr használata adathálózatban, ha a tulajdonjog nincs meghatározva

Amikor pointerrel üzeneteket küld üzenetátadási folyamaton vagy hálózaton keresztül, általában a hálózat elején lefoglalja az üzenetek memóriáját, és a hálózat végén felszabadítja azt. Bár ez a mechanizmus gyakran jól működik, vannak olyan esetek, amikor nehéz vagy nem lehet használni. Vegyük például azt az esetet, amikor az adathálózat több végpontot tartalmaz. Ebben az esetben nincs egyértelmű hely az üzenetek memóriájának felszabadítására.

A probléma megoldásához használhat olyan mechanizmust, például std::shared_ptr, amely lehetővé teszi, hogy egy mutató több összetevő tulajdonában legyen. Amikor az erőforrást birtokoló végső shared_ptr objektum megsemmisül, az erőforrás is felszabadul.

Az alábbi példa bemutatja, hogyan oszthat shared_ptr meg mutatóértékeket több üzenetpuffer között. A példa összekapcsol egy concurrency::overwrite_buffer objektumot három concurrency::call objektummal. Az overwrite_buffer osztály üzeneteket kínál az egyes céloknak. Mivel az adathálózat végén több tulajdonos is található, ez a példa shared_ptr lehetővé teszi, hogy minden call objektum megossza az üzenetek tulajdonjogát.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Ez a példa a következő mintakimenetet hozza létre:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Lásd még

A párhuzamos futtatási környezet legjobb gyakorlatai
Aszinkron ügynökök könyvtára
Útmutató: Agent-Based-alkalmazás létrehozása
Útmutató: Adatfolyam-ügynök létrehozása
Útmutató: Image-Processing-hálózat létrehozása
Ajánlott eljárások a párhuzamos minták könyvtárában
Általános ajánlott eljárások az egyidejűségi futtatókörnyezetben