Partilhar via


Passo a passo: Criando um bloco de mensagens personalizado

Este documento descreve como criar um tipo de bloco de mensagens personalizado que ordena as mensagens recebidas por prioridade.

Embora os tipos de bloco de mensagem internos forneçam uma ampla gama de funcionalidades, você pode criar seu próprio tipo de bloco de mensagem e personalizá-lo para atender aos requisitos do seu aplicativo. Para obter uma descrição dos tipos de bloco de mensagens internos fornecidos pela Biblioteca de Agentes Assíncronos, consulte Blocos de mensagens assíncronas.

Pré-requisitos

Leia os seguintes documentos antes de iniciar este passo a passo:

Secções

Este passo a passo contém as seguintes seções:

Projetando um bloco de mensagens personalizado

Os blocos de mensagens participam no ato de enviar e receber mensagens. Um bloco de mensagens que envia mensagens é conhecido como um bloco de origem. Um bloco de mensagens que recebe mensagens é conhecido como um bloco de destino. Um bloco de mensagens que envia e recebe mensagens é conhecido como bloco propagador. A Biblioteca de Agentes utiliza a classe abstrata concurrency::ISource para representar blocos de origem e a classe abstrata concurrency::ITarget para representar blocos de destino. Tipos de bloco de mensagem que atuam como fontes derivam de ISource; tipos de bloco de mensagem que atuam como destinos derivam de ITarget.

Embora possa derivar o tipo de bloco de mensagens diretamente de ISource e ITarget, a Agents Library define três classes base que executam grande parte da funcionalidade comum a todos os tipos de bloco de mensagens, por exemplo, manipulação de erros e conexão de blocos de mensagens de forma segura para a concorrência. A classe concurrency::source_block deriva de ISource e envia mensagens para outros blocos. A classe concurrency::target_block deriva de ITarget e recebe mensagens de outros blocos. A classe concurrency::propagator_block deriva de ISource e ITarget, envia mensagens para outros blocos e recebe mensagens de outros blocos. Recomendamos que você use essas três classes base para manipular detalhes de infraestrutura para que você possa se concentrar no comportamento do seu bloco de mensagens.

As classes source_block, target_block e propagator_block são modelos que são parametrizados por um tipo que gere as conexões ou links entre os blocos de origem e de destino e por um tipo que gere como as mensagens são processadas. A Biblioteca de Agentes define dois tipos que realizam a gestão de links, concorrência::single_link_registry e concorrência::multi_link_registry. A single_link_registry classe permite que um bloco de mensagem seja vinculado a uma fonte ou a um destino. A multi_link_registry classe permite que um bloco de mensagens seja vinculado a várias fontes ou vários destinos. A Biblioteca de Agentes define uma classe que realiza a gestão de mensagens, concorrência::ordered_message_processor. A ordered_message_processor classe permite que os blocos de mensagens processem mensagens na ordem em que as recebe.

Para entender melhor como os blocos de mensagens se relacionam com suas fontes e destinos, considere o exemplo a seguir. Este exemplo mostra a declaração da classe concurrency::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

A transformer classe deriva de propagator_block, e, portanto, atua como um bloco de origem e como um bloco de destino. Ele aceita mensagens do tipo _Input e envia mensagens do tipo _Output. A transformer classe especifica single_link_registry como o gerenciador de links para quaisquer blocos de destino e multi_link_registry como o gerenciador de links para quaisquer blocos de origem. Portanto, um transformer objeto pode ter até um destino e um número ilimitado de fontes.

Uma classe que deriva de source_block deve implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation. Uma classe que deriva de target_block deve implementar o método propagate_message e pode, opcionalmente, implementar o método send_message . Derivar de propagator_block é funcionalmente equivalente a derivar de ambos source_block e target_block.

O propagate_to_any_targets método é chamado pelo tempo de execução para processar de forma assíncrona ou síncrona todas as mensagens recebidas e propagar todas as mensagens enviadas. O accept_message método é chamado por blocos de destino para aceitar mensagens. Muitos tipos de bloco de mensagens, como unbounded_buffer, enviam mensagens apenas para o primeiro destino que as receberia. Portanto, ele transfere a propriedade da mensagem para o destino. Outros tipos de bloco de mensagens, como concorrência::overwrite_buffer, oferecem mensagens a cada um dos seus blocos de destino. Portanto, overwrite_buffer cria uma cópia da mensagem para cada um de seus destinos.

Os métodos reserve_message, consume_message, release_message e resume_propagation permitem que os blocos de mensagens participem da reserva de mensagens. Os blocos de destino chamam o reserve_message método quando lhes é oferecida uma mensagem e têm de reservar a mensagem para utilização posterior. Depois que um bloco de destino reserva uma mensagem, ele pode chamar o consume_message método para consumir essa mensagem ou o release_message método para cancelar a reserva. Tal como acontece com o accept_message método, a implementação de consume_message pode transferir a propriedade da mensagem ou retornar uma cópia da mensagem. Depois que um bloco de destino consome ou libera uma mensagem reservada, o tempo de execução chama o método resume_propagation. Normalmente, esse método continua a propagação de mensagens, começando com a próxima mensagem na fila.

O ambiente de execução chama o método propagate_message para transferir uma mensagem de outro bloco para o atual de forma assíncrona. O send_message método é semelhante ao propagate_message, exceto que ele de forma síncrona, em vez de assíncrona, envia a mensagem para os blocos de destino. A implementação padrão de send_message rejeita todas as mensagens recebidas. O runtime não chama nenhum destes métodos se a mensagem não passar esta função de filtro opcional associada ao bloco de destino. Para obter mais informações sobre filtros de mensagens, consulte Blocos de mensagens assíncronas.

[Topo]

Definindo a classe priority_buffer

A priority_buffer classe é um tipo de bloco de mensagens personalizado que ordena as mensagens recebidas primeiro por prioridade e, em seguida, pela ordem em que as mensagens são recebidas. A priority_buffer classe se assemelha à classe concurrency::unbounded_buffer porque contém uma fila de mensagens e também porque atua como um bloco de mensagens de origem e de destino e pode ter várias fontes e vários destinos. No entanto, unbounded_buffer baseia a propagação de mensagens apenas na ordem em que recebe mensagens de suas fontes.

A priority_buffer classe recebe mensagens do tipo std::tuple que contêm PriorityType elementos e Type . PriorityType refere-se ao tipo que detém a prioridade de cada mensagem; Type refere-se à parte de dados da mensagem. A priority_buffer classe envia mensagens do tipo Type. A priority_buffer classe também gerencia duas filas de mensagens: um objeto std::p riority_queue para mensagens de entrada e um objeto std::queue para mensagens de saída. Ordenar mensagens por prioridade é útil quando um priority_buffer objeto recebe várias mensagens simultaneamente ou quando recebe várias mensagens antes que qualquer mensagem seja lida pelos consumidores.

Além dos sete métodos que uma classe derivada de propagator_block deve implementar, priority_buffer classe também substitui os métodos link_target_notification e send_message. A priority_buffer classe também define dois métodos auxiliares públicos, enqueue e dequeue, e um método auxiliar privado, propagate_priority_order.

O procedimento a seguir descreve como implementar a priority_buffer classe.

Para definir a classe priority_buffer

  1. Crie um arquivo de cabeçalho C++ e nomeie-o priority_buffer.h. Como alternativa, você pode usar um arquivo de cabeçalho existente que faz parte do seu projeto.

  2. No priority_buffer.h, adicione o seguinte código.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. No std namespace, defina especializações de std::less e std::greater que atuam em objetos de concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    A priority_buffer classe armazena message objetos em um priority_queue objeto. Essas especializações de tipo permitem que a fila de prioridade classifique as mensagens de acordo com sua prioridade. A prioridade é o primeiro elemento do tuple objeto.

  4. concurrencyex No espaço de nomes, declare priority_buffer classe.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    A priority_buffer classe deriva de propagator_block. Portanto, ele pode enviar e receber mensagens. A priority_buffer classe pode ter vários destinos que recebem mensagens do tipo Type. Ele também pode ter várias fontes que enviam mensagens do tipo tuple<PriorityType, Type>.

  5. Na seção private da classe priority_buffer, adicione as seguintes variáveis membro.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    O priority_queue objeto contém mensagens recebidas, o queue objeto contém mensagens enviadas. Um priority_buffer objeto pode receber várias mensagens simultaneamente; o objeto critical_section sincroniza o acesso à fila de mensagens de entrada.

  6. Na seção private, defina o construtor de cópia e o operador de atribuição. Isso impede que priority_queue os objetos sejam atribuíveis.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public Na seção , defina os construtores que são comuns a muitos tipos de bloco de mensagem. Defina também o destruidor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Na seção public, defina os métodos enqueue e dequeue. Esses métodos auxiliares fornecem uma maneira alternativa de enviar e receber mensagens de um priority_buffer objeto.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Na protected seção, defina o método propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    O propagate_to_any_targets método transfere a mensagem que está na frente da fila de entrada para a fila de saída e propaga todas as mensagens na fila de saída.

  10. Na protected seção, defina o método accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Quando um bloco de destino chama o accept_message método, a classe transfere priority_buffer a propriedade da mensagem para o primeiro bloco de destino que a aceita. (Isso se assemelha ao comportamento de unbounded_buffer.)

  11. Na protected seção, defina o método reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    A priority_buffer classe permite que um bloco de destino reserve uma mensagem quando o identificador de mensagem fornecido corresponde ao identificador da mensagem que está na frente da fila. Em outras palavras, um destino pode reservar a mensagem se o objeto priority_buffer ainda não tiver recebido uma mensagem adicional e ainda não tiver propagado a atual.

  12. Na protected seção, defina o método consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Um bloco alvo chama consume_message para transferir a propriedade da mensagem reservada.

  13. Na protected seção, defina o método release_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Um bloco de destino chama release_message para cancelar a sua reservação para uma mensagem.

  14. Na protected seção, defina o método resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    O tempo de execução invoca resume_propagation depois que um bloco alvo consome ou libera uma mensagem reservada. Esse método propaga todas as mensagens que estão na fila de saída.

  15. Na protected seção, defina o método link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    A _M_pReservedFor variável membro é definida pela classe base, source_block. Esta variável de membro aponta para o bloco de destino, caso exista, que está mantendo uma reserva para a mensagem que está na parte dianteira da fila de saída. O tempo de execução chama link_target_notification quando um novo alvo está vinculado ao objeto priority_buffer. Este método propaga todas as mensagens que estão na fila de saída se nenhum destino tiver uma reserva.

  16. Na private seção, defina o método propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Esse método propaga todas as mensagens da fila de saída. Cada mensagem na fila é oferecida a cada bloco de destino até que um dos blocos de destino aceite a mensagem. A priority_buffer classe preserva a ordem das mensagens enviadas. Portanto, a primeira mensagem na fila de saída deve ser aceita por um bloco de destino antes que esse método ofereça qualquer outra mensagem para os blocos de destino.

  17. Na protected seção, defina o método propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    O propagate_message método permite que a priority_buffer classe atue como um recetor de mensagem ou destino. Esse método recebe a mensagem oferecida pelo bloco de origem fornecido e insere essa mensagem na fila de prioridade. O método propagate_message então envia de forma assíncrona todas as mensagens de saída para os blocos de destino.

    O tempo de execução chama este método quando utilizas a função concurrency::asend ou quando o bloco de mensagens está ligado a outros blocos de mensagens.

  18. Na protected seção, defina o método send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    O send_message método é semelhante ao propagate_message. No entanto, ele envia as mensagens de saída de forma síncrona em vez de assíncrona.

    O tempo de execução chama esse método durante uma operação de envio síncrona, como quando você chama a função concurrency::send .

A priority_buffer classe contém sobrecargas de construtor que são típicas em muitos tipos de bloco de mensagem. Algumas sobrecargas do construtor aceitam objetos concurrency::Scheduler ou concurrency::ScheduleGroup, que permitem gerir o bloco de mensagens por um específico agendador de tarefas. Outras sobrecargas do construtor assumem uma função de filtro. As funções de filtro permitem que os blocos de mensagens aceitem ou rejeitem uma mensagem com base na sua carga útil. Para obter mais informações sobre filtros de mensagens, consulte Blocos de mensagens assíncronas. Para obter mais informações sobre agendadores de tarefas, consulte Agendador de tarefas.

Como a classe ordena priority_buffer as mensagens por prioridade e, em seguida, pela ordem em que as mensagens são recebidas, essa classe é mais útil quando recebe mensagens de forma assíncrona, por exemplo, quando você chama a função concurrency::asend ou quando o bloco de mensagens está conectado a outros blocos de mensagens.

[Topo]

O exemplo completo

O exemplo a seguir mostra a definição completa da priority_buffer classe.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

O exemplo a seguir executa, de forma concorrente, várias operações de asend e concurrency::receive em um objeto priority_buffer.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Este exemplo gera a seguinte saída.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

A priority_buffer classe ordena as mensagens primeiro pela prioridade e, em seguida, pela ordem em que recebe as mensagens. Neste exemplo, as mensagens com maior prioridade numérica são inseridas na frente da fila.

[Topo]

Compilando o código

Copie o código de exemplo e cole-o em um projeto do Visual Studio ou cole a priority_buffer definição da classe em um arquivo nomeado priority_buffer.h e o programa de teste em um arquivo nomeado priority_buffer.cpp e, em seguida, execute o seguinte comando em uma janela de prompt de comando do Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Ver também

Passo a passo do Concurrency Runtime
Blocos de mensagens assíncronas
Funções de passagem de mensagens