Passo a passo: Criando um bloco personalizado de mensagem
Este documento descreve como criar uma mensagem personalizado em blocos regra que mensagens de entrada por prioridade.
Embora os tipos internos do bloco de mensagem fornecer uma ampla variedade de funcionalidade, você pode criar sua própria mensagem em blocos e personalizá-lo para atender aos requisitos do seu aplicativo.Para obter uma descrição dos tipos internos do bloco de mensagem que são fornecidos pela biblioteca assíncrona de agentes, consulte Blocos assíncronas de mensagem.
Pré-requisitos
Ler os seguintes documentos antes de iniciar esta explicação passo a passo:
Seções
Essa explicação passo a passo contém as seções a seguir:
Criando um bloco personalizado de mensagem
Definindo a classe de priority_buffer
O exemplo completo
Criando um bloco personalizado de mensagem
Blocos de mensagem participam em pleno acto de enviar e receber mensagens de.Um bloco de mensagem que envia mensagens é conhecido como um bloco de origem.Um bloco de mensagem que receber mensagens é conhecido como um bloco de destino.Um bloco de mensagem que enviar e receber mensagens é conhecido como um bloco de propagator.A biblioteca de agentes usa a classe abstrata concurrency::ISource para representar blocos de origem e a classe abstrata concurrency::ITarget para representar blocos de destino.Tipos de bloco de mensagem que atuam como as fontes derivam de ISource; os tipos de bloco de mensagem que atuam como destinos derivam de ITarget.
Embora você possa derivar sua mensagem em blocos diretamente de ISource e de ITarget, a biblioteca de agentes define três classes base que executam grande parte da funcionalidade que são comuns a todos os tipos de bloco de mensagem, por exemplo, tratamento de erros e conectando-se a mensagem bloqueia juntos em uma simultaneidade- maneira segura.A classe deriva de concurrency::source_block de ISource e enviar mensagens a outros blocos.A classe deriva de concurrency::target_block de ITarget e recebe-se mensagens de outros blocos.A classe deriva de concurrency::propagator_block de ISource e de ITarget e enviar mensagens a outros blocos e receber mensagens de outros blocos.É recomendável usar essas três classes base para manipular detalhes de infraestrutura de modo que você possa se concentrar no comportamento do bloco de mensagem.
source_block, target_block, e as classes de propagator_block são os modelos que são parametrizadas de um tipo que gerencia as conexões, ou nos links, entre a fonte e blocos de destino e um tipo que gerencia como mensagens são processadas.A biblioteca de agentes define dois tipos que executam o gerenciamento, o concurrency::single_link_registry e o concurrency::multi_link_registryde link.A classe de single_link_registry permite que um bloco de mensagem a ser associado a uma fonte ou a um destino.A classe de multi_link_registry permite que um bloco de mensagem a ser associado a múltiplas fontes ou para destinos de múltiplos.A biblioteca de agentes define uma classe que executa o gerenciamento de mensagem, concurrency::ordered_message_processor.A classe de ordered_message_processor permite blocos de mensagens para processar mensagens na ordem em que o recebe.
Para compreender melhor como blocos de mensagem se referem a fontes e seus alvos, considere o seguinte exemplo.Este exemplo mostra a declaração de classe de concurrency::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
A classe deriva de transformer de propagator_block, e atua-se como consequência como um bloco de origem e como um bloco de destino.Aceita mensagens de tipo _Input e envia mensagens de tipo _Output.A classe de transformer especifica single_link_registry como o gerenciador de link para todos os blocos de destino e multi_link_registry como o gerenciador de link para quaisquer blocos de origem.Como consequência, um objeto de transformer pode ter até um destino e um número ilimitado de fontes.
Uma classe que deriva de source_block deve implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, e resume_propagation.Uma classe que deriva de target_block deve implementar o método de propagate_message e opcionalmente pode implementar o método de send_message .Derivar de propagator_block é funcional equivalente à derivação de source_block e de target_block.
O método de propagate_to_any_targets é chamado em tempo de execução processa a de forma assíncrona de forma síncrona ou todas as mensagens de entrada para fora e bolha quaisquer mensagens de saída.O método de accept_message é chamado por blocos de destino para aceitar mensagens.Muitos tipos de bloco de mensagem, como enviar mensagens de unbounded_buffer, somente o primeiro destino que deve receber o.Como consequência, transfere a propriedade da mensagem de destino.Outros tipos de bloco de mensagem, como concurrency::overwrite_buffermensagens, oferecem a cada um dos seus blocos de destino.Como consequência, overwrite_buffer cria uma cópia de mensagem para cada um dos seus alvos.
reserve_message, consume_message, release_message, os métodos e de resume_propagation permitem blocos de mensagem para participar da reserva de mensagem.Blocos de destino chama o método de reserve_message quando são oferecidos uma mensagem e têm permitir que a mensagem para uso posterior.Depois que um bloco de destino reserva uma mensagem, pode chamar o método de consume_message para consumir essa mensagem ou o método de release_message para cancelar a reserva.Como com o método de accept_message , a implementação de consume_message enlata a propriedade de transferência de mensagem ou retorna uma cópia de mensagem.Depois que um bloco de destino consome ou libera uma mensagem reservado, o tempo de execução chama o método de resume_propagation .Normalmente, esse método continua a bolha, começando de mensagem com a seguinte mensagem na fila.
O tempo de execução chama o método de propagate_message para transferir de forma assíncrona uma mensagem de outro bloco ao atual.O método de send_message é semelhante a propagate_message, exceto que forma síncrona, em vez de forma assíncrona, envia a mensagem aos blocos de destino.A implementação padrão de send_message descarta todas as mensagens de entrada.O tempo de execução não chama se nenhum desses métodos a mensagem não passa a função opcional de filtro que está associada com o bloco de destino.Para obter mais informações sobre os filtros de mensagem, consulte Blocos assíncronas de mensagem.
Superior[]
Definindo a classe de priority_buffer
A classe de priority_buffer é uma mensagem personalizado em blocos regra que mensagens de entrada primeiro por prioridade, e em seguida pela ordem em que as mensagens são recebidas.A classe de priority_buffer é semelhante à classe de concurrency::unbounded_buffer porque contém uma fila de mensagens, e também porque atua como fonte e um bloco de mensagem de destino e pode ter ambas as várias fontes e alvos de múltiplos.No entanto, unbounded_buffer baseia a bolha apenas de mensagem na ordem em que recebe mensagens das fontes.
A classe de priority_buffer receber mensagens de tipo std::tuple que contêm PriorityType e elementos de Type .PriorityType refere-se ao tipo que contém a prioridade de cada mensagem; Type refere-se a parte de dados de mensagem.A classe de priority_buffer envia mensagens de tipo Type.A classe de priority_buffer também gerencia duas filas de mensagens: um objeto de std::priority_queue para mensagens de entrada e std::queue objeto para mensagens de saída.Classificar mensagens por prioridade é útil quando um objeto de priority_buffer recebe mais mensagens simultaneamente ou quando receber vários mensagens antes que as mensagens sejam lidos pelos consumidores.
Além dos sete métodos que uma classe que deriva de propagator_block deve implementar, a classe de priority_buffer ela sobrescreve os métodos de link_target_notification e de send_message .A classe de priority_buffer também define dois métodos públicos auxiliar, enqueue e dequeue, e um método particular auxiliar, propagate_priority_order.
O procedimento a seguir descreve como implementar a classe de priority_buffer .
Para definir a classe de priority_buffer
Crie o arquivo de cabeçalho de c++ e denomine-o priority_buffer.h.Como alternativa, você pode usar um arquivo de cabeçalho existente que é parte do seu projeto.
Em priority_buffer.h, adicione o seguinte código.
#pragma once #include <agents.h> #include <queue>
No namespace de std , defina as especializações de std::less e de std::greater que atuam em objetos de concurrency::message .
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Os objetos de message dos armazenamentos da classe de priority_buffer em priority_queue objeto.Essas especializações do tipo permitem a fila de prioridade para classificar mensagens de acordo com sua prioridade.A prioridade é o primeiro elemento de objeto de tuple .
No namespace de concurrency , declare a classe de priority_buffer .
namespace concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
A classe deriva de priority_buffer de propagator_block.Como consequência, pode enviar e receber mensagens.A classe de priority_buffer pode ter vários destinos que recebem mensagens de tipo Type.Também pode ter várias fontes que envia mensagens de tipo tuple<PriorityType, Type>.
Na seção de private da classe de priority_buffer , adicione os seguintes variáveis de membro.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
O objeto de priority_queue contém mensagens de entrada; o objeto de queue contém mensagens de saída.Um objeto de priority_buffer pode receber mensagens mais simultaneamente; o objeto de critical_section sincroniza acesso à fila de mensagens de entrada.
Na seção de private , defina o construtor de impressão e o operador de atribuição.Isso impede que os objetos de priority_queue sejam assinalávéis.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Na seção de public , defina os construtores que são comuns a muitos tipos de bloco de mensagem.Também define o destrutor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Na seção de public , defina os métodos enqueue e dequeue.Esses métodos auxiliares fornecem uma maneira alternativa para enviar mensagens para e para receber mensagens de priority_buffer objeto.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Na seção de protected , defina o método de propagate_to_any_targets .
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
O método de propagate_to_any_targets transfere a mensagem que está na frente da fila de entrada a fila de saída e propaga para fora todas as mensagens na fila de saída.
Na seção de protected , defina o método de accept_message .
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Quando um bloco de destino chamar o método de accept_message , a propriedade da transferências da classe de priority_buffer de mensagem para o primeiro bloco de destino que aceita a.(Isso se assemelha ao comportamento de unbounded_buffer.)
Na seção de protected , defina o método de reserve_message .
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
A classe de priority_buffer permite um bloco de destino reservar uma mensagem quando o identificador de mensagens fornecido coincidir com o identificador de mensagem que está na frente da fila.Ou é um destino pode permitir a mensagem se o objeto de priority_buffer ainda não recebeu uma mensagem adicional e não propagou ainda para fora atual.
Na seção de protected , defina o método de consume_message .
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Um bloco de destino chama consume_message para transferir para a propriedade de mensagem que ele reservava.
Na seção de protected , defina o método de release_message .
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Um bloco de destino chama release_message para cancelar a sua reserva uma mensagem.
Na seção de protected , defina o método de resume_propagation .
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Chamadas resume_propagation de tempo de execução após um bloco de destino consome ou libera uma mensagem reservado.Este método propaga para fora todas as mensagens que estão na fila de saída.
Na seção de protected , defina o método de link_target_notification .
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
A variável de membro de _M_pReservedFor é definido pela classe base, source_block.Pontos variáveis desse membro para o bloco de destino, se houver, que está mantendo uma reserva a mensagem que está na frente da fila de saída.O tempo de execução chama link_target_notification quando um novo destino é associado ao objeto de priority_buffer .Este método propaga para fora todas as mensagens que estão na fila de saída se nenhum destino está mantendo uma reserva.
Na seção de private , defina o método de propagate_priority_order .
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Este método propaga para fora todas as mensagens de fila de saída.Cada mensagem na fila está oferecida a cada bloco alvo até que um dos blocos de destino aceita a mensagem.A classe de priority_buffer preserva a ordem das mensagens de saída.Como consequência, a primeira mensagem na fila de saída deve ser aceita por um bloco de destino antes que esse método oferece qualquer outra mensagem aos blocos de destino.
Na seção de protected , defina o método de propagate_message .
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
O método de propagate_message permite que a classe de priority_buffer para atuar como um receptor de mensagem, ou o destino.Este método recebe a mensagem que é oferecida pelo bloco e insere fornecidos pela origem da mensagem na fila de prioridade.O método de propagate_message em envia de forma assíncrona todas as mensagens de saída para os blocos de destino.
O tempo de execução chama esse método quando você chama a função de concurrency::asend ou quando o bloco de mensagens está conectado a outros blocos de mensagem.
Na seção de protected , defina o método de send_message .
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
O método de send_message é semelhante a propagate_message.No entanto envia mensagens de saída de forma síncrona em vez de forma assíncrona.
O tempo de execução chama esse método durante uma operação de enviar síncrono, como quando você chama a função de concurrency::send .
A classe de priority_buffer contém as sobrecargas de construtor que são típicos em muitos tipos de bloco de mensagem.Algumas sobrecargas de construtor têm concurrency::Scheduler ou objetos de concurrency::ScheduleGroup , que permitem que o bloco de mensagens a serem gerenciado por um agendador de tarefa específica.Outras sobrecargas de construtor têm uma função de filtro.Funções de filtro permitem blocos de mensagem para aceitar ou descartar uma mensagem com base na sua carga.Para obter mais informações sobre os filtros de mensagem, consulte Blocos assíncronas de mensagem.Para obter mais informações sobre agendadores de tarefas, consulte Agendador de tarefa (tempo de execução de simultaneidade).
Porque a classe de priority_buffer de mensagens por prioridade e em seguida pela ordem em que as mensagens são recebidas, essa classe é útil quando receber mensagens de forma assíncrona, por exemplo, quando você chama a função de concurrency::asend ou quando o bloco de mensagens está conectado à outra mensagem bloqueia.
Superior[]
O exemplo completo
O exemplo a seguir mostra a definição da classe completa de priority_buffer .
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
O exemplo execução simultaneamente um número de asend e operações de concurrency::receive em priority_buffer objeto.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Este exemplo produz a seguinte saída de exemplo.
A classe de priority_buffer de mensagens primeiro por prioridade e em seguida pela ordem em que recebe mensagens.Nesse exemplo, as mensagens com maior prioridade numérica são inseridas para a frente da fila.
Superior[]
Compilando o código
Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole a definição de classe de priority_buffer em um arquivo denominado priority_buffer.h e o programa de teste em um arquivo denominado priority_buffer.cpp e execute o seguinte comando em uma janela de prompt de comando do Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Consulte também
Conceitos
Blocos assíncronas de mensagem