Instruktaż: Tworzenie bloku komunikatów niestandardowych
Niniejszy dokument opisuje sposób tworzenia komunikat niestandardowy typ bloku, który zamówień przychodzących wiadomości według priorytetu.
Chociaż typy bloku komunikatu wbudowane zapewniają szeroki zakres funkcji, można utworzyć własny typ bloku komunikatu i dostosować go do wymagań aplikacji.Opis typy bloku wbudowanych wiadomości, które są dostarczane przez asynchronicznego biblioteki agentów, zobacz Asynchroniczne blokuje wiadomości.
Wymagania wstępne
Przed rozpoczęciem tego instruktażu, przeczytaj następujące dokumenty:
Sekcje
Ten instruktaż zawiera następujące sekcje:
Projektowanie bloku komunikatów niestandardowych
Definiowanie klasy priority_buffer
Kompletny przykład
Projektowanie bloku komunikatów niestandardowych
Bloki komunikatów uczestniczyć w ustawie z wysyłaniem i odbieraniem wiadomości.Bloku komunikatu, który wysyła komunikaty jest znany jako bloku źródłowego.Bloku komunikatu, który odbiera wiadomości jest znany jako bloku docelowego.Blok wiadomość, która wysyła i odbiera wiadomości jest znany jako bloku propagator.Biblioteka agentów używa klasa abstrakcyjna concurrency::ISource do reprezentowania bloki źródła i klasa abstrakcyjna concurrency::ITarget do reprezentowania bloki docelowych.Blok komunikatów typy tego aktu jak źródeł pochodzić od ISource; blok komunikatów typy tego aktu jak cele pochodzić od ITarget.
Chociaż można czerpać Twój typ bloku komunikatu bezpośrednio z ISource i ITarget, biblioteka agentów definiuje trzy klasy podstawowej, które wykonują wiele funkcji, które są wspólne dla wszystkich typów bloku komunikatu, na przykład obsługa błędów i łączenie wiadomość blokuje razem w sposób bezpieczny współbieżności.Concurrency::source_block klasa pochodzi od ISource i wysyła wiadomości do innych bloków.Concurrency::target_block klasa pochodzi od ITarget i odbiera wiadomości od innych bloków.Concurrency::propagator_block klasa pochodzi od ISource i ITarget i wysyła wiadomości do innych bloków i odbiera wiadomości od innych bloków.Zaleca się, aby użyć tych trzech klas podstawowych do obsługi infrastruktury szczegóły tak, aby skupić się na zachowanie użytkownika bloku komunikatów.
source_block, target_block, I propagator_block klasy są szablony, które są parametryzowane na typ, który zarządza połączeniami lub łączy między źródłowym i docelowym bloki i na typ, który zarządza, w jaki sposób przetwarzania wiadomości.Biblioteka agentów definiuje dwa typy wykonujących zarządzania łącze concurrency::single_link_registry i concurrency::multi_link_registry.single_link_registry Klasy umożliwia bloku wiadomości mają być połączone w jedno źródło lub jeden obiekt docelowy.multi_link_registry Klasy umożliwia bloku komunikatów, być połączone z wielu źródeł lub wiele elementów docelowych.Biblioteka agentów definiuje jednej klasy, która wykonuje zarządzania komunikat concurrency::ordered_message_processor.ordered_message_processor Klasy umożliwia bloki komunikatów do przetwarzania wiadomości w kolejności, w jakiej je otrzymuje.
Aby lepiej zrozumieć, jak bloki komunikatów odnoszą się do ich źródeł i obiektów docelowych, rozważmy następujący przykład.Ten przykład przedstawia deklaracji concurrency::transformer klasy.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer Klasa pochodzi od propagator_blocki dlatego działa jako zarówno bloku źródłowego i docelowego bloku.Akceptuje komunikaty typu _Input i wysyła komunikaty typu _Output.transformer Określa klasę single_link_registry jako Menedżer łącza dla bloków docelowych i multi_link_registry jako Menedżer łącza dla bloków dowolnego źródła.W związku z tym transformer obiektu może zawierać maksymalnie jeden obiekt docelowy i nieograniczoną liczbę źródeł.
Klasa, która wynika z source_block musi implementować metody: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, i resume_propagation.Klasa, która wynika z target_block musi implementować propagate_message metody i opcjonalnie można zaimplementować send_message metody.Wynikające z propagator_block jest funkcjonalnie równoważne wynikające z obu source_block i target_block.
propagate_to_any_targets Wywoływana jest metoda w czasie wykonywania asynchronicznie lub synchronicznie przetwarzać komunikaty przychodzące i propagować wszystkie wiadomości wychodzące.accept_message Metoda jest wywoływana przez docelowe bloki, aby akceptował wiadomości.Wiele typów bloku wiadomości, takich jak unbounded_buffer, wysyłać wiadomości tylko do pierwszego elementu docelowego, który otrzyma go.W związku z tym transfery własności wiadomości do obiektu docelowego.Typy innych bloków komunikatów, takich jak concurrency::overwrite_buffer, oferują wiadomości do każdego z jej bloków docelowych.W związku z tym overwrite_buffer tworzy kopię wiadomości dla każdego z jej celów.
reserve_message, consume_message, release_message, I resume_propagation metody włączyć bloki komunikatów do uczestnictwa w wiadomości rezerwacji.Docelowe blokuje połączenia reserve_message w czasie, gdy są one oferowane wiadomości i rezerwacji wiadomości do późniejszego użycia.Po bloku docelowego rezerw wiadomości, może wywołać consume_message metoda zużyje wiadomości lub release_message metodę, aby anulować rezerwację.Tak jak w accept_message metody, wykonania consume_message można przetransferować własność wiadomości lub zwraca kopię wiadomości.Po bloku docelowego zużywa lub zwalnia zarezerwowane wiadomości, środowisko wykonawcze wywołuje resume_propagation metody.Zazwyczaj ta metoda nadal propagacji wiadomości, począwszy od następnej wiadomości w kolejce.
Wywołania runtime propagate_message metoda asynchronicznie transferu do bieżącej wiadomości z inny blok.send_message Przypomina metodę propagate_message, chyba że synchronicznie, zamiast asynchronicznie, wysyła wiadomość do bloków docelowych.Domyślne wykonania send_message odrzuca wszystkie wiadomości przychodzące.Środowisko wykonawcze nie wywołuje każdej z tych metod, jeśli wiadomość nie przechodzi funkcji opcjonalnych filtr skojarzony z bloku docelowego.Aby uzyskać więcej informacji na temat filtrów wiadomości, zobacz Asynchroniczne blokuje wiadomości.
Top
Definiowanie klasy priority_buffer
priority_buffer Klasy jest typ bloku niestandardowy komunikat, który pierwszy zamówień przychodzących wiadomości według priorytetu, a następnie według kolejności, w którym są odbierane wiadomości.priority_buffer Podobny do klasy concurrency::unbounded_buffer klasy, ponieważ posiada kolejki wiadomości, a także ponieważ działa jako źródło i bloku komunikatu docelowych i może mieć zarówno wielu źródeł i wiele elementów docelowych.Jednakże unbounded_buffer podstaw wiadomości propagacji tylko w kolejności, w którym otrzyma od jej źródeł wiadomości.
priority_buffer Klasy odbiera wiadomości typu std::tuple , które zawierają PriorityType i Type elementy.PriorityTypeodnosi się do typu, który przechowuje priorytetu każdej wiadomości; Typeodnosi się do części danych wiadomości.priority_buffer Klasy wysyła komunikaty typu Type.priority_buffer Klasy zarządza także dwie kolejki wiadomości: std::priority_queue obiektu dla wiadomości przychodzących i std::queue obiektu dla wiadomości wychodzących.Porządkowanie wiadomości według priorytetu jest przydatna, gdy priority_buffer obiekt otrzymuje wielu wiadomości jednocześnie lub w przypadku odebrania wielu wiadomości przed komunikaty są odczytywane przez konsumentów.
Oprócz metod siedmiu że klasa, która wynika z propagator_block musi implementować priority_buffer także klasy przesłonięcia link_target_notification i send_message metody.priority_buffer Klasy definiuje również dwie metody publiczne pomocnika, enqueue i dequeueoraz metody pomocnika prywatne, propagate_priority_order.
Poniższa procedura opisuje sposób nadawania priority_buffer klasy.
Do zdefiniowania klasy priority_buffer
Tworzenie pliku nagłówka C++ i nadaj mu nazwę priority_buffer.h.Alternatywnie można użyć istniejącego pliku nagłówka, który jest częścią projektu.
W priority_buffer.h, Dodaj następujący kod.
#pragma once #include <agents.h> #include <queue>
W std obszaru nazw, definiowanie dziedziny specjalizacji, z std::less i std::greater , działać na concurrency::message obiektów.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer Klasy sklepów message obiektów w priority_queue obiektu.Te typu specjalizacji włączyć priorytet kolejki do sortowania wiadomości według ich priorytetu.Priorytet jest pierwszym elementem tuple obiektu.
W concurrency obszaru nazw, zadeklarować priority_buffer klasy.
namespace concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer Klasa pochodzi od propagator_block.W związku z tym to może zarówno wysyłać i odbierać wiadomości.priority_buffer Klasa może mieć wiele elementów docelowych, które odbierają wiadomości typu Type.Może również mieć wiele źródeł, które wysyłają wiadomości typu tuple<PriorityType, Type>.
W private sekcji priority_buffer klasy, należy dodać następujące zmienne składowe.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
priority_queue Obiektu przechowuje wiadomości przychodzących; queue obiektu przechowuje wiadomości wychodzących.A priority_buffer obiektu można odebrania wielu wiadomości jednocześnie; critical_section obiektu synchronizuje dostęp do kolejki komunikatów wejściowych.
W private sekcji, definiowanie konstruktora kopii i operator przypisania.Zapobiega to priority_queue obiektów jest możliwa do przypisania.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
W public sekcji, definiowanie konstruktory, które są wspólne dla wielu typów bloku komunikatu.Również zdefiniować destruktor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
W public sekcji, definiować metody enqueue i dequeue.Te metody pomocnika stanowią alternatywny sposób wysyłać i odbierać wiadomości priority_buffer obiektu.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
W protected sekcji, należy zdefiniować propagate_to_any_targets metody.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets Metoda przenosi wiadomość, która znajduje się na wierzchu kolejki wejściowej do kolejki wyjściowej i propaguje wszystkie wiadomości w kolejce danych wyjściowych.
W protected sekcji, należy zdefiniować accept_message metody.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Gdy wywołuje bloku docelowego accept_message metody, priority_buffer klasy transfery własności wiadomości do pierwszego bloku docelowego, który akceptuje on.(Przypomina to zachowanie unbounded_buffer.)
W protected sekcji, należy zdefiniować reserve_message metody.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
priority_buffer Klasy pozwala bloku docelowego zarezerwować wiadomość, gdy identyfikator wiadomości dostarczone odpowiada identyfikator wiadomości, który znajduje się na początek kolejki.Innymi słowy, element docelowy może zarezerwować wiadomość, jeśli priority_buffer obiektu nie otrzymał jeszcze dodatkowe wiadomości i nie ma jeszcze propagowane poza bieżący.
W protected sekcji, należy zdefiniować consume_message metody.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Wywołuje bloku docelowego consume_message do przenoszenia własności wiadomość, którą on zarezerwowany.
W protected sekcji, należy zdefiniować release_message metody.
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Wywołuje bloku docelowego release_message do anulowania rezerwacji jej do wiadomości.
W protected sekcji, należy zdefiniować resume_propagation metody.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Wywołania runtime resume_propagation po bloku docelowego zużywa lub zwalnia zarezerwowane wiadomości.Ta metoda propaguje wszystkie wiadomości w kolejce danych wyjściowych.
W protected sekcji, należy zdefiniować link_target_notification metody.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor Zmienną jest zdefiniowany przez klasę podstawową source_block.Ta zmienna Członkowskie wskazuje bloku docelowego jeśli trzyma rezerwacji wiadomość, która znajduje się na wierzchu kolejki wyjściowej.Wywołania runtime link_target_notification gdy nowy element docelowy jest powiązany z priority_buffer obiektu.Ta metoda propaguje żadnych wiadomości w kolejce wyjście Jeśli żadne miejsce docelowe jest posiadających rezerwację.
W private sekcji, należy zdefiniować propagate_priority_order metody.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Ta metoda propaguje wszystkich wiadomości z kolejki wyjściowej.Każda wiadomość w kolejce jest oferowana każdego bloku docelowego dopóki jeden z bloków docelowych akceptuje wiadomości.priority_buffer Klasy zachowuje kolejności wiadomości wychodzących.Dlatego pierwszej wiadomości w kolejce wyjściowy musi zostać zaakceptowany przez bloku docelowego przed ta metoda daje inne wiadomości do bloków docelowych.
W protected sekcji, należy zdefiniować propagate_message metody.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message Metoda umożliwia priority_buffer klasy jako odbiorca wiadomości lub docelowych.Ta metoda odbiera wiadomość, która jest oferowany przez bloku źródłowego dostarczonego i wstawia tej wiadomości do kolejki priorytetowe.propagate_message Metoda asynchronicznie wysyła następnie wyświetlają wszystkie komunikaty bloki docelowych.
Środowisko wykonawcze wywołuje tej metody po wywołaniu concurrency::asend funkcji lub jeśli blok komunikatów jest podłączony do innych bloków komunikatów.
W protected sekcji, należy zdefiniować send_message metody.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message Przypomina metodę propagate_message.Jednakże wysyła komunikaty wyjściowe synchronicznie zamiast asynchronicznie.
Środowisko wykonawcze wywołuje tej metody podczas operacji synchronicznego wysyłania, takich jak po wywołaniu concurrency::send funkcji.
priority_buffer Klasa zawiera overloads konstruktora, które są typowe w wielu typów bloku komunikatu.Niektóre konstruktora overloads take concurrency::Scheduler lub concurrency::ScheduleGroup obiektów, które włączyć blok komunikatów zarządzane przez harmonogram zadań szczególnych.Inne overloads konstruktora podjąć funkcji filtru.Funkcje filtrowania włączyć bloki komunikatów zaakceptować lub odrzucić wiadomość z jego ładunku.Aby uzyskać więcej informacji na temat filtrów wiadomości, zobacz Asynchroniczne blokuje wiadomości.Aby uzyskać więcej informacji na temat planiści zadań zobacz Harmonogram zadań (współbieżności Runtime).
Ponieważ priority_buffer klasy zamówienia wiadomości według priorytetu, a następnie według kolejności, w jakiej wiadomości są odbierane, klasa ta jest najbardziej użyteczna po odebraniu wiadomości asynchronicznie, na przykład podczas wywołania concurrency::asend funkcji lub jeśli blok komunikatów jest podłączony do innych bloków komunikatów.
Top
Kompletny przykład
Poniższy przykład pokazuje pełną definicję priority_buffer klasy.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
W następującym przykładzie wykonywana jednocześnie wiele asend i concurrency::receive operacji na priority_buffer obiektu.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Ten przykład generuje następujące przykładowe dane wyjściowe.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
priority_buffer Klasy zamówienia najpierw wiadomości, priorytet, a następnie kolejność, w której otrzymuje wiadomości.W tym przykładzie wiadomości o priorytecie liczbowym większe są wstawiane do przodu kolejki.
Top
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub Wklej definicji priority_buffer klasy w pliku o nazwie priority_buffer.h i program badań w pliku o nazwie priority_buffer.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia usługi programu Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Zobacz też
Koncepcje
Asynchroniczne blokuje wiadomości
Funkcji przekazywania wiadomości