Aracılığıyla paylaş


Zaman uyumsuz aracıları Kitaplığı'nda en iyi yöntemler

Bu belge, zaman uyumsuz aracıları Kitaplığı etkin olarak kullanılmasını sağlamak açıklar. Aktör-tabanlı bir programlama modeli ve kilitten dataflow için geçirerek ve görevleri ardışık işlem dışı ileti aracıları kitaplığı yükseltir.

Aracılar kitaplığı hakkında daha fazla bilgi için bkz: Zaman uyumsuz aracıları kitaplığı.

Bölümler

Bu belge aşağıdaki bölümleri içerir:

  • Ayırma durumu aracıları kullanın

  • Azaltma mekanizması veri potansiyel ileti sayısını sınırlamak için kullanın

  • Bir veri potansiyel iyi ayarlanmış iş yapmayın

  • Büyük ileti yüklerini değerine göre iletmeyen

  • Shared_ptr bir veri ağ zaman sahipliği ise tanımsız kullanın

Ayırma durumu aracıları kullanın

Aracılar kitaplığı paylaşılan durum alternatifleri, zaman uyumsuz bir ileti geçirme mekanizması ile Yalıtılmış bileşenlere bağlamanıza izin vererek sağlar. Zaman uyumsuz aracıları en etkili yolu, bunlar kendi iç durumu diğer bileşenlerinden ayırma. Durumu yalıtarak, birden çok bileşeni genellikle paylaşılan veriler üzerinde çalışmayan. Durum yalıtım uygulama paylaşılan bellek çakışması azalttığı ölçeklemek etkinleştirebilirsiniz. Paylaşılan verilere erişimi eşitlemek bileşenleri olmadığı için durum yalıtım de kilitlenme ve yarış koşulları olasılığını azaltır.

Tipik olarak bir aracıya durumda veri üyeleri tutarak yalıtmak private veya protected Aracısı sınıfının ve durum değişiklikleri iletişim kurmak için ileti arabellekleri kullanarak bölümler. Aşağıdaki örnekte gösterildiği basic_agent türetildiği sınıfı concurrency::agent. basic_agent Sınıfı iki ileti arabellekleri dış bileşenleriyle iletişim kurmak için kullanır. Gelen iletileri bir ileti arabelleği tutar; Giden iletilere ileti arabelleği tutar.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Tanımlamak ve aracıları nasıl kullanılacağı hakkında tam örnek için bkz: İzlenecek yol: Agent tabanlı bir uygulama oluşturma ve İzlenecek yol: Dataflow Aracısı oluşturma.

Top

Azaltma mekanizması veri potansiyel ileti sayısını sınırlamak için kullanın

Çok sayıda ileti arabelleği türleri gibi concurrency::unbounded_buffer, iletileri sınırsız sayıda basılı tutabilirsiniz. Uygulama ileti producer iletileri veri potansiyel tüketici bu iletileri işlemek daha hızlı gönderdiğinde, düşük bellek veya bellek içi duruma girebilirsiniz. Örneğin, bir semafor bir azaltma mekanizması bir veri kanalı aynı anda etkin ileti sayısını sınırlamak için kullanabilirsiniz.

Aşağıdaki temel örnek bir semafor veri potansiyel ileti sayısını sınırlamak için nasıl kullanılacağını gösterir. Verileri kullanan kanal concurrency::wait en az 100 milisaniye olarak geçen bir işlemi benzetimini yapmak için işlevi. Gönderenden İletiler tüketici bu iletileri işlemek daha hızlı oluşturduğu için bu örnek tanımlar semaphore etkin ileti sayısını sınırlamak için uygulamayı etkinleştirmek için sınıf.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

semaphore Nesnesi, aynı anda en fazla iki iletileri işlemek için potansiyel sınırlar.

Bu örnekte producer tüketiciye nispeten daha az sayıda ileti gönderir. Bu nedenle, olası bir bellek içi ve düşük bellek durumu bu örnek göstermek değil. Görece yüksek sayıda ileti verileri ardışık düzen içerir ancak, bu mekanizma yararlıdır.

Bu örnekte kullanılan semafor sınıfı oluşturma hakkında daha fazla bilgi için bkz: Nasıl yapılır: işbirlikçi bir semafor uygulamak için içerik sınıfını kullanın.

Top

Bir veri potansiyel iyi ayarlanmış iş yapmayın

Veri kanalı tarafından gerçekleştirilen çalışmanın oldukça kilitten aracıları kitaplığı yararlıdır. Örneğin, bir uygulama bileşeni, bir dosya ya da bir ağ bağlantısı veri okuma ve bazen başka bir bileşen, verileri göndermek. İleti geçirme mekanizması tarafından sağlanan görev paralel yapıları daha fazla yüke sahip iletiler yaymak için aracıları kitaplığını kullanan protokol neden Paralel Desenler kitaplığının (ppl). Bu nedenle, veri kanalı tarafından gerçekleştirilen çalışmanın bu yükünü kaydırmak yeterince uzun olduğundan emin olun.

Görevlerinin kilitten olduğunda veri potansiyel en etkili olmakla birlikte, verileri ardışık her aşaması daha iyi ayarlanmış çalışma gerçekleştirmek için görev gruplarını ve paralel algoritmalar gibi ppl yapılar kullanabilirsiniz. Her işlem aşamasında iyi ayarlanmış paralellik kullanan bir kilitten veri ağına ilişkin bir örnek için bkz: İzlenecek yol: görüntü işleme ağ oluşturma.

Top

Büyük ileti yüklerini değerine göre iletmeyen

Bazı durumlarda, çalışma zamanı, ileti arabelleğinden başka bir ileti arabelleğe geçirmeden her iletinin bir kopyasını oluşturur. Örneğin, concurrency::overwrite_buffer sınıfı, her biri kendi hedefleri aldığı her iletinin bir kopyasını sunar. İleti geçirme işlevleri gibi kullandığınızda, çalışma zamanı da ileti verilerini bir kopyasını oluşturur concurrency::send ve concurrency::receive iletilerine yazmak ve ileti arabelleğinden iletileri okumak için. Bu mekanizma, aynı anda paylaşılan veri yazma riskini ortadan kaldırılmasına yardımcı olur, ancak ileti yükü nispeten daha büyük olduğunda düşük bellek performansını neden olabilir.

İşaretçiler kullanabilirsiniz veya iletileri geçirdiðinizde bellek performansını artırmak için başvurular büyük bir yük vardır. Aşağıdaki örnek passing büyük iletilerin aynı ileti türü işaretçilerinin geçirilmesi için değer ile karşılaştırır. Bu örnek iki Aracısı türünü tanımlayan producer ve consumer, davranma message_data nesneler. Örnek producer birkaç göndermek gerekli zamanı karşılaştıran message_data producer agent birkaç işaretçiler göndermek için gerekli olan zaman tüketiciye nesnelere message_data tüketici nesneler.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Bu örnek, aşağıdaki örnek çıktı üretir:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Tam bir kopyasını oluşturmak çalışma zamanı gereksinimini ortadan kaldırdığından işaretçiler kullanan sürüm daha iyi yapar her message_data , üreticiden tüketiciye geçen nesne.

Top

Shared_ptr bir veri ağ zaman sahipliği ise tanımsız kullanın

İşaretçiyi bir ileti geçirme potansiyel veya ağ üzerinden tarafından iletiler gönderdiğinizde, genellikle ağ önünde her ileti için bellek ayrılamadı ve ağ sonunda bu belleği serbest bırakmak. Bu mekanizma sık iyi çalışmasına rağmen zor ya da kullanmak mümkün değil olduğu durumlar vardır. Örneğin, verileri ağ birden çok uç düğüm içeren bir durumu düşünün. Bu durumda, iletiler için bellek boşaltmak için şifresiz konum yoktur.

Bu sorunu çözmek için bir mekanizma gibi kullanabileceğiniz std::shared_ptr, birden çok bileşeni gözüktüğü işaretçiyi etkinleştirir. Zaman en son shared_ptr bir kaynağın sahibi olan nesne bozulur, kaynak da serbest kalır.

Aşağıdaki örnek, nasıl kullanılacağını gösterir shared_ptr işaretçisi değerler arasında birden çok ileti arabellekleri paylaşmak için. Örneğin birbirine bağlayan bir concurrency::overwrite_buffer üç nesneye concurrency::call nesneler. overwrite_buffer İletiler hedeflerine her sınıf sunar. Birden çok veri veri ağ sonunda sahiplerini olduğundan, bu örnek kullanır shared_ptr her etkinleştirmek için call iletileri sahipliğini paylaşmak için nesne.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Bu örnek, aşağıdaki örnek çıktı üretir:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Ayrıca bkz.

Görevler

İzlenecek yol: Agent tabanlı bir uygulama oluşturma

İzlenecek yol: Dataflow Aracısı oluşturma

İzlenecek yol: görüntü işleme ağ oluşturma

Kavramlar

Zaman uyumsuz aracıları kitaplığı

Paralel desen Kitaplığı'nda en iyi yöntemler

Eşzamanlılık çalışma zamanında genel en iyi yöntemler

Diğer Kaynaklar

Eşzamanlılık çalışma zamanı en iyi yöntemler