Panduan: Membuat Blok Pesan Kustom
Dokumen ini menjelaskan cara membuat jenis blok pesan kustom yang mengurutkan pesan masuk berdasarkan prioritas.
Meskipun jenis blok pesan bawaan menyediakan berbagai fungsionalitas, Anda dapat membuat jenis blok pesan Anda sendiri dan menyesuaikannya untuk memenuhi persyaratan aplikasi Anda. Untuk deskripsi jenis blok pesan bawaan yang disediakan oleh Pustaka Agen Asinkron, lihat Blok Pesan Asinkron.
Prasyarat
Baca dokumen berikut sebelum Anda memulai panduan ini:
Bagian
Panduan ini berisi bagian berikut:
Merancang Blok Pesan Kustom
Blok pesan berpartisipasi dalam tindakan mengirim dan menerima pesan. Blok pesan yang mengirim pesan dikenal sebagai blok sumber. Blok pesan yang menerima pesan dikenal sebagai blok target. Blok pesan yang mengirim dan menerima pesan dikenal sebagai blok penyebar. Pustaka Agen menggunakan konkurensi kelas abstrak::ISource untuk mewakili blok sumber dan konkurensi kelas abstrak::ITarget untuk mewakili blok target. Jenis blok pesan yang bertindak sebagai sumber yang berasal dari ISource
; jenis blok pesan yang bertindak sebagai target yang berasal dari ITarget
.
Meskipun Anda dapat memperoleh jenis blok pesan langsung dari ISource
dan ITarget
, Pustaka Agen menentukan tiga kelas dasar yang melakukan banyak fungsionalitas yang umum untuk semua jenis blok pesan, misalnya, menangani kesalahan dan menghubungkan blok pesan bersama-sama dengan cara yang aman konkurensi. Kelas konkurensi::source_block berasal dari ISource
dan mengirim pesan ke blok lain. Kelas konkurensi::target_block berasal dari ITarget
dan menerima pesan dari blok lain. Kelas concurrency::p ropagator_block berasal dari ISource
dan ITarget
dan mengirim pesan ke blok lain dan menerima pesan dari blok lain. Kami menyarankan agar Anda menggunakan ketiga kelas dasar ini untuk menangani detail infrastruktur sehingga Anda dapat fokus pada perilaku blok pesan Anda.
Kelas source_block
, target_block
, dan propagator_block
adalah templat yang diparameterkan pada jenis yang mengelola koneksi, atau tautan, antara blok sumber dan target dan pada jenis yang mengelola cara pesan diproses. Pustaka Agen mendefinisikan dua jenis yang melakukan manajemen tautan, konkurensi::single_link_registry dan konkurensi::multi_link_registry. Kelas single_link_registry
memungkinkan blok pesan ditautkan ke satu sumber atau ke satu target. Kelas memungkinkan multi_link_registry
blok pesan ditautkan ke beberapa sumber atau beberapa target. Pustaka Agen mendefinisikan satu kelas yang melakukan manajemen pesan, konkurensi::ordered_message_processor. Kelas memungkinkan ordered_message_processor
blok pesan memproses pesan dalam urutan penerimaannya.
Untuk lebih memahami bagaimana blok pesan berhubungan dengan sumber dan targetnya, pertimbangkan contoh berikut. Contoh ini menunjukkan deklarasi kelas konkurensi::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Kelas transformer
ini berasal dari propagator_block
, dan oleh karena itu bertindak sebagai blok sumber dan sebagai blok target. Ini menerima pesan jenis _Input
dan mengirim pesan jenis _Output
. Kelas transformer
menentukan single_link_registry
sebagai manajer tautan untuk blok target apa pun dan multi_link_registry
sebagai manajer tautan untuk blok sumber apa pun. Oleh karena itu, transformer
objek dapat memiliki hingga satu target dan jumlah sumber yang tidak terbatas.
Kelas yang berasal dari source_block
harus menerapkan enam metode: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, dan resume_propagation. Kelas yang berasal dari target_block
harus menerapkan metode propagate_message dan dapat secara opsional menerapkan metode send_message . Berasal dari propagator_block
secara fungsional setara dengan turunan dari dan source_block
target_block
.
Metode propagate_to_any_targets
ini dipanggil oleh runtime untuk memproses pesan masuk secara asinkron atau sinkron dan menyebarluaskan pesan keluar apa pun. Metode accept_message
ini dipanggil oleh blok target untuk menerima pesan. Banyak jenis blok pesan, seperti unbounded_buffer
, kirim pesan hanya ke target pertama yang akan menerimanya. Oleh karena itu, ia mentransfer kepemilikan pesan ke target. Jenis blok pesan lainnya, seperti konkurensi::overwrite_buffer, menawarkan pesan ke setiap blok targetnya. Oleh karena itu, overwrite_buffer
membuat salinan pesan untuk setiap targetnya.
Metode reserve_message
, consume_message
, release_message
, dan resume_propagation
memungkinkan blok pesan untuk berpartisipasi dalam reservasi pesan. Blok target memanggil reserve_message
metode ketika mereka ditawarkan pesan dan harus memesan pesan untuk digunakan nanti. Setelah blok target memesan pesan, blok target dapat memanggil consume_message
metode untuk menggunakan pesan tersebut release_message
atau metode untuk membatalkan reservasi. Seperti halnya accept_message
metode , implementasi consume_message
dapat mentransfer kepemilikan pesan atau mengembalikan salinan pesan. Setelah blok target mengonsumsi atau merilis pesan yang dipesan, runtime memanggil resume_propagation
metode . Biasanya, metode ini melanjutkan penyebaran pesan, dimulai dengan pesan berikutnya dalam antrean.
Runtime memanggil propagate_message
metode untuk mentransfer pesan secara asinkron dari blok lain ke yang saat ini. Metode ini send_message
menyerupai propagate_message
, kecuali bahwa metode tersebut secara sinkron, alih-alih secara asinkron, mengirim pesan ke blok target. Implementasi send_message
default menolak semua pesan masuk. Runtime tidak memanggil salah satu metode ini jika pesan tidak melewati fungsi filter opsional yang terkait dengan blok target. Untuk informasi selengkapnya tentang filter pesan, lihat Blok Pesan Asinkron.
[Atas]
Menentukan Kelas priority_buffer
Kelas priority_buffer
adalah jenis blok pesan kustom yang mengurutkan pesan masuk terlebih dahulu berdasarkan prioritas, lalu berdasarkan urutan pesan diterima. Kelas menyerupankan priority_buffer
kelas konkurensi::unbounded_buffer karena menyimpan antrean pesan, dan juga karena bertindak sebagai sumber dan blok pesan target dan dapat memiliki beberapa sumber dan beberapa target. Namun, unbounded_buffer
penyebaran pesan dasar hanya pada urutan penerimaan pesan dari sumbernya.
Kelas priority_buffer
menerima pesan jenis std::tuple yang berisi PriorityType
elemen dan Type
. PriorityType
mengacu pada jenis yang memegang prioritas setiap pesan; Type
mengacu pada bagian data pesan. Kelas priority_buffer
mengirim pesan jenis Type
. Kelas ini priority_buffer
juga mengelola dua antrean pesan: objek std::p riority_queue untuk pesan masuk dan objek std::queue untuk pesan keluar. Memesan pesan berdasarkan prioritas berguna ketika priority_buffer
objek menerima beberapa pesan secara bersamaan atau ketika menerima beberapa pesan sebelum pesan dibaca oleh konsumen.
Selain tujuh metode yang harus diimplementasikan oleh kelas yang propagator_block
berasal, priority_buffer
kelas juga mengambil alih link_target_notification
metode dan send_message
. Kelas ini priority_buffer
juga mendefinisikan dua metode pembantu publik, enqueue
dan dequeue
, dan metode pembantu privat, propagate_priority_order
.
Prosedur berikut menjelaskan cara mengimplementasikan priority_buffer
kelas.
Untuk menentukan kelas priority_buffer
Buat file header C++ dan beri nama
priority_buffer.h
. Atau, Anda dapat menggunakan file header yang sudah ada yang merupakan bagian dari proyek Anda.Di
priority_buffer.h
, tambahkan kode berikut.#pragma once #include <agents.h> #include <queue>
std
Di namespace, tentukan spesialisasi std::less dan std::greater yang bertindak pada konkurensi::objek pesan.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Kelas
priority_buffer
menyimpanmessage
objek dalampriority_queue
objek. Spesialisasi jenis ini memungkinkan antrean prioritas untuk mengurutkan pesan sesuai dengan prioritasnya. Prioritasnya adalah elemen pertama darituple
objek.concurrencyex
Di namespace layanan, deklarasikanpriority_buffer
kelas .namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Kelas
priority_buffer
ini berasal daripropagator_block
. Oleh karena itu, dapat mengirim dan menerima pesan. Kelaspriority_buffer
dapat memiliki beberapa target yang menerima pesan jenisType
. Ini juga dapat memiliki beberapa sumber yang mengirim pesan jenistuple<PriorityType, Type>
.Di bagian
private
priority_buffer
kelas, tambahkan variabel anggota berikut.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
Objek
priority_queue
menyimpan pesan masuk;queue
objek menyimpan pesan keluar. Objekpriority_buffer
dapat menerima beberapa pesan secara bersamaan;critical_section
objek menyinkronkan akses ke antrean pesan input.Di bagian ,
private
tentukan konstruktor salinan dan operator penugasan. Ini mencegahpriority_queue
agar objek tidak dapat ditetapkan.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Di bagian ,
public
tentukan konstruktor yang umum untuk banyak jenis blok pesan. Tentukan juga destruktor.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Di bagian ,
public
tentukan metodeenqueue
dandequeue
. Metode pembantu ini menyediakan cara alternatif untuk mengirim pesan ke dan menerima pesan daripriority_buffer
objek.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Di bagian ,
protected
tentukanpropagate_to_any_targets
metode .// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Metode ini
propagate_to_any_targets
mentransfer pesan yang berada di depan antrean input ke antrean output dan menyebarkan semua pesan dalam antrean output.Di bagian ,
protected
tentukanaccept_message
metode .// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Ketika blok target memanggil
accept_message
metode ,priority_buffer
kelas mentransfer kepemilikan pesan ke blok target pertama yang menerimanya. (Ini menyerupan perilakuunbounded_buffer
.)Di bagian ,
protected
tentukanreserve_message
metode .// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
Kelas
priority_buffer
mengizinkan blok target untuk memesan pesan ketika pengidentifikasi pesan yang disediakan cocok dengan pengidentifikasi pesan yang berada di bagian depan antrean. Dengan kata lain, target dapat memesan pesan jikapriority_buffer
objek belum menerima pesan tambahan dan belum menyebarluaskan pesan saat ini.Di bagian ,
protected
tentukanconsume_message
metode .// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Blok target memanggil untuk mentransfer kepemilikan pesan yang dipesannya
consume_message
.Di bagian ,
protected
tentukanrelease_message
metode .// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Target memblokir panggilan
release_message
untuk membatalkan reservasinya ke pesan.Di bagian ,
protected
tentukanresume_propagation
metode .// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Runtime panggilan
resume_propagation
setelah blok target mengonsumsi atau merilis pesan yang dipesan. Metode ini menyebarluaskan pesan apa pun yang berada dalam antrean output.Di bagian ,
protected
tentukanlink_target_notification
metode .// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
Variabel
_M_pReservedFor
anggota didefinisikan oleh kelas dasar,source_block
. Variabel anggota ini menunjuk ke blok target, jika ada, yang menyimpan reservasi ke pesan yang berada di depan antrean output. Runtime memanggillink_target_notification
ketika target baru ditautkan kepriority_buffer
objek. Metode ini menyebarluaskan pesan apa pun yang berada dalam antrean output jika tidak ada target yang memegang reservasi.Di bagian ,
private
tentukanpropagate_priority_order
metode .// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Metode ini menyebarluaskan semua pesan dari antrean output. Setiap pesan dalam antrean ditawarkan ke setiap blok target hingga salah satu blok target menerima pesan. Kelas
priority_buffer
mempertahankan urutan pesan keluar. Oleh karena itu, pesan pertama dalam antrean output harus diterima oleh blok target sebelum metode ini menawarkan pesan lain ke blok target.Di bagian ,
protected
tentukanpropagate_message
metode .// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Metode ini
propagate_message
memungkinkanpriority_buffer
kelas untuk bertindak sebagai penerima pesan, atau target. Metode ini menerima pesan yang ditawarkan oleh blok sumber yang disediakan dan menyisipkan pesan tersebut ke dalam antrean prioritas. Metode kemudianpropagate_message
secara asinkron mengirim semua pesan output ke blok target.Runtime memanggil metode ini saat Anda memanggil fungsi konkurensi::asend atau ketika blok pesan tersambung ke blok pesan lain.
Di bagian ,
protected
tentukansend_message
metode .// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Metode ini
send_message
menyerupanpropagate_message
. Namun mengirim pesan output secara sinkron alih-alih secara asinkron.Runtime memanggil metode ini selama operasi pengiriman sinkron, seperti saat Anda memanggil fungsi konkurensi::send .
Kelas priority_buffer
berisi kelebihan beban konstruktor yang khas dalam banyak jenis blok pesan. Beberapa overload konstruktor mengambil konkurensi::Scheduler atau konkurensi::ScheduleGroup objects, yang memungkinkan blok pesan dikelola oleh penjadwal tugas tertentu. Kelebihan konstruktor lainnya mengambil fungsi filter. Fungsi filter memungkinkan blok pesan menerima atau menolak pesan berdasarkan payload-nya. Untuk informasi selengkapnya tentang filter pesan, lihat Blok Pesan Asinkron. Untuk informasi selengkapnya tentang penjadwal tugas, lihat Penjadwal Tugas.
priority_buffer
Karena kelas memesan pesan berdasarkan prioritas dan kemudian berdasarkan urutan di mana pesan diterima, kelas ini paling berguna ketika menerima pesan secara asinkron, misalnya, ketika Anda memanggil fungsi konkurensi::asend atau ketika blok pesan terhubung ke blok pesan lain.
[Atas]
Contoh Lengkap
Contoh berikut menunjukkan definisi priority_buffer
lengkap kelas.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Contoh berikut secara bersamaan melakukan sejumlah asend
dan konkurensi::menerima operasi pada priority_buffer
objek.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Contoh ini menghasilkan output sampel berikut.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Kelas priority_buffer
memesan pesan terlebih dahulu berdasarkan prioritas lalu berdasarkan urutan penerimaan pesan. Dalam contoh ini, pesan dengan prioritas numerik yang lebih besar dimasukkan ke bagian depan antrean.
[Atas]
Mengompilasi Kode
Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempel definisi priority_buffer
kelas dalam file yang diberi nama priority_buffer.h
dan program pengujian dalam file yang diberi nama priority_buffer.cpp
lalu jalankan perintah berikut di jendela Prompt Perintah Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Baca juga
Panduan Runtime Konkurensi
Blok Pesan Asinkron
Fungsi Passing Pesan
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk