Wskazówki: tworzenie niestandardowego bloku komunikatów
W tym dokumencie opisano sposób tworzenia niestandardowego typu bloku komunikatów, który porządkuje komunikaty przychodzące według priorytetu.
Mimo że wbudowane typy bloków komunikatów zapewniają szeroką gamę funkcji, można utworzyć własny typ bloku komunikatów i dostosować go tak, aby spełniał wymagania aplikacji. Opis wbudowanych typów bloków komunikatów udostępnianych przez bibliotekę agentów asynchronicznych znajduje się w temacie Asynchroniczne bloki komunikatów.
Wymagania wstępne
Przed rozpoczęciem tego przewodnika zapoznaj się z następującymi dokumentami:
Sekcje
Ten przewodnik zawiera następujące sekcje:
Projektowanie niestandardowego bloku komunikatów
Bloki komunikatów uczestniczą w akcie wysyłania i odbierania komunikatów. Blok komunikatów, który wysyła komunikaty, jest nazywany blokiem źródłowym. Blok komunikatów odbierający komunikaty jest nazywany blokiem docelowym. Blok komunikatów, który zarówno wysyła, jak i odbiera komunikaty, jest nazywany blokiem propagacji. Biblioteka agentów używa abstrakcyjnej współbieżności klasy ::ISource do reprezentowania bloków źródłowych i abstrakcyjnej współbieżności klasy ::ITarget do reprezentowania bloków docelowych. Typy bloków komunikatów, które działają jako źródła pochodzące z ISource
; typy bloków komunikatów, które działają jako obiekty docelowe pochodzące z ITarget
.
Chociaż typ bloku komunikatów można uzyskać bezpośrednio z ISource
elementów i ITarget
, biblioteka agentów definiuje trzy klasy podstawowe, które wykonują większość funkcji, które są wspólne dla wszystkich typów bloków komunikatów, na przykład obsługi błędów i łączenia bloków komunikatów w bezpieczny sposób współbieżności. Klasa concurrency::source_block pochodzi z ISource
i wysyła komunikaty do innych bloków. Klasa concurrency::target_block pochodzi z ITarget
innych bloków i odbiera komunikaty z innych bloków. Klasa concurrency::p ropagator_block pochodzi z ISource
i ITarget
wysyła komunikaty do innych bloków i odbiera komunikaty z innych bloków. Zalecamy użycie tych trzech klas bazowych do obsługi szczegółów infrastruktury, aby można było skoncentrować się na zachowaniu bloku komunikatów.
Klasy source_block
, target_block
i propagator_block
to szablony, które są sparametryzowane w typie, który zarządza połączeniami lub łączami między blokami źródłowymi i docelowymi oraz typem, który zarządza sposobem przetwarzania komunikatów. Biblioteka agentów definiuje dwa typy, które wykonują zarządzanie łączami, współbieżność::single_link_registry i współbieżność::multi_link_registry. Klasa single_link_registry
umożliwia połączenie bloku komunikatów z jednym źródłem lub z jednym obiektem docelowym. Klasa multi_link_registry
umożliwia łączenie bloku komunikatów z wieloma źródłami lub wieloma obiektami docelowymi. Biblioteka agentów definiuje jedną klasę, która wykonuje zarządzanie komunikatami, współbieżność::ordered_message_processor. Klasa ordered_message_processor
umożliwia blokowanie komunikatów przetwarzania komunikatów w kolejności ich odebrania.
Aby lepiej zrozumieć, jak bloki komunikatów odnoszą się do ich źródeł i celów, rozważ poniższy przykład. W tym przykładzie pokazano deklarację klasy concurrency::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Klasa transformer
pochodzi z propagator_block
klasy , a zatem działa zarówno jako blok źródłowy, jak i jako blok docelowy. Akceptuje komunikaty typu _Input
i wysyła komunikaty typu _Output
. Klasa transformer
określa single_link_registry
jako menedżera linków dla dowolnych bloków docelowych i multi_link_registry
jako menedżera linków dla dowolnych bloków źródłowych. transformer
W związku z tym obiekt może mieć maksymalnie jeden obiekt docelowy i nieograniczoną liczbę źródeł.
Klasa pochodząca z source_block
klasy musi implementować sześć metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message i resume_propagation. Klasa pochodząca z target_block
klasy musi implementować metodę propagate_message i opcjonalnie może zaimplementować metodę send_message . Wyprowadzanie z propagator_block
elementu jest funkcjonalnie równoważne wyprowadzeniu zarówno z , jak source_block
i target_block
.
Metoda propagate_to_any_targets
jest wywoływana przez środowisko uruchomieniowe w celu asynchronicznego lub synchronicznego przetwarzania wszystkich przychodzących komunikatów i propagacji wszystkich komunikatów wychodzących. Metoda jest wywoływana accept_message
przez bloki docelowe w celu akceptowania komunikatów. Wiele typów bloków komunikatów, takich jak unbounded_buffer
, wysyła komunikaty tylko do pierwszego obiektu docelowego, który go otrzyma. W związku z tym przenosi własność komunikatu do elementu docelowego. Inne typy bloków komunikatów, takie jak współbieżność::overwrite_buffer, oferują komunikaty do każdego z bloków docelowych. overwrite_buffer
W związku z tym tworzy kopię komunikatu dla każdego z jego obiektów docelowych.
Metody reserve_message
, , release_message
consume_message
i resume_propagation
umożliwiają blokom komunikatów uczestnictwo w rezerwacji komunikatów. Bloki docelowe wywołają metodę reserve_message
, gdy są one oferowane komunikat i muszą zarezerwować komunikat do późniejszego użycia. Po zarezerwowaniu komunikatu przez blok docelowy może wywołać consume_message
metodę , aby użyć tego komunikatu lub release_message
metody w celu anulowania rezerwacji. Podobnie jak w przypadku accept_message
metody, implementacja consume_message
polecenia może przenieść własność komunikatu lub zwrócić kopię komunikatu. Gdy blok docelowy zużywa lub zwalnia komunikat zarezerwowany, środowisko uruchomieniowe wywołuje metodę resume_propagation
. Zazwyczaj ta metoda kontynuuje propagację komunikatów, zaczynając od następnego komunikatu w kolejce.
Środowisko uruchomieniowe wywołuje metodę propagate_message
w celu asynchronicznego transferu komunikatu z innego bloku do bieżącego. Metoda send_message
przypomina propagate_message
metodę , z tą różnicą, że synchronicznie, a nie asynchronicznie, wysyła komunikat do bloków docelowych. Domyślna implementacja send_message
odrzuca wszystkie komunikaty przychodzące. Środowisko uruchomieniowe nie wywołuje żadnej z tych metod, jeśli komunikat nie przekazuje opcjonalnej funkcji filtru skojarzonej z blokiem docelowym. Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Asynchroniczne bloki komunikatów.
[Top]
Definiowanie klasy priority_buffer
Klasa priority_buffer
jest niestandardowym typem bloku komunikatów, który porządkuje komunikaty przychodzące najpierw według priorytetu, a następnie według kolejności odbierania komunikatów. Klasa priority_buffer
przypomina klasę concurrency::unbounded_buffer , ponieważ przechowuje kolejkę komunikatów, a także dlatego, że działa zarówno jako źródło, jak i docelowy blok komunikatów i może mieć zarówno wiele źródeł, jak i wiele obiektów docelowych. unbounded_buffer
Jednak propagacja komunikatów opiera się tylko na kolejności, w której odbiera komunikaty ze swoich źródeł.
Klasa priority_buffer
odbiera komunikaty typu std::tuple , które zawierają PriorityType
elementy i Type
. PriorityType
odnosi się do typu, który ma priorytet każdego komunikatu; Type
odnosi się do części danych komunikatu. Klasa priority_buffer
wysyła komunikaty typu Type
. Klasa priority_buffer
zarządza również dwiema kolejkami komunikatów: obiekt std::p riority_queue dla komunikatów przychodzących i obiekt std::queue dla komunikatów wychodzących. Porządkowanie komunikatów według priorytetu jest przydatne, gdy priority_buffer
obiekt odbiera wiele komunikatów jednocześnie lub gdy odbiera wiele komunikatów, zanim jakiekolwiek komunikaty będą odczytywane przez użytkowników.
Oprócz siedmiu metod, które klasa pochodząca z propagator_block
klasy musi implementować, priority_buffer
klasa również zastępuje link_target_notification
metody i send_message
. Klasa priority_buffer
definiuje również dwie publiczne metody pomocnika i enqueue
, i dequeue
metodę prywatnego pomocnika . propagate_priority_order
Poniższa procedura opisuje sposób implementowania priority_buffer
klasy.
Aby zdefiniować klasę priority_buffer
Utwórz plik nagłówka języka C++ i nadaj mu
priority_buffer.h
nazwę . Alternatywnie możesz użyć istniejącego pliku nagłówka, który jest częścią projektu.W
priority_buffer.h
pliku dodaj następujący kod.#pragma once #include <agents.h> #include <queue>
W przestrzeni nazw zdefiniuj
std
specjalizacje std::less i std::greater , które działają na obiektach concurrency::message .namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Klasa
priority_buffer
przechowujemessage
obiekty wpriority_queue
obiekcie. Specjalizacje tego typu umożliwiają kolejki priorytetów sortowanie komunikatów zgodnie z ich priorytetem. Priorytetem jest pierwszy elementtuple
obiektu.W przestrzeni nazw zadeklaruj
concurrencyex
klasępriority_buffer
.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Klasa
priority_buffer
pochodzi z klasypropagator_block
. W związku z tym może wysyłać i odbierać komunikaty. Klasapriority_buffer
może mieć wiele obiektów docelowych, które odbierają komunikaty typuType
. Może również mieć wiele źródeł, które wysyłają komunikaty typutuple<PriorityType, Type>
.private
W sekcjipriority_buffer
klasy dodaj następujące zmienne składowe.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
Obiekt
priority_queue
przechowuje komunikaty przychodzące;queue
obiekt przechowuje komunikaty wychodzące. Obiektpriority_buffer
może odbierać wiele komunikatów jednocześnie;critical_section
obiekt synchronizuje dostęp do kolejki komunikatów wejściowych.W sekcji zdefiniuj
private
konstruktor kopiujący i operator przypisania.priority_queue
Zapobiega to przypisywaniu obiektów.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
W sekcji zdefiniuj
public
konstruktory wspólne dla wielu typów bloków komunikatów. Zdefiniuj również destruktor.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
W sekcji zdefiniuj
public
metodyenqueue
idequeue
. Te metody pomocnicze zapewniają alternatywny sposób wysyłania komunikatów do obiektu i odbieraniapriority_buffer
ich z obiektu.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
W sekcji zdefiniuj
protected
metodępropagate_to_any_targets
.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Metoda
propagate_to_any_targets
przenosi komunikat znajdujący się przed kolejką wejściową do kolejki wyjściowej i propaguje wszystkie komunikaty w kolejce wyjściowej.W sekcji zdefiniuj
protected
metodęaccept_message
.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Gdy blok docelowy wywołuje metodę
accept_message
,priority_buffer
klasa przenosi własność komunikatu do pierwszego bloku docelowego, który go akceptuje. (Przypomina to zachowanie .unbounded_buffer
)W sekcji zdefiniuj
protected
metodęreserve_message
.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
Klasa
priority_buffer
zezwala na zablokowanie docelowego bloku, aby zarezerwować komunikat, gdy podany identyfikator komunikatu jest zgodny z identyfikatorem komunikatu, który znajduje się przed kolejką. Innymi słowy, obiekt docelowy może zarezerwować komunikat, jeślipriority_buffer
obiekt nie otrzymał jeszcze dodatkowego komunikatu i nie rozpropagował jeszcze bieżącego komunikatu.W sekcji zdefiniuj
protected
metodęconsume_message
.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Blok docelowy wywołuje metodę
consume_message
przeniesienia własności komunikatu, który został zarezerwowany.W sekcji zdefiniuj
protected
metodęrelease_message
.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Docelowe wywołania
release_message
bloku, aby anulować rezerwację do komunikatu.W sekcji zdefiniuj
protected
metodęresume_propagation
.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Środowisko uruchomieniowe wywołuje
resume_propagation
po bloku docelowym zużywa lub zwalnia komunikat zarezerwowany. Ta metoda propaguje wszystkie komunikaty, które znajdują się w kolejce wyjściowej.W sekcji zdefiniuj
protected
metodęlink_target_notification
.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
Zmienna
_M_pReservedFor
składowa jest definiowana przez klasę bazową .source_block
Ta zmienna składowa wskazuje blok docelowy, jeśli istnieje, który przechowuje rezerwację do komunikatu, który znajduje się przed kolejką wyjściową. Środowisko uruchomieniowe jest wywoływanelink_target_notification
, gdy nowy obiekt docelowy jest połączony z obiektempriority_buffer
. Ta metoda propaguje wszystkie komunikaty, które znajdują się w kolejce wyjściowej, jeśli żaden element docelowy nie przechowuje rezerwacji.W sekcji zdefiniuj
private
metodępropagate_priority_order
.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Ta metoda propaguje wszystkie komunikaty z kolejki wyjściowej. Każdy komunikat w kolejce jest oferowany do każdego bloku docelowego, dopóki jeden z bloków docelowych nie zaakceptuje komunikatu. Klasa
priority_buffer
zachowuje kolejność wychodzących komunikatów. W związku z tym pierwszy komunikat w kolejce wyjściowej musi zostać zaakceptowany przez blok docelowy, zanim ta metoda wyświetli dowolny inny komunikat do bloków docelowych.W sekcji zdefiniuj
protected
metodępropagate_message
.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Metoda
propagate_message
umożliwiapriority_buffer
klasie działanie jako odbiornik komunikatu lub element docelowy. Ta metoda odbiera komunikat oferowany przez podany blok źródłowy i wstawia ten komunikat do kolejki priorytetu.propagate_message
Metoda następnie asynchronicznie wysyła wszystkie komunikaty wyjściowe do bloków docelowych.Środowisko uruchomieniowe wywołuje tę metodę podczas wywoływania funkcji concurrency::asend lub gdy blok komunikatów jest połączony z innymi blokami komunikatów.
W sekcji zdefiniuj
protected
metodęsend_message
.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Metoda
send_message
przypomina metodępropagate_message
. Jednak wysyła komunikaty wyjściowe synchronicznie zamiast asynchronicznie.Środowisko uruchomieniowe wywołuje tę metodę podczas synchronicznej operacji wysyłania, na przykład podczas wywoływania funkcji concurrency::send .
Klasa priority_buffer
zawiera przeciążenia konstruktora typowe dla wielu typów bloków komunikatów. Niektóre przeciążenia konstruktora przyjmują współbieżność::Scheduler lub współbieżność::ScheduleGroup obiekty, które umożliwiają zarządzanie blokiem komunikatów przez określony harmonogram zadań. Inne przeciążenia konstruktora przyjmują funkcję filtru. Funkcje filtrowania umożliwiają blokom komunikatów akceptowanie lub odrzucanie komunikatu na podstawie ładunku. Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Asynchroniczne bloki komunikatów. Aby uzyskać więcej informacji na temat harmonogramów zadań, zobacz Harmonogram zadań.
priority_buffer
Ponieważ klasy porządkują komunikaty według priorytetu, a następnie według kolejności odbierania komunikatów, ta klasa jest najbardziej przydatna, gdy odbiera komunikaty asynchronicznie, na przykład podczas wywoływania funkcji concurrency::asend lub gdy blok komunikatów jest połączony z innymi blokami komunikatów.
[Top]
Kompletny przykład
W poniższym przykładzie przedstawiono pełną definicję priority_buffer
klasy.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Poniższy przykład współbieżnie wykonuje wiele asend
operacji współbieżności::receive na priority_buffer
obiekcie.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
W tym przykładzie są generowane następujące przykładowe dane wyjściowe.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Klasa priority_buffer
porządkuje komunikaty najpierw według priorytetu, a następnie według kolejności, w której odbiera komunikaty. W tym przykładzie komunikaty z większym priorytetem liczbowym są wstawiane do przodu kolejki.
[Top]
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej definicję priority_buffer
klasy w pliku o nazwie i programie testowym w pliku o nazwie priority_buffer.h
priority_buffer.cpp
, a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Zobacz też
Środowisko uruchomieniowe współbieżności — wskazówki
Bloki komunikatów asynchronicznych
Funkcje przekazywania komunikatów