Panduan: Membuat Blok Pesan Kustom

Dokumen ini menjelaskan cara membuat jenis blok pesan kustom yang mengurutkan pesan masuk berdasarkan prioritas.

Meskipun jenis blok pesan bawaan menyediakan berbagai fungsionalitas, Anda dapat membuat jenis blok pesan Anda sendiri dan menyesuaikannya untuk memenuhi persyaratan aplikasi Anda. Untuk deskripsi jenis blok pesan bawaan yang disediakan oleh Pustaka Agen Asinkron, lihat Blok Pesan Asinkron.

Prasyarat

Baca dokumen berikut sebelum Anda memulai panduan ini:

Bagian

Panduan ini berisi bagian berikut:

Merancang Blok Pesan Kustom

Blok pesan berpartisipasi dalam tindakan mengirim dan menerima pesan. Blok pesan yang mengirim pesan dikenal sebagai blok sumber. Blok pesan yang menerima pesan dikenal sebagai blok target. Blok pesan yang mengirim dan menerima pesan dikenal sebagai blok penyebar. Pustaka Agen menggunakan konkurensi kelas abstrak::ISource untuk mewakili blok sumber dan konkurensi kelas abstrak::ITarget untuk mewakili blok target. Jenis blok pesan yang bertindak sebagai sumber yang berasal dari ISource; jenis blok pesan yang bertindak sebagai target yang berasal dari ITarget.

Meskipun Anda dapat memperoleh jenis blok pesan langsung dari ISource dan ITarget, Pustaka Agen menentukan tiga kelas dasar yang melakukan banyak fungsionalitas yang umum untuk semua jenis blok pesan, misalnya, menangani kesalahan dan menghubungkan blok pesan bersama-sama dengan cara yang aman konkurensi. Kelas konkurensi::source_block berasal dari ISource dan mengirim pesan ke blok lain. Kelas konkurensi::target_block berasal dari ITarget dan menerima pesan dari blok lain. Kelas concurrency::p ropagator_block berasal dari ISource dan ITarget dan mengirim pesan ke blok lain dan menerima pesan dari blok lain. Kami menyarankan agar Anda menggunakan ketiga kelas dasar ini untuk menangani detail infrastruktur sehingga Anda dapat fokus pada perilaku blok pesan Anda.

Kelas source_block, target_block, dan propagator_block adalah templat yang diparameterkan pada jenis yang mengelola koneksi, atau tautan, antara blok sumber dan target dan pada jenis yang mengelola cara pesan diproses. Pustaka Agen mendefinisikan dua jenis yang melakukan manajemen tautan, konkurensi::single_link_registry dan konkurensi::multi_link_registry. Kelas single_link_registry memungkinkan blok pesan ditautkan ke satu sumber atau ke satu target. Kelas memungkinkan multi_link_registry blok pesan ditautkan ke beberapa sumber atau beberapa target. Pustaka Agen mendefinisikan satu kelas yang melakukan manajemen pesan, konkurensi::ordered_message_processor. Kelas memungkinkan ordered_message_processor blok pesan memproses pesan dalam urutan penerimaannya.

Untuk lebih memahami bagaimana blok pesan berhubungan dengan sumber dan targetnya, pertimbangkan contoh berikut. Contoh ini menunjukkan deklarasi kelas konkurensi::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Kelas transformer ini berasal dari propagator_block, dan oleh karena itu bertindak sebagai blok sumber dan sebagai blok target. Ini menerima pesan jenis _Input dan mengirim pesan jenis _Output. Kelas transformer menentukan single_link_registry sebagai manajer tautan untuk blok target apa pun dan multi_link_registry sebagai manajer tautan untuk blok sumber apa pun. Oleh karena itu, transformer objek dapat memiliki hingga satu target dan jumlah sumber yang tidak terbatas.

Kelas yang berasal dari source_block harus menerapkan enam metode: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, dan resume_propagation. Kelas yang berasal dari target_block harus menerapkan metode propagate_message dan dapat secara opsional menerapkan metode send_message . Berasal dari propagator_block secara fungsional setara dengan turunan dari dan source_blocktarget_block.

Metode propagate_to_any_targets ini dipanggil oleh runtime untuk memproses pesan masuk secara asinkron atau sinkron dan menyebarluaskan pesan keluar apa pun. Metode accept_message ini dipanggil oleh blok target untuk menerima pesan. Banyak jenis blok pesan, seperti unbounded_buffer, kirim pesan hanya ke target pertama yang akan menerimanya. Oleh karena itu, ia mentransfer kepemilikan pesan ke target. Jenis blok pesan lainnya, seperti konkurensi::overwrite_buffer, menawarkan pesan ke setiap blok targetnya. Oleh karena itu, overwrite_buffer membuat salinan pesan untuk setiap targetnya.

Metode reserve_message, consume_message, release_message, dan resume_propagation memungkinkan blok pesan untuk berpartisipasi dalam reservasi pesan. Blok target memanggil reserve_message metode ketika mereka ditawarkan pesan dan harus memesan pesan untuk digunakan nanti. Setelah blok target memesan pesan, blok target dapat memanggil consume_message metode untuk menggunakan pesan tersebut release_message atau metode untuk membatalkan reservasi. Seperti halnya accept_message metode , implementasi consume_message dapat mentransfer kepemilikan pesan atau mengembalikan salinan pesan. Setelah blok target mengonsumsi atau merilis pesan yang dipesan, runtime memanggil resume_propagation metode . Biasanya, metode ini melanjutkan penyebaran pesan, dimulai dengan pesan berikutnya dalam antrean.

Runtime memanggil propagate_message metode untuk mentransfer pesan secara asinkron dari blok lain ke yang saat ini. Metode ini send_message menyerupai propagate_message, kecuali bahwa metode tersebut secara sinkron, alih-alih secara asinkron, mengirim pesan ke blok target. Implementasi send_message default menolak semua pesan masuk. Runtime tidak memanggil salah satu metode ini jika pesan tidak melewati fungsi filter opsional yang terkait dengan blok target. Untuk informasi selengkapnya tentang filter pesan, lihat Blok Pesan Asinkron.

[Atas]

Menentukan Kelas priority_buffer

Kelas priority_buffer adalah jenis blok pesan kustom yang mengurutkan pesan masuk terlebih dahulu berdasarkan prioritas, lalu berdasarkan urutan pesan diterima. Kelas menyerupankan priority_bufferkelas konkurensi::unbounded_buffer karena menyimpan antrean pesan, dan juga karena bertindak sebagai sumber dan blok pesan target dan dapat memiliki beberapa sumber dan beberapa target. Namun, unbounded_buffer penyebaran pesan dasar hanya pada urutan penerimaan pesan dari sumbernya.

Kelas priority_buffer menerima pesan jenis std::tuple yang berisi PriorityType elemen dan Type . PriorityType mengacu pada jenis yang memegang prioritas setiap pesan; Type mengacu pada bagian data pesan. Kelas priority_buffer mengirim pesan jenis Type. Kelas ini priority_buffer juga mengelola dua antrean pesan: objek std::p riority_queue untuk pesan masuk dan objek std::queue untuk pesan keluar. Memesan pesan berdasarkan prioritas berguna ketika priority_buffer objek menerima beberapa pesan secara bersamaan atau ketika menerima beberapa pesan sebelum pesan dibaca oleh konsumen.

Selain tujuh metode yang harus diimplementasikan oleh kelas yang propagator_block berasal, priority_buffer kelas juga mengambil alih link_target_notification metode dan send_message . Kelas ini priority_buffer juga mendefinisikan dua metode pembantu publik, enqueue dan dequeue, dan metode pembantu privat, propagate_priority_order.

Prosedur berikut menjelaskan cara mengimplementasikan priority_buffer kelas.

Untuk menentukan kelas priority_buffer

  1. Buat file header C++ dan beri nama priority_buffer.h. Atau, Anda dapat menggunakan file header yang sudah ada yang merupakan bagian dari proyek Anda.

  2. Di priority_buffer.h, tambahkan kode berikut.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std Di namespace, tentukan spesialisasi std::less dan std::greater yang bertindak pada konkurensi::objek pesan.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Kelas priority_buffer menyimpan message objek dalam priority_queue objek. Spesialisasi jenis ini memungkinkan antrean prioritas untuk mengurutkan pesan sesuai dengan prioritasnya. Prioritasnya adalah elemen pertama dari tuple objek.

  4. concurrencyex Di namespace layanan, deklarasikan priority_buffer kelas .

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Kelas priority_buffer ini berasal dari propagator_block. Oleh karena itu, dapat mengirim dan menerima pesan. Kelas priority_buffer dapat memiliki beberapa target yang menerima pesan jenis Type. Ini juga dapat memiliki beberapa sumber yang mengirim pesan jenis tuple<PriorityType, Type>.

  5. Di bagian privatepriority_buffer kelas, tambahkan variabel anggota berikut.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Objek priority_queue menyimpan pesan masuk; queue objek menyimpan pesan keluar. Objek priority_buffer dapat menerima beberapa pesan secara bersamaan; critical_section objek menyinkronkan akses ke antrean pesan input.

  6. Di bagian , private tentukan konstruktor salinan dan operator penugasan. Ini mencegah priority_queue agar objek tidak dapat ditetapkan.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. Di bagian , public tentukan konstruktor yang umum untuk banyak jenis blok pesan. Tentukan juga destruktor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Di bagian , public tentukan metode enqueue dan dequeue. Metode pembantu ini menyediakan cara alternatif untuk mengirim pesan ke dan menerima pesan dari priority_buffer objek.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Di bagian , protected tentukan propagate_to_any_targets metode .

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Metode ini propagate_to_any_targets mentransfer pesan yang berada di depan antrean input ke antrean output dan menyebarkan semua pesan dalam antrean output.

  10. Di bagian , protected tentukan accept_message metode .

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Ketika blok target memanggil accept_message metode , priority_buffer kelas mentransfer kepemilikan pesan ke blok target pertama yang menerimanya. (Ini menyerupan perilaku unbounded_buffer.)

  11. Di bagian , protected tentukan reserve_message metode .

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Kelas priority_buffer mengizinkan blok target untuk memesan pesan ketika pengidentifikasi pesan yang disediakan cocok dengan pengidentifikasi pesan yang berada di bagian depan antrean. Dengan kata lain, target dapat memesan pesan jika priority_buffer objek belum menerima pesan tambahan dan belum menyebarluaskan pesan saat ini.

  12. Di bagian , protected tentukan consume_message metode .

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Blok target memanggil untuk mentransfer kepemilikan pesan yang dipesannya consume_message .

  13. Di bagian , protected tentukan release_message metode .

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Target memblokir panggilan release_message untuk membatalkan reservasinya ke pesan.

  14. Di bagian , protected tentukan resume_propagation metode .

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Runtime panggilan resume_propagation setelah blok target mengonsumsi atau merilis pesan yang dipesan. Metode ini menyebarluaskan pesan apa pun yang berada dalam antrean output.

  15. Di bagian , protected tentukan link_target_notification metode .

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Variabel _M_pReservedFor anggota didefinisikan oleh kelas dasar, source_block. Variabel anggota ini menunjuk ke blok target, jika ada, yang menyimpan reservasi ke pesan yang berada di depan antrean output. Runtime memanggil link_target_notification ketika target baru ditautkan ke priority_buffer objek. Metode ini menyebarluaskan pesan apa pun yang berada dalam antrean output jika tidak ada target yang memegang reservasi.

  16. Di bagian , private tentukan propagate_priority_order metode .

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Metode ini menyebarluaskan semua pesan dari antrean output. Setiap pesan dalam antrean ditawarkan ke setiap blok target hingga salah satu blok target menerima pesan. Kelas priority_buffer mempertahankan urutan pesan keluar. Oleh karena itu, pesan pertama dalam antrean output harus diterima oleh blok target sebelum metode ini menawarkan pesan lain ke blok target.

  17. Di bagian , protected tentukan propagate_message metode .

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Metode ini propagate_message memungkinkan priority_buffer kelas untuk bertindak sebagai penerima pesan, atau target. Metode ini menerima pesan yang ditawarkan oleh blok sumber yang disediakan dan menyisipkan pesan tersebut ke dalam antrean prioritas. Metode kemudian propagate_message secara asinkron mengirim semua pesan output ke blok target.

    Runtime memanggil metode ini saat Anda memanggil fungsi konkurensi::asend atau ketika blok pesan tersambung ke blok pesan lain.

  18. Di bagian , protected tentukan send_message metode .

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Metode ini send_message menyerupan propagate_message. Namun mengirim pesan output secara sinkron alih-alih secara asinkron.

    Runtime memanggil metode ini selama operasi pengiriman sinkron, seperti saat Anda memanggil fungsi konkurensi::send .

Kelas priority_buffer berisi kelebihan beban konstruktor yang khas dalam banyak jenis blok pesan. Beberapa overload konstruktor mengambil konkurensi::Scheduler atau konkurensi::ScheduleGroup objects, yang memungkinkan blok pesan dikelola oleh penjadwal tugas tertentu. Kelebihan konstruktor lainnya mengambil fungsi filter. Fungsi filter memungkinkan blok pesan menerima atau menolak pesan berdasarkan payload-nya. Untuk informasi selengkapnya tentang filter pesan, lihat Blok Pesan Asinkron. Untuk informasi selengkapnya tentang penjadwal tugas, lihat Penjadwal Tugas.

priority_buffer Karena kelas memesan pesan berdasarkan prioritas dan kemudian berdasarkan urutan di mana pesan diterima, kelas ini paling berguna ketika menerima pesan secara asinkron, misalnya, ketika Anda memanggil fungsi konkurensi::asend atau ketika blok pesan terhubung ke blok pesan lain.

[Atas]

Contoh Lengkap

Contoh berikut menunjukkan definisi priority_buffer lengkap kelas.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Contoh berikut secara bersamaan melakukan sejumlah asenddan konkurensi::menerima operasi pada priority_buffer objek.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Contoh ini menghasilkan output sampel berikut.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Kelas priority_buffer memesan pesan terlebih dahulu berdasarkan prioritas lalu berdasarkan urutan penerimaan pesan. Dalam contoh ini, pesan dengan prioritas numerik yang lebih besar dimasukkan ke bagian depan antrean.

[Atas]

Mengompilasi Kode

Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempel definisi priority_buffer kelas dalam file yang diberi nama priority_buffer.h dan program pengujian dalam file yang diberi nama priority_buffer.cpp lalu jalankan perintah berikut di jendela Prompt Perintah Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Baca juga

Panduan Runtime Konkurensi
Blok Pesan Asinkron
Fungsi Passing Pesan