Compartilhar via


Práticas recomendadas na Biblioteca de Agentes Assíncrona

Este documento descreve como fazer uso mais eficiente da biblioteca assíncrona de agentes. A biblioteca dos agentes promove uma programação baseada ator- modelo e a mensagem em processo que passa para o fluxo de dados de alta granularidade e que canaliza tarefas.

Para obter mais informações sobre a biblioteca de agentes, consulte Biblioteca de Agentes Assíncronos.

Seções

Este documento contém as seções a seguir:

  • Usar Agentes para Isolar Estado

  • Usar um Mecanismo de Limitação para Restringir o Número de Mensagens em Pipeline de Dados

  • Não Realizar Trabalho Granulado em um Pipeline de Dados

  • Não Passar Cargas de Mensagem Grandes por Valor

  • Usar shared_ptr em uma Rede de Dados Quando a Propriedade for Indefinida

Usar Agentes para Isolar Estado

A biblioteca dos agentes fornece alternativas para estado compartilhado deixando o conectar componentes isolados por meio de um mecanismo mensagem- passando assíncrona. Os agentes assíncronas são mais efetivos quando eles isolam seu estado interno de outros componentes. Isolando o estado, vários componentes não atuam normalmente em dados compartilhados. O isolamento do estado pode habilitar o aplicativo dimensionar como reduz a contenção na memória compartilhada. O isolamento de estado também reduz a possibilidade de deadlock e as situações de competição como os componentes não precisam sincronizar o acesso a dados compartilhados.

Você normalmente isola o estado em um agente com membros de dados nas seções de private ou de protected da classe do agent e usando buffers de mensagem para comunicar alterações de estado. O exemplo a seguir mostra a classe de basic_agent , que se deriva de concurrency::agent. A classe de basic_agent usa dois buffers de mensagem para se comunicar com outros componentes externos. Um buffer de mensagem mantém mensagens de entrada; o outro buffer de mensagem mantém mensagens de saída.

// basic-agent.cpp 
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate 
// with other components. 
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer. 
         int value = concurrency::receive(_input);

         // TODO: Do something with the value. 
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Para obter exemplos completos sobre como definir e usar os agentes, consulte Instruções passo a passo: criando um aplicativo com base no agente e Instruções passo a passo: criando um agente de fluxo de dados.

[Superior]

Usar um Mecanismo de Limitação para Restringir o Número de Mensagens em Pipeline de Dados

Muitos tipos retornadas buffer, como concurrency::unbounded_buffer, podem manter um número ilimitado de mensagens. Quando um produtor de mensagens envia mensagens a um pipeline de dados mais rápido que o consumidor pode processar essas mensagens, o aplicativo pode entrar em um estado de memória baixa ou de falta de memória. Você pode usar um mecanismo de estrangulamento, por exemplo, um sinal, para limitar o número de mensagens que estão ativos simultaneamente em um pipeline de dados.

O exemplo básico demonstra como usar um sinal para limitar o número de mensagens em um pipeline de dados. O pipeline de dados usa a função de concurrency::wait para simular uma operação que faça pelo menos 100 milissegundos. Como o remetente gerencia mensagens mais rápido do que o consumidor pode processar as mensagens, este exemplo define a classe de semaphore para habilitar o aplicativo limitar o número de mensagens ativa.

// message-throttling.cpp 
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics. 
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore. 
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count  
      // falls below zero. When this happens, add the current context to the  
      // back of the wait queue and block the current context. 
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore. 
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context. 
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not 
         // yet finished adding the context to the queue.  
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to  
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its  
// count reaches zero. 
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero. 
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter. 
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter. 
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set. 
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero. 
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer. 
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time. 
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest 
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion 
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages. 

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The second stage of the pipeline simulates a  
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The third stage of the pipeline releases the semaphore 
   // and signals to the main appliation that the message has 
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

O objeto de semaphore limita o pipeline para processar simultaneamente no máximo duas mensagens.

O produtor nesse exemplo envia poucas mensagens ao consumidor. Em virtude disso, este exemplo não demonstra uma memória baixa ou uma condição de memória insuficiente potencial. No entanto, esse mecanismo é útil quando um pipeline de dados contém um número de mensagens relativamente alto.

Para obter mais informações sobre como criar o sinal classe que é usado neste exemplo, consulte Como usar a classe de contexto para implementar um semáforo cooperativo.

[Superior]

Não Realizar Trabalho Granulado em um Pipeline de Dados

A biblioteca dos agentes é mais útil quando o trabalho que é executado por um pipeline de dados é razoavelmente de alta granularidade. Por exemplo, um componente de aplicativo pode ler dados de um arquivo ou uma conexão de rede e ocasionalmente um envio esses dados para outro componente. O protocolo que a biblioteca dos agentes usa para propagar causas de mensagens o mecanismo mensagem- passando ter mais sobrecarga do que as construções de paralela da tarefa que são fornecidas por A paralela da biblioteca (PPL). Consequentemente, certifique-se de que o trabalho executado por um pipeline de dados é por tempo suficiente deslocar essa sobrecarga.

Embora um pipeline de dados seja a mais efetivo quando suas tarefas são de alta granularidade, cada fase de pipeline de dados pode usar construções de PPL como grupos de trabalho e algoritmos paralelos para executar um trabalho mais refinado. Para obter um exemplo de uma rede de dados de alta granularidade que usa o paralelismo refinado em cada fase de processamento, consulte Instruções passo a passo: criando uma rede de processamento de imagem.

[Superior]

Não Passar Cargas de Mensagem Grandes por Valor

Em alguns casos, o tempo de execução cria uma cópia de cada mensagem que transmite de um buffer de mensagem para outro buffer de mensagem. Por exemplo, a classe de concurrency::overwrite_buffer oferece uma cópia de cada mensagem que recebe a cada um de seus destinos. O tempo de execução também cria uma cópia dos dados de mensagem quando você usa mensagem- passar funções como concurrency::send e concurrency::receive para gravar mensagens para e lê mensagens de um buffer de mensagem. Embora esse mecanismo ajuda para eliminar o risco simultaneamente de gravação a dados compartilhados, pode resultar em desempenho ruim da memória quando a carga útil da mensagem é relativamente grande.

Você pode usar ponteiros ou referências para melhorar o desempenho da memória quando você passa as mensagens que têm uma grande carga do. O exemplo a seguir compara transmitir mensagens grandes pelo valor a ser passado ponteiros ao mesmo tipo de mensagem. O exemplo define dois tipos, producer e consumerdo agent, que atuam em objetos de message_data . O exemplo a seguir compara o tempo necessário para que o produtor envia vários objetos de message_data ao consumidor a tempo necessário para que o agente do produtor envia vários ponteiros para os objetos de message_data ao consumidor.

// message-payloads.cpp 
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds  
// that it takes to call that function. 
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data. 
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values. 
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*. 
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values. 
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.  
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ... 

      // Release the memory for the message. 
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send. 
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents. 
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time. 
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Este exemplo gerencia a seguinte saída de exemplo:

  

A versão usando ponteiros executa melhor porque elimina o requisito para que o tempo de execução criar uma cópia completa de cada objeto de message_data que transmite do produtor ao consumidor.

[Superior]

Usar shared_ptr em uma Rede de Dados Quando a Propriedade for Indefinida

Quando você envia mensagens pelo ponteiro por meio de um pipeline ou de uma rede mensagem- passando, você normalmente aloca memória para cada mensagem em frente de rede e libera essa memória no final da rede. Embora esse mecanismo funcione bem frequentemente, há casos em que é difícil ou não possível usá-lo. Por exemplo, considere o caso em que a rede de dados contém vários nós de extremidade. Nesse caso, não haverá nenhum local claro para liberar a memória das mensagens.

Para resolver esse problema, você pode usar um mecanismo, por exemplo, std::shared_ptr, que permite que um ponteiro a ser de propriedade de vários componentes. Quando o objeto final de shared_ptr que possui um recurso é destruído, o recurso será liberado também.

O exemplo a seguir demonstra como usar shared_ptr para compartilhar valores de ponteiro entre vários buffers de mensagem. O exemplo conecta um objeto de concurrency::overwrite_buffer a três objetos de concurrency::call . A classe de overwrite_buffer oferece mensagens a cada um de seus destinos. Como há vários proprietários de dados no final da rede de dados, o usa shared_ptr este exemplo habilite cada objeto de call a propriedade de compartilhamento de mensagens.

// message-sharing.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource. 
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource. 
   int id() const { return _id; }

   // TODO: Add additional members here. 
private:
   // An identifier for the resource. 
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects 
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to  
   // receive its message.
   e.wait();
}

Este exemplo gerencia a seguinte saída de exemplo:

  

Consulte também

Tarefas

Instruções passo a passo: criando um aplicativo com base no agente

Instruções passo a passo: criando um agente de fluxo de dados

Instruções passo a passo: criando uma rede de processamento de imagem

Conceitos

Biblioteca de Agentes Assíncronos

Práticas recomendadas na Biblioteca de Padrões Paralelos

Práticas recomendadas gerais no Tempo de Execução de Simultaneidade

Outros recursos

Práticas recomendadas do Tempo de Execução de Simultaneidade