Partager via


Procédure pas à pas : création d'un bloc de message personnalisé

Ce document décrit comment créer un type de bloc de messages personnalisé qui classe les messages entrants par ordre de priorité.

Bien que les types de blocs de messages intégrés offrent une large gamme de fonctionnalités, vous pouvez créer votre propre type de bloc de messages et le personnaliser pour répondre aux besoins de votre application. Pour une description des types de blocs de messages intégrés fournis par la bibliothèque Asynchronous Agents, veuillez consulter la section Blocs de messages asynchrones.

Prerequisites

Nous vous recommandons de consulter les documents suivants avant de vous plonger dans ce guide pratique :

Rubriques

Cette procédure pas à pas contient les sections suivantes :

Conception d’un bloc de messages personnalisé

Les blocs de messages participent à l’envoi et à la réception de messages. Un bloc de messages qui envoie des messages est appelé bloc source. Un bloc de messages qui reçoit des messages est appelé bloc cible. Un bloc de messages qui envoie et reçoit des messages est appelé bloc de propagation. La bibliothèque Agents utilise la classe abstraite concurrency::ISource pour représenter les blocs source et la classe abstraite concurrency::ITarget pour représenter les blocs cibles. Les types de blocs de messages agissant en tant que sources dérivent de ISource ; les types de blocs de messages agissant en tant que cibles dérivent de ITarget.

Bien que vous puissiez dériver votre type de bloc de messages directement de ISource et ITarget, la bibliothèque Agents définit trois classes de base qui assurent une grande partie des fonctionnalités communes à tous les types de blocs de messages, par exemple, la gestion des erreurs et la connexion de blocs de messages de manière sécurisée pour la concurrence. La classe concurrency::source\_block dérive de ISource et envoie des messages à d’autres blocs. La classe concurrency::target\_block dérive de ITarget et reçoit des messages d’autres blocs. La classe concurrency::propagator\_block dérive de ISource et ITarget et envoie et reçoit des messages d’autres blocs. Nous vous recommandons d’utiliser ces trois classes de base pour gérer les aspects d’infrastructure, afin de pouvoir vous concentrer sur le comportement de votre bloc de messages.

Les classes source_block, target_block et propagator_block sont des modèles paramétrés par un type qui gère les connexions, ou liens, entre blocs source et blocs cibles, ainsi que par un type qui gère la manière dont les messages sont traités. La bibliothèque Agents définit deux types qui assurent la gestion des liens : concurrency::single\_link\_registry et concurrency::multi\_link\_registry. La classe single_link_registry permet de lier un bloc de messages à une seule source ou à un seul récepteur. La classe multi_link_registry permet de lier un bloc de messages à plusieurs sources ou à plusieurs récepteurs. La bibliothèque Agents définit une classe qui assure la gestion des messages : concurrency::ordered\_message\_processor. La classe ordered_message_processor permet aux blocs de messages de traiter les messages dans l’ordre de leur réception.

Pour mieux comprendre les relations entre les blocs de messages, leurs sources et leurs cibles, consultez l’exemple suivant. Cet exemple montre la déclaration de la classe concurrency::transformer.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

La classe transformer dérive de propagator_block et agit donc à la fois comme bloc source et comme bloc cible. Elle accepte les messages de type _Input et envoie des messages de type _Output. La classe transformer spécifie single_link_registry comme gestionnaire de lien pour les blocs cibles et multi_link_registry comme gestionnaire de lien pour les blocs sources. Ainsi, un objet transformer peut avoir jusqu’à un bloc cible et un nombre illimité de blocs sources.

Une classe dérivée de source_block doit implémenter six méthodes : propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, et resume_propagation. Une classe dérivée de target_block doit implémenter la méthode propagate\_message et peut implémenter facultativement la méthode send\_message. Hériter de propagator_block revient fonctionnellement à hériter à la fois de source_block et target_block.

La méthode propagate_to_any_targets est appelée par l’environnement d’exécution pour traiter de manière asynchrone ou synchrone les messages entrants et propager les messages sortants. La méthode accept_message est appelée par les blocs cibles pour accepter des messages. De nombreux types de blocs de messages, tels que unbounded_buffer, envoient leurs messages uniquement au premier bloc cible susceptible de le recevoir. Dans ce cas, la propriété du message est transférée au récepteur. D’autres types de blocs de messages, comme concurrency::overwrite\_buffer, proposent le message à chacun de leurs blocs cibles. Ainsi, overwrite_buffer crée une copie du message pour chacun de ses récepteurs.

Les méthodes reserve_message, consume_message, release_message et resume_propagation permettent aux blocs de messages de participer à la réservation de messages. Les blocs cibles appellent la méthode reserve_message lorsqu’un message leur est proposé, afin de le réserver pour une utilisation ultérieure. Une fois le message réservé, le bloc cible peut appeler la méthode consume_message pour consommer le message ou la méthode release_message pour annuler la réservation. Comme pour la méthode accept_message, l’implémentation de consume_message peut soit transférer la propriété du message, soit en retourner une copie. Après qu’un bloc cible a consommé ou libéré un message réservé, l’environnement d’exécution appelle la méthode resume_propagation. Cette méthode poursuit généralement la propagation des messages, en commençant par le suivant dans la file d’attente.

L’environnement d’exécution appelle la méthode propagate_message pour transférer de manière asynchrone un message d’un autre bloc vers celui en cours. La méthode send_message ressemble à propagate_message, mais envoie le message de manière synchrone, et non asynchrone, aux blocs cibles. L’implémentation par défaut de send_message rejette tous les messages entrants. L’environnement d’exécution n’appelle aucune de ces méthodes si le message ne passe pas la fonction de filtrage optionnelle associée au bloc cible. Pour plus d’informations sur les filtres de messages, veuillez consulter la section Blocs de messages asynchrones.

[Haut]

Définition de la classe priority\_buffer

La classe priority_buffer est un type personnalisé de bloc de messages qui classe les messages entrants par ordre de priorité, puis par ordre de réception. La classe priority_buffer ressemble à la classe concurrency::unbounded\_buffer car elle maintient une file d’attente de messages, agit à la fois comme bloc source et bloc cible, et peut avoir plusieurs sources et plusieurs récepteurs. Toutefois, unbounded_buffer base la propagation des messages uniquement sur l’ordre de réception à partir de ses sources.

La priority_buffer classe reçoit les messages de type std::tuple contenant des éléments PriorityType et Type. PriorityType désigne le type qui contient la priorité de chaque message ; Type désigne la partie donnée du message. La classe priority_buffer envoie des messages de type Type. La priority_buffer classe gère également deux files d’attente de messages : un objet std ::p riority_queue pour les messages entrants et un objet std ::queue pour les messages sortants. Le classement des messages par priorité est utile lorsqu’un objet priority_buffer reçoit plusieurs messages simultanément ou avant que tout message ne soit lu par un consommateur.

En plus des sept méthodes qu’une classe dérivée de propagator_block doit implémenter, la classe priority_buffer substitue également les méthodes link_target_notification et send_message. La classe priority_buffer définit également deux méthodes utilitaires publiques, enqueue et dequeue, et une méthode utilitaire privée, propagate_priority_order.

La procédure suivante décrit comment implémenter la classe priority_buffer.

Pour définir la classe priority\_buffer

  1. Créez un fichier d’en-tête C++ et nommez-le priority_buffer.h. Vous pouvez aussi utiliser un fichier d’en-tête existant de votre projet.

  2. Dans priority_buffer.h, ajoutez le code suivant.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. Dans l'espace de noms std, définissez des spécialisations de std::less et std::greater qui agissent sur les objets concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    La classe priority_buffer stocke les objets message dans un objet priority_queue. Ces spécialisations de type permettent à la file de priorité de trier les messages selon leur priorité. La priorité est le premier élément de l’objet tuple.

  4. Dans l’espace de noms concurrencyex, déclarez la classe priority_buffer.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    La classe priority_buffer est dérivée de propagator_block. Elle peut donc envoyer et recevoir des messages. La classe priority_buffer peut avoir plusieurs récepteurs qui reçoivent des messages de type Type. Elle peut aussi avoir plusieurs sources qui envoient des messages de type tuple<PriorityType, Type>.

  5. Dans la section private de la classe priority_buffer, ajoutez les variables membres suivantes.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    L’objet priority_queue contient les messages entrants ; l’objet queue contient les messages sortants. Un objet priority_buffer peut recevoir plusieurs messages en même temps ; l’objet critical_section synchronise l’accès à la file d’entrée.

  6. Dans la section private, définissez le constructeur de copie et l’opérateur d’affectation. Cela empêche les objets priority_queue d’être affectables.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. Dans la section public, définissez les constructeurs communs à de nombreux types de blocs de messages. Définissez aussi le destructeur.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Dans la section public, définissez les méthodes enqueue et dequeue. Ces méthodes utilitaires offrent un autre moyen d’envoyer ou de recevoir des messages à/de l’objet priority_buffer.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Dans la section protected, définissez la méthode propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    La méthode propagate_to_any_targets transfère le message en tête de la file d’entrée vers la file de sortie et propage tous les messages présents dans la file de sortie.

  10. Dans la section protected, définissez la méthode accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Lorsqu’un bloc cible appelle la méthode accept_message, la classe priority_buffer transfère la propriété du message au premier récepteur qui l’accepte. (Ce comportement est similaire à celui de unbounded_buffer.)

  11. Dans la section protected, définissez la méthode reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    La classe priority_buffer permet à un bloc cible de réserver un message si l’identifiant fourni correspond à celui du message en tête de file. En d’autres termes, un récepteur peut réserver le message si l’objet priority_buffer n’a pas encore reçu de message supplémentaire ni propagé le message en cours.

  12. Dans la section protected, définissez la méthode consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Un bloc cible appelle consume_message pour transférer la propriété du message qu’il a réservé.

  13. Dans la section protected, définissez la méthode release_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Un bloc cible appelle release_message pour annuler la réservation d’un message.

  14. Dans la section protected, définissez la méthode resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    L’environnement d’exécution appelle resume_propagation après qu’un bloc cible a consommé ou libéré un message réservé. Cette méthode propage les messages présents dans la file de sortie.

  15. Dans la section protected, définissez la méthode link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    La variable membre _M_pReservedFor est définie par la classe de base source_block Cette variable membre pointe vers le bloc cible, s’il existe, qui détient une réservation sur le message en tête de file de sortie. L’environnement d’exécution (runtime) appelle link_target_notification lorsqu’un nouveau bloc cible est lié à l’objet priority_buffer. Cette méthode propage les messages présents dans la file de sortie si aucun bloc cible ne détient de réservation.

  16. Dans la section private, définissez la méthode propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Cette méthode propage tous les messages de la file de sortie. Chaque message de la file est proposé à tous les blocs cibles jusqu’à ce que l’un d’eux l’accepte. La classe priority_buffer conserve l’ordre des messages sortants. Ainsi, le premier message de la file doit être accepté avant que cette méthode n’en propose d’autres.

  17. Dans la section protected, définissez la méthode propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    La méthode propagate_message permet à la classe priority_buffer d’agir en tant que récepteur de messages, ou cible. Cette méthode reçoit le message proposé par le bloc source et l’insère dans la file de priorité. La méthode propagate_message envoie ensuite de manière asynchrone tous les messages sortants vers les blocs cibles.

    Le runtime appelle cette méthode lorsque vous appelez la fonction concurrency::asend ou lorsque le bloc est connecté à d’autres blocs.

  18. Dans la section protected, définissez la méthode send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    La méthode send_message ressemble à propagate_message. Cependant, elle envoie les messages sortants de façon synchrone au lieu de façon asynchrone.

    Le runtime appelle cette méthode lors d’un envoi synchrone, par exemple quand vous appelez la fonction concurrency::send.

La classe priority_buffer contient des surcharges de constructeur typiques de nombreux types de blocs de messages. Certaines surcharges prennent des objets concurrency::Scheduler ou concurrency::ScheduleGroup, permettant au bloc de messages d’être géré par un planificateur de tâches spécifique. D’autres surcharges acceptent une fonction de filtrage. Les fonctions de filtrage permettent aux blocs de messages d’accepter ou de rejeter un message en fonction de sa charge utile. Pour plus d’informations sur les filtres de messages, veuillez consulter la section Blocs de messages asynchrones. Pour plus d’informations sur les planificateurs de tâches, veuillez consulter la section Planificateur de tâches.

Comme la classe priority_buffer classe les messages par priorité puis par ordre de réception, elle est particulièrement utile lorsqu’elle reçoit des messages de manière asynchrone, par exemple en appelant la fonction concurrency::asend ou lorsqu’elle est connectée à d’autres blocs de messages.

[Haut]

Exemple complet

L’exemple suivant montre la définition complète de la classe priority_buffer.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

L’exemple suivant exécute simultanément plusieurs opérations asend et concurrency::receive sur un objet priority_buffer.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Cet exemple produit la sortie suivante.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

La classe priority_buffer classe les messages d’abord par priorité, puis par ordre de réception. Dans cet exemple, les messages avec une priorité numérique plus élevée sont insérés plus en avant dans la file.

[Haut]

Compilation du code

Copiez le code d’exemple et collez-le dans un projet Visual Studio, ou collez la définition de la classe priority_buffer dans un fichier nommé priority_buffer.h et le programme de test dans un fichier nommé priority_buffer.cpp, puis exécutez la commande suivante dans une fenêtre d’invite de commandes Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Voir aussi

Procédures pas à pas relatives au runtime d’accès concurrentiel
Blocs de messages asynchrones
Fonctions de passage de messages