Aracılığıyla paylaş


İzlenecek Yol: Özel bir İleti Bloğu Oluşturma

Bu belgede, gelen iletileri önceliğe göre sıralayan özel bir ileti bloğu türünün nasıl oluşturulacağı açıklanır.

Yerleşik ileti bloğu türleri çok çeşitli işlevler sağlasa da, kendi ileti bloğu türünüzü oluşturabilir ve uygulamanızın gereksinimlerini karşılayacak şekilde özelleştirebilirsiniz. Zaman Uyumsuz Aracılar Kitaplığı tarafından sağlanan yerleşik ileti bloğu türlerinin açıklaması için bkz . Zaman Uyumsuz İleti Blokları.

Ön koşullar

Bu izlenecek yolu başlatmadan önce aşağıdaki belgeleri okuyun:

Bölümler

Bu izlenecek yol aşağıdaki bölümleri içerir:

Özel İleti Bloğu Tasarlama

İleti blokları, ileti gönderme ve alma eylemine katılır. İleti gönderen bir ileti bloğu, kaynak blok olarak bilinir. İletileri alan bir ileti bloğu hedef blok olarak bilinir. İletileri hem gönderen hem de alan bir ileti bloğu, bir yayıcı bloğu olarak bilinir. Aracılar Kitaplığı, kaynak blokları temsil etmek için soyut sınıf eşzamanlılığı::ISource ve hedef blokları temsil etmek için soyut sınıf eşzamanlılığı::ITarget kullanır. Kaynak olarak davranan ileti bloğu türleri ; ISourcehedef olarak davranan ileti bloğu türleri adresinden ITargettüretilir.

İleti bloğu türünüzü doğrudan ve ITargetöğesinden ISource türetebilirsiniz ancak Aracılar Kitaplığı, tüm ileti bloğu türleri için ortak olan işlevlerin çoğunu gerçekleştiren üç temel sınıf tanımlar; örneğin, hataları işleme ve ileti bloklarını eşzamanlılık açısından güvenli bir şekilde birbirine bağlama. concurrency::source_block sınıfından ISource türetilir ve diğer bloklara ileti gönderir. concurrency::target_block sınıfı diğer bloklardan ITarget türetilir ve ileti alır. concurrency::p ropagator_block sınıfı, diğer bloklardan ISourceITarget türetilir ve diğer bloklara ileti gönderir ve diğer bloklardan iletiler alır. İleti bloğunuzun davranışına odaklanabilmeniz için altyapı ayrıntılarını işlemek için bu üç temel sınıfı kullanmanızı öneririz.

source_block, target_blockve sınıfları, kaynak ve propagator_block hedef bloklar arasındaki bağlantıları veya bağlantıları yöneten bir türde ve iletilerin nasıl işlendiğini yöneten bir tür üzerinde parametreleştirilmiş şablonlardır. Aracılar Kitaplığı, bağlantı yönetimi gerçekleştiren iki tür tanımlar: eşzamanlılık::single_link_registry ve eşzamanlılık::multi_link_registry. sınıfı, single_link_registry bir ileti bloğunun bir kaynağa veya bir hedefe bağlanmasına olanak tanır. sınıfı, multi_link_registry bir ileti bloğunun birden çok kaynağa veya birden çok hedefe bağlanmasına olanak tanır. Aracılar Kitaplığı, ileti yönetimi gerçekleştiren bir sınıf tanımlar: eşzamanlılık::ordered_message_processor. sınıfı, ordered_message_processor ileti bloklarının iletileri aldığı sırayla işlemesini sağlar.

İleti bloklarının kaynakları ve hedefleri ile ilişkisini daha iyi anlamak için aşağıdaki örneği göz önünde bulundurun. Bu örnekte concurrency::transformer sınıfının bildirimi gösterilir.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer sınıfı öğesinden propagator_blocktüretilir ve bu nedenle hem kaynak blok hem de hedef blok olarak davranır. türünde _Input iletileri kabul eder ve türünde _Outputiletiler gönderir. transformer sınıfı, herhangi bir hedef blok için bağlantı yöneticisi ve multi_link_registry herhangi bir kaynak blok için bağlantı yöneticisi olarak belirtirsingle_link_registry. Bu nedenle, bir transformer nesnenin en fazla bir hedefi ve sınırsız sayıda kaynağı olabilir.

öğesinden source_block türetilen bir sınıf altı yöntem uygulamalıdır: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message ve resume_propagation. öğesinden target_block türetilen bir sınıf propagate_message yöntemini uygulamalıdır ve isteğe bağlı olarak send_message yöntemini uygulayabilir. 'den propagator_block türetme işlevi, hem target_blockhem de source_block 'den türetmeye eşdeğerdir.

yöntemi propagate_to_any_targets , çalışma zamanı tarafından zaman uyumsuz veya zaman uyumlu olarak gelen iletileri işlemek ve giden iletileri yaymak için çağrılır. yöntemi accept_message , iletileri kabul etmek için hedef bloklar tarafından çağrılır. gibi unbounded_bufferbirçok ileti bloğu türü, iletileri yalnızca alacak ilk hedefe gönderir. Bu nedenle, iletinin sahipliğini hedefe aktarır. Concurrency::overwrite_buffer gibi diğer ileti bloğu türleri, hedef bloklarının her birine ileti sunar. Bu nedenle, overwrite_buffer hedeflerinin her biri için iletinin bir kopyasını oluşturur.

reserve_message, consume_message, release_messageve resume_propagation yöntemleri, ileti bloklarının ileti ayırmaya katılmasını sağlar. Hedef bloklar, bir ileti sunulduğunda yöntemini çağırır reserve_message ve iletiyi daha sonra kullanmak üzere ayırmaları gerekir. Hedef blok bir iletiyi ayırdıktan sonra, bu iletiyi kullanmak için yöntemini veya release_message rezervasyonu iptal etmek için yöntemini çağırabilirconsume_message. yönteminde accept_message olduğu gibi, uygulaması consume_message iletinin sahipliğini aktarabilir veya iletinin bir kopyasını döndürebilir. Hedef blok ayrılmış bir iletiyi tükettiğinde veya serbest bıraktığında çalışma zamanı yöntemini çağırır resume_propagation . Bu yöntem genellikle kuyruktaki bir sonraki iletiden başlayarak ileti yaymaya devam eder.

Çalışma zamanı, bir iletiyi başka bir bloktan geçerli bir bloka zaman uyumsuz olarak aktarmak için yöntemini çağırır propagate_message . send_message yöntemine benzerpropagate_message, ancak zaman uyumsuz olarak değil zaman uyumlu olarak iletiyi hedef bloklara gönderir. varsayılan uygulaması send_message tüm gelen iletileri reddeder. İleti hedef blokla ilişkili isteğe bağlı filtre işlevini geçirmezse çalışma zamanı bu yöntemlerden herhangi birini çağırmaz. İleti filtreleri hakkında daha fazla bilgi için bkz . Zaman Uyumsuz İleti Blokları.

[Üst]

priority_buffer Sınıfını Tanımlama

priority_buffer sınıfı, gelen iletileri önce iletilerin alınma sırasına göre sıralayan özel bir ileti bloğu türüdür. sınıfı priority_buffer eşzamanlılık::unbounded_buffer sınıfına benzer çünkü bir ileti kuyruğu barındırır ve hem kaynak hem de hedef ileti bloğu işlevi görür ve hem birden çok kaynağa hem de birden çok hedefe sahip olabilir. Bununla birlikte, unbounded_buffer ileti yayma işlemini yalnızca kaynaklarından ileti alma sırasına göre temel alır.

sınıfı, priority_buffer ve Type öğelerini içeren PriorityType std::tuple türünde iletiler alır. PriorityType her iletinin önceliğini tutan türü ifade eder; Type iletinin veri bölümünü ifade eder. priority_buffer sınıfı türünde Typeiletiler gönderir. sınıfı priority_buffer iki ileti kuyruğu da yönetir: gelen iletiler için std ::p riority_queue nesnesi ve giden iletiler için std::queue nesnesi. Bir nesne aynı anda birden çok ileti aldığında veya tüketiciler tarafından okunmadan önce birden çok ileti aldığında iletileri önceliğe göre sıralamak yararlı olur priority_buffer .

türetilen propagator_block bir sınıfın uygulaması gereken yedi yönteme ek olarak, priority_buffer sınıfı ve send_message yöntemlerini de geçersiz kılarlink_target_notification. priority_buffer sınıfı ayrıca iki ortak yardımcı yöntemi enqueue ve ile dequeueözel yardımcı yöntemi olarak propagate_priority_ordertanımlar.

Aşağıdaki yordamda sınıfın priority_buffer nasıl uygulandığı açıklanmaktadır.

Priority_buffer sınıfını tanımlamak için

  1. Bir C++ üst bilgi dosyası oluşturun ve adını verin priority_buffer.h. Alternatif olarak, projenizin parçası olan mevcut bir üst bilgi dosyasını kullanabilirsiniz.

  2. içine priority_buffer.haşağıdaki kodu ekleyin.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std Ad alanında, eşzamanlılık::ileti nesneleri üzerinde hareket eden std::less ve std::greater özelleştirmelerini tanımlayın.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    priority_buffer sınıfı nesneleri bir priority_queue nesnede depolarmessage. Bu tür özelleştirmeleri, öncelik kuyruğunun iletileri önceliklerine göre sıralamasını sağlar. Öncelik, nesnenin ilk öğesidir tuple .

  4. Ad alanında concurrencyex sınıfını priority_buffer bildirin.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer sınıfı öğesinden propagator_blocktüretilir. Bu nedenle, hem ileti gönderebilir hem de alabilir. sınıfı, priority_buffer türünde Typeiletiler alan birden çok hedefe sahip olabilir. Ayrıca, türünde tuple<PriorityType, Type>iletiler gönderen birden çok kaynağı da olabilir.

  5. private sınıfının bölümünde priority_buffer aşağıdaki üye değişkenlerini ekleyin.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Nesne priority_queue gelen iletileri, nesne ise queue giden iletileri barındırıyor. Bir priority_buffer nesne aynı anda birden çok ileti alabilir; critical_section nesne giriş iletileri kuyruğuna erişimi eşitler.

  6. private bölümünde kopyalama oluşturucuyu ve atama işlecini tanımlayın. Bu, nesnelerin atanabilir olmasını engeller priority_queue .

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public bölümünde, birçok ileti bloğu türü için ortak olan oluşturucuları tanımlayın. Yıkıcıyı da tanımlayın.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. public bölümünde ve dequeueyöntemlerini enqueue tanımlayın. Bu yardımcı yöntemler, bir nesneye ileti göndermek ve nesneden ileti almak için alternatif bir priority_buffer yol sağlar.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. protected bölümünde yöntemini tanımlayınpropagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    yöntemi, propagate_to_any_targets giriş kuyruğunun önündeki iletiyi çıkış kuyruğuna aktarır ve çıkış kuyruğundaki tüm iletileri yayılır.

  10. protected bölümünde yöntemini tanımlayınaccept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Hedef blok yöntemini çağırdığında accept_message , priority_buffer sınıf iletinin sahipliğini bunu kabul eden ilk hedef bloğuna aktarır. (Bu, .) davranışına unbounded_bufferbenzer.

  11. protected bölümünde yöntemini tanımlayınreserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    sınıfı, priority_buffer sağlanan ileti tanımlayıcısı kuyruğun önündeki iletinin tanımlayıcısı ile eşleştiğinde hedef bloğun bir iletiyi ayırmasına izin verir. Başka bir deyişle, nesne henüz ek bir ileti almadıysa priority_buffer ve geçerli iletiyi henüz yaymadıysa, hedef iletiyi ayırabilir.

  12. protected bölümünde yöntemini tanımlayınconsume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Hedef blok, ayrıldığı iletinin sahipliğini aktarmak için çağrılar consume_message .

  13. protected bölümünde yöntemini tanımlayınrelease_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Bir iletiye yapılan rezervasyon iptal etmek için hedef blok çağrıları release_message .

  14. protected bölümünde yöntemini tanımlayınresume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Çalışma zamanı, bir hedef bloğun ayrılmış bir iletiyi kullanmasından veya serbest bırakmasından sonra çağırır resume_propagation . Bu yöntem çıkış kuyruğundaki tüm iletileri yayılım.

  15. protected bölümünde yöntemini tanımlayınlink_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    _M_pReservedFor Üye değişkeni, temel sınıfı source_blocktarafından tanımlanır. Bu üye değişkeni, varsa çıkış kuyruğunun önündeki iletiye rezervasyon tutan hedef bloğu gösterir. Nesneye priority_buffer yeni bir hedef bağlandığında çalışma zamanı çağırırlink_target_notification. Bu yöntem, rezervasyon tutan bir hedef yoksa çıkış kuyruğundaki tüm iletileri yayılır.

  16. private bölümünde yöntemini tanımlayınpropagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Bu yöntem çıkış kuyruğundaki tüm iletileri yayılım. Hedef bloklardan biri iletiyi kabul edene kadar kuyruktaki her ileti her hedef bloğuna sunulur. sınıfı giden priority_buffer iletilerin sırasını korur. Bu nedenle, bu yöntem hedef bloklara başka bir ileti sunmadan önce çıkış kuyruğundaki ilk ileti bir hedef blok tarafından kabul edilmelidir.

  17. protected bölümünde yöntemini tanımlayınpropagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    yöntemi, propagate_message sınıfın priority_buffer ileti alıcısı veya hedef olarak davranmasını sağlar. Bu yöntem, sağlanan kaynak bloğu tarafından sunulan iletiyi alır ve bu iletiyi öncelik kuyruğuna ekler. Yöntemi propagate_message daha sonra tüm çıkış iletilerini zaman uyumsuz olarak hedef bloklara gönderir.

    eşzamanlılık::asend işlevini çağırdığınızda veya ileti bloğu diğer ileti bloklarına bağlandığında çalışma zamanı bu yöntemi çağırır.

  18. protected bölümünde yöntemini tanımlayınsend_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    send_message yöntemine benzer.propagate_message Ancak çıkış iletilerini zaman uyumsuz yerine zaman uyumlu olarak gönderir.

    Çalışma zamanı zaman uyumlu gönderme işlemi sırasında bu yöntemi çağırır, örneğin concurrency::send işlevini çağırdığınızda.

sınıfı, priority_buffer birçok ileti bloğu türünde tipik olan oluşturucu aşırı yüklemeleri içerir. Bazı oluşturucu aşırı yüklemeleri eşzamanlılık::Scheduler veya concurrency::ScheduleGroup nesnelerini alır ve bu da ileti bloğunun belirli bir görev zamanlayıcı tarafından yönetilmesini sağlar. Diğer oluşturucu aşırı yüklemeleri bir filtre işlevi alır. Filtre işlevleri, ileti bloklarının yükü temelinde bir iletiyi kabul etmelerini veya reddetmelerini sağlar. İleti filtreleri hakkında daha fazla bilgi için bkz . Zaman Uyumsuz İleti Blokları. Görev zamanlayıcıları hakkında daha fazla bilgi için bkz . Görev Zamanlayıcı.

priority_buffer Sınıf iletileri önceliğe ve sonra iletilerin alınma sırasına göre sıraladığından, bu sınıf en çok iletileri zaman uyumsuz olarak aldığında, örneğin eşzamanlılık::asend işlevini çağırdığınızda veya ileti bloğu diğer ileti bloklarına bağlandığında kullanışlıdır.

[Üst]

Tam Örnek

Aşağıdaki örnekte sınıfın tam tanımı gösterilmektedir priority_buffer .

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Aşağıdaki örnek, bir priority_buffer nesne üzerinde eşzamanlı olarak bir dizi asend ve eşzamanlılık::alma işlemi gerçekleştirir.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Bu örnek aşağıdaki örnek çıktıyı oluşturur.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer sınıfı iletileri önce önce iletileri aldığı sırayla sıralar. Bu örnekte, daha yüksek sayısal önceliğe sahip iletiler kuyruğun önüne eklenir.

[Üst]

Kod Derleniyor

Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya sınıfın priority_buffer tanımını adlandırılmış bir dosyaya ve test programını adlandırılmış priority_buffer.hpriority_buffer.cpp bir dosyaya yapıştırın ve ardından visual studio komut istemi penceresinde aşağıdaki komutu çalıştırın.

cl.exe /EHsc priority_buffer.cpp

Ayrıca bkz.

Eşzamanlılık Çalışma Zamanı İzlenecek Yollar
Zaman Uyumsuz İleti Blokları
İleti Geçirme İşlevleri