Compartilhar via


Práticas recomendadas na biblioteca assíncrona de agentes

Este documento descreve como fazer uso eficiente de biblioteca de agentes assíncrono.A biblioteca de agentes promove uma programação ator- base no modelo e a mensagem em processo que passa para o fluxo de dados de alta granularidade e que canaliza tarefas.

Para obter mais informações sobre a biblioteca de agentes, consulte Biblioteca de agentes assíncrono.

Seções

Este documento contém as seções a seguir:

  • Use agentes para isolar o estado

  • Use um mecanismo de regulagem para limitar o número de mensagens em um canal de dados

  • Não faça o trabalho mais aguçado em um canal de dados

  • Não passar tanto grandes úteis de mensagens por valor

  • Use o shared_ptr em uma rede de dados quando a propriedade é indefinida

Use agentes para isolar o estado

A biblioteca de agentes forneça alternativas ao estado compartilhado deixando o conectar-se componentes isolados através de um mecanismo de passagem mensagem- assíncrono.Agentes assíncronos são os mais vigor quando isolam seu estado interno de outros componentes.Isolando o estado, vários componentes não atuam normalmente em dados compartilhados.Isolamento de estado pode ativar seu aplicativo para dimensionar porque reduz a conflito na memória compartilhado.Isolamento de estado também reduzem a possibilidade de bloqueio completa e as condições de corrida porque os componentes não precisam sincronizar acesso a dados compartilhados.

Normalmente você isolar o estado em um agente mantendo membros de dados nas seções de private ou de protected da classe do agente e usando buffers de mensagem para informar alterações de estado.O exemplo a seguir mostra a classe de basic_agent , que deriva de concurrency::agent.A classe de basic_agent usa dois buffers de mensagem para se comunicar com componentes externos.Um buffer de mensagem segura mensagens de entrada; o outro buffer de mensagem segura mensagens de saída.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Para exemplos completos sobre como definir e usar agentes, consulte Passo a passo: Criando um aplicativo Baseado Agente- e Passo a passo: Criando um agente do fluxo de dados.

Superior[]

Use um mecanismo de regulagem para limitar o número de mensagens em um canal de dados

Muitos tipos de mensagem- buffer, como concurrency::unbounded_buffer, podem armazenar um número ilimitado de mensagens.Quando um produtor de mensagem envia mensagens para um pipeline de dados mais rápido do que o consumidor pode processar essas mensagens, o aplicativo pode entrar em um estado de memória baixa ou de para fora de memória.Você pode usar um mecanismo de estreitamento, por exemplo, um semáforo, para limitar o número de mensagens que são ativos simultaneamente em um canal de dados.

O seguinte exemplo básicas demonstra como usar um semáforo para limitar o número de mensagens em um canal de dados.O pipeline de dados usa a função de concurrency::wait para simular uma operação que tem pelo menos 100 milissegundos.Porque o remetente gera mensagens mais rápido do que o consumidor pode processar as mensagens, este exemplo define a classe de semaphore para ativar o aplicativo limitar o número de mensagens ativos.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

O objeto de semaphore limita a pipeline para processar ao mesmo tempo no máximo duas mensagens.

O produtor neste exemplo envia relativamente poucas mensagens para o consumidor.Como consequência, este exemplo não demonstra uma memória baixa ou uma condição de memória suficientes em potencial.No entanto, esse mecanismo é útil quando um pipeline de dados contém um número de mensagens relativamente alto.

Para obter mais informações sobre como criar o semáforo classe que é usado neste exemplo, consulte Como: Use a classe de contexto para implementar um semáforo cooperativo.

Superior[]

Não faça o trabalho mais aguçado em um canal de dados

A biblioteca de agentes é útil quando o trabalho que é executado por um pipeline de dados é bastante de alta granularidade.Por exemplo, um componente do aplicativo pode ler dados de um arquivo ou uma conexão de rede e ocasionalmente um enviar os dados para outro componente.O protocolo que a biblioteca de agentes usa para se propagar causas de mensagens o mecanismo de passagem mensagem- ter mais sobrecarga do que as compilações de paralela a tarefa que são fornecidas por Paralela à biblioteca modelos PPL ().Como consequência, certifique-se de que o trabalho que é executado por um pipeline de dados é o suficiente para deslocar essa sobrecarga.

Embora um pipeline de dados é mais eficiente quando suas tarefas são de alta granularidade, cada estágio de pipeline de dados pode usar construções de PPL como grupos de trabalho e algoritmos paralelos para executar um trabalho mais mais aguçado.Para um exemplo de uma rede de dados de alta granularidade que usa o paralelismo mais aguçado em cada estágio de processamento, consulte Passo a passo: Criando uma rede Processamento de imagens.

Superior[]

Não passar tanto grandes úteis de mensagens por valor

Em alguns casos, o tempo de execução cria uma cópia de cada mensagem que passa de um buffer de mensagem para outro buffer de mensagem.Por exemplo, a classe de concurrency::overwrite_buffer oferece uma cópia de cada mensagem que receba a cada um dos seus alvos.O runtime também cria uma cópia dos dados de mensagem quando você usa mensagem- passar funções como concurrency::send e concurrency::receive para escrever mensagens a e ler mensagens de um buffer de mensagem.Embora esse mecanismo ajudasse para eliminar o risco de simultaneamente gravar dados compartilhados, pode levar a um desempenho ruim de memória quando a carga de mensagem é relativamente grande.

Você pode usar ponteiros ou referências para melhorar o desempenho de memória quando você passa as mensagens que têm uma grande carga.O exemplo a seguir compara passar grandes mensagens pelo valor para passar ponteiros para o mesmo tipo de mensagem.O exemplo a seguir define dois tipos, producer e consumerdo, que atuam em objetos de message_data .O exemplo compara o tempo necessário para que o produtor que vários objetos de message_data consumidor para o tempo necessário para que o agente de produtor que vários ponteiros para objetos de message_data para o consumidor.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Este exemplo produz a seguinte saída de exemplo:

  
  
  
  

A versão que usa ponteiros executa melhor porque elimina a necessidade para que o tempo de execução criar uma cópia total de cada objeto de message_data que passa de produtor para o consumidor.

Superior[]

Use o shared_ptr em uma rede de dados quando a propriedade é indefinida

Quando você envia mensagens pelo ponteiro através de um pipeline ou de uma rede mensagem- passando, você normalmente aloca memória para cada mensagem na frente de rede e libera a memória no final da rede.Embora esse mecanismo funciona bem frequentemente, há casos em que é difícil ou não possível usar o.Por exemplo, considere o caso em que a rede de dados contém vários nós de extremidade.Nesse caso, não haverá lugar certo para liberar memória para mensagens.

Para resolver esse problema, você pode usar um mecanismo, por exemplo, std::shared_ptr, que permite que um ponteiro para ser possuído por vários componentes.Quando o objeto final de shared_ptr que possui um recurso é destruído, o recurso é liberado também.

O exemplo seguinte demonstra como usar shared_ptr para compartilhar valores de ponteiro entre vários buffers de mensagem.O exemplo conecta um objeto de concurrency::overwrite_buffer a três objetos de concurrency::call .A classe oferece overwrite_buffer de mensagens para cada um dos seus alvos.Porque há vários proprietários de dados no final da rede de dados, usa shared_ptr este exemplo ativar cada objeto de call à propriedade de compartilhamento de mensagens.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Este exemplo produz a seguinte saída de exemplo:

  
  
  
  

Consulte também

Tarefas

Passo a passo: Criando um aplicativo Baseado Agente-

Passo a passo: Criando um agente do fluxo de dados

Passo a passo: Criando uma rede Processamento de imagens

Conceitos

Biblioteca de agentes assíncrono

As práticas recomendadas de paralela da biblioteca

Práticas recomendadas gerais em tempo de execução de concorrência

Outros recursos

Práticas recomendadas em tempo de execução de concorrência