Aracılığıyla paylaş


Zaman Uyumsuz Aracılar Kitaplığı'ndaki En İyi Yöntemler

Bu belgede, Zaman Uyumsuz Aracılar Kitaplığı'nın nasıl etkili bir şekilde kullanılacağı açıklanmaktadır. Aracılar Kitaplığı, kaba ayrıntılı veri akışı ve kanal oluşturma görevleri için aktör tabanlı bir programlama modelini ve işlem içi ileti geçirmeyi teşvik eder.

Aracılar Kitaplığı hakkında daha fazla bilgi için bkz . Zaman Uyumsuz Aracılar Kitaplığı.

Bölümler

Bu belgede aşağıdaki bölümler yer alır:

Durumu Yalıtmak için Aracıları Kullanma

Aracılar Kitaplığı, yalıtılmış bileşenleri zaman uyumsuz bir ileti geçirme mekanizması aracılığıyla bağlamanıza olanak sağlayarak paylaşılan duruma alternatifler sağlar. Zaman uyumsuz aracılar en çok iç durumlarını diğer bileşenlerden yalıttığında etkilidir. Durumu yalıtarak, birden çok bileşen genellikle paylaşılan veriler üzerinde işlem yapmaz. Durum yalıtımı, paylaşılan bellek üzerindeki çekişmeyi azalttığı için uygulamanızın ölçeklendirilmesini sağlayabilir. Durum yalıtımı, bileşenlerin paylaşılan verilere erişimi eşitlemesi gerekmediğinden kilitlenme ve yarış koşulları olasılığını da azaltır.

Genellikle aracı sınıfının veya protected bölümlerinde veri üyelerini private tutarak ve durum değişikliklerini iletmek için ileti arabelleklerini kullanarak bir aracıda durumu yalıtabilirsiniz. Aşağıdaki örnekte eşzamanlılık::agent öğesinden türetilen sınıfı gösterilmektedirbasic_agent. sınıfı, basic_agent dış bileşenlerle iletişim kurmak için iki ileti arabelleği kullanır. Bir ileti arabelleği gelen iletileri barındırıyor; diğer ileti arabelleği giden iletileri tutar.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Aracıları tanımlama ve kullanma hakkında tam örnekler için bkz . İzlenecek Yol: Aracı Tabanlı Uygulama Oluşturma ve İzlenecek Yol: Veri Akışı Aracısı Oluşturma.

[Üst]

Veri İşlem Hattındaki İleti Sayısını Sınırlamak için Azaltma Mekanizması Kullanma

Eşzamanlılık::unbounded_buffer gibi birçok ileti arabelleği türü sınırsız sayıda ileti barındırabilir. bir ileti üreticisi, tüketicinin bu iletileri işleyenenden daha hızlı bir şekilde veri işlem hattına ileti gönderdiğinde, uygulama yetersiz bellek veya yetersiz bellek durumu girebilir. Veri işlem hattında eşzamanlı olarak etkin olan iletilerin sayısını sınırlamak için bir azaltma mekanizması (örneğin, semafor) kullanabilirsiniz.

Aşağıdaki temel örnek, bir veri işlem hattındaki ileti sayısını sınırlamak için semafor kullanmayı gösterir. Veri işlem hattı, en az 100 milisaniye süren bir işlemin benzetimini yapmak için eşzamanlılık::wait işlevini kullanır. Gönderen, tüketicinin bu iletileri işleyebileceğinden daha hızlı ileti ürettiğinden, bu örnek uygulamanın etkin ileti sayısını sınırlamasını sağlamak için sınıfını tanımlar semaphore .

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

nesnesi, semaphore işlem hattını aynı anda en fazla iki iletiyi işlenmek üzere sınırlar.

Bu örnekteki üretici, tüketiciye görece az ileti gönderir. Bu nedenle, bu örnek olası bir yetersiz bellek veya yetersiz bellek koşulu göstermez. Ancak, bir veri işlem hattı görece yüksek sayıda ileti içerdiğinde bu mekanizma kullanışlıdır.

Bu örnekte kullanılan semafor sınıfını oluşturma hakkında daha fazla bilgi için bkz . Nasıl yapılır: İşbirlikçi Semafor Uygulamak için Bağlam Sınıfını Kullanma.

[Üst]

Veri İşlem Hattında Ayrıntılı Çalışma Gerçekleştirmeyin

Bir veri işlem hattı tarafından gerçekleştirilen çalışma oldukça kaba ayrıntılı olduğunda Aracılar Kitaplığı en kullanışlıdır. Örneğin, bir uygulama bileşeni bir dosyadan veya ağ bağlantısından verileri okuyabilir ve bazen bu verileri başka bir bileşene gönderebilir. Aracı kitaplığının iletileri yaymak için kullandığı protokol, ileti geçirme mekanizmasının Paralel Desenler Kitaplığı (PPL) tarafından sağlanan görev paralel yapılarından daha fazla ek yüke sahip olmasını sağlar. Bu nedenle, veri işlem hattı tarafından gerçekleştirilen çalışmanın bu ek yükü dengelemek için yeterince uzun olduğundan emin olun.

Veri işlem hattı, görevleri daha ayrıntılı olduğunda en etkili olsa da, veri işlem hattının her aşaması daha ayrıntılı işler gerçekleştirmek için görev grupları ve paralel algoritmalar gibi PPL yapılarını kullanabilir. Her işleme aşamasında ayrıntılı paralellik kullanan kaba taneli veri ağı örneği için bkz . İzlenecek Yol: Görüntü İşleme Ağı Oluşturma.

[Üst]

Büyük İleti Yüklerini Değere Göre Geçirme

Bazı durumlarda çalışma zamanı, bir ileti arabelleğinden başka bir ileti arabelleğine geçirdiği her iletinin bir kopyasını oluşturur. Örneğin eşzamanlılık::overwrite_buffer sınıfı, hedeflerinin her birine aldığı her iletinin bir kopyasını sunar. Çalışma zamanı, ileti arabelleğine ileti yazmak ve bu arabellekten ileti okumak için concurrency::send ve concurrency::receive gibi ileti geçirme işlevlerini kullandığınızda da ileti verilerinin bir kopyasını oluşturur. Bu mekanizma paylaşılan verilere eşzamanlı olarak yazma riskini ortadan kaldırmaya yardımcı olsa da, ileti yükü görece büyük olduğunda düşük bellek performansına neden olabilir.

Büyük bir yükü olan iletileri geçirirken bellek performansını geliştirmek için işaretçileri veya başvuruları kullanabilirsiniz. Aşağıdaki örnek, büyük iletileri değere göre geçirerek işaretçileri aynı ileti türüne geçirmeyi karşılaştırır. Örnek, producer nesneler üzerinde message_data hareket eden ve consumerolmak üzere iki aracı türünü tanımlar. Örnek, üreticinin tüketiciye birkaç message_data nesne göndermesi için gereken süreyi, üretici aracısının tüketicilere nesnelere birkaç işaretçi message_data göndermesi için gereken süreyle karşılaştırır.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Bu örnek aşağıdaki örnek çıkışı oluşturur:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

İşaretçileri kullanan sürüm, çalışma zamanının üreticiden tüketiciye geçirdiği her message_data nesnenin tam kopyasını oluşturma gereksinimini ortadan kaldırdığından daha iyi performans gösterir.

[Üst]

Sahiplik Tanımsız Olduğunda Veri Ağında shared_ptr Kullanma

İleti geçirme işlem hattı veya ağ üzerinden işaretçiyle ileti gönderdiğinizde, genellikle ağın önündeki her ileti için belleği ayırır ve bu belleği ağın sonunda serbest bırakırsınız. Bu mekanizma sıklıkla iyi çalışsa da, kullanmanın zor veya mümkün olmadığı durumlar vardır. Örneğin, veri ağının birden çok uç düğüm içerdiği durumu göz önünde bulundurun. Bu durumda, iletilerin belleğini boşaltmak için net bir konum yoktur.

Bu sorunu çözmek için, bir işaretçinin birden çok bileşene sahip olmasını sağlayan std::shared_ptr gibi bir mekanizma kullanabilirsiniz. Bir kaynağa sahip olan son shared_ptr nesne yok edildiğinde, kaynak da serbesttir.

Aşağıdaki örnekte, işaretçi değerlerini birden çok ileti arabelleği arasında paylaşmak için nasıl kullanılacağı shared_ptr gösterilmektedir. Örnek, bir eşzamanlılık::overwrite_buffer nesnesini üç eşzamanlılık::çağrı nesnesine bağlar. sınıfı overwrite_buffer , hedeflerinin her birine ileti sunar. Veri ağının sonunda birden çok veri sahibi olduğundan, bu örnek her call nesnenin iletilerin sahipliğini paylaşmasını sağlamak için kullanırshared_ptr.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Bu örnek aşağıdaki örnek çıkışı oluşturur:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Ayrıca bkz.

Eşzamanlılık Çalışma Zamanı En İyi Yöntemleri
Zaman Uyumsuz Aracılar Kitaplığı
İzlenecek Yol: Aracı Temelli Uygulama Oluşturma
İzlenecek Yol: Veri Akışı Aracısı Oluşturma
İzlenecek yol: Görüntü İşleme Ağı Oluşturma
Paralel Desen Kitaplığı'ndaki En İyi Yöntemler
Eşzamanlılık Çalışma Zamanındaki Genel En İyi Yöntemler