Wskazówki: tworzenie niestandardowego bloku komunikatów
Niniejszy dokument zawiera opis sposobu tworzenia komunikatów niestandardowy typ bloku, które porządkuje przychodzące wiadomości według priorytetu.
Chociaż wbudowane typy bloku komunikatu zapewniają szeroki zakres funkcji, można utworzyć własny typ bloku komunikatu i dostosować go do potrzeb aplikacji.Aby uzyskać informacje na temat opisu typów bloku wbudowanych wiadomości dostarczanych przez bibliotekę asynchronicznych agentów, zobacz Bloki komunikatów asynchronicznych.
Wymagania wstępne
Przed rozpoczęciem tego instruktażu, przeczytaj następujące dokumenty:
Sekcje
Ten instruktaż zawiera następujące sekcje:
Projektowanie bloku komunikatów niestandardowych
Definiowanie obiektu bufor_priorytetowy klasy
Kompletny przykład
Projektowanie bloku komunikatów niestandardowych
Bloki komunikatów uczestniczą w akcie wysyłania i odbierania wiadomości.Blok komunikatów, który wysyła wiadomości jest znany jako blok źródłowy.Blok komunikatów, który odbiera wiadomości jest znany jako blok docelowy.Blok komunikatów, który wysyła i odbiera wiadomości jest znany jako blok propagatora.Klasa abstrakcyjna używa biblioteki agentów concurrency::ISource do reprezentowania bloków źródła i klasa abstrakcyjna concurrency::ITarget do reprezentowania bloków docelowych.Typy bloków wiadomości, które działają jako obiekty źródłowe wywodzą się z ISource; typy bloków wiadomości, które działają jako obiekty docelowe wywodzą się z ITarget.
Chociaż można uzyskać typ bloku wiadomości bezpośrednio z ISource i ITarget, biblioteka agentów definiuje trzy klasy bazowe, które wykonują wiele funkcji, które są wspólne dla wszystkich typów bloku komunikatu, na przykład obsługa błędów i łączenie bloków wiadomości razem w sposób bezpieczny dla współbieżności.Concurrency::source_block klasa pochodzi od ISource i wysyła wiadomości do innych bloków.Concurrency::target_block klasa pochodzi od ITarget i odbiera wiadomości z innych bloków.Concurrency::propagator_block klasa pochodzi od ISource i ITarget i wysyła wiadomości do innych bloków i otrzymuje komunikaty z innych bloków.Zaleca się używanie tych trzech klas bazowych do obsługi infrastruktury szczegółowej, aby skoncentrować się na zachowaniu bloku komunikatu.
source_block, target_block, i propagator_block klasy są szablonami, które są parametryzowane na typ, który zarządza połączeniami lub linkami, między źródłowym i docelowym bloki i na typach, którymi zarządza, jak wiadomości są przetwarzane.Biblioteka agentów definiuje dwa typy, które wykonują łącza zarządzania, concurrency::single_link_registry i concurrency::multi_link_registry.single_link_registry klasa umożliwia blokowi komunikatu połączenie z jednym źródłem i jednym elementem docelowym.multi_link_registry klasa umożliwia blokowi komunikatu połączenie z wieloma źródłami lub wieloma elementami docelowymi.Biblioteki agentów definiuje jedna klasa, która wykonuje zarządzania wiadomość concurrency::ordered_message_processor.ordered_message_processor klasy umożliwiają blokadom komunikatów przetwarzanie wiadomości w kolejności, w której otrzymuje je.
Aby lepiej zrozumieć, jak bloki komunikatów odnoszą się do ich źródła i obiektów docelowych, rozważmy następujący przykład.W tym przykładzie przedstawiono deklarację concurrency::transformer klasy.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer klasa pochodzi od propagator_block, a zatem działa jako blok źródłowy i blok docelowy.Akceptuje ona wiadomości typu _Input i wysyła komunikaty typu _Output.transformer określa klasę single_link_registry jako menedżera połączeń dla wszelkich bloków docelowych i multi_link_registry jako menedżera połączeń dla bloków dowolnego źródła.W związku z tym transformer obiektu może zawierać maksymalnie jeden obiekt docelowy i nieograniczoną liczbę źródeł.
Klasa, która pochodzi z source_block musi zaimplementować sześć metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message i resume_propagation.Klasa, która pochodzi z target_block musi zaimplementować metodę propagate_message i może opcjonalnie zaimplementować metodę send_message.Pochodząca z propagator_block jest funkcjonalnie równoważna pochodzeniu z obu source_block i target_block.
propagate_to_any_targets Metoda jest wywoływana w czasie wykonywania to asynchronicznego lub synchronicznego przetwarzania wszelkich komunikatów przychodzących i propagowania wszystkich wiadomości wychodzących.accept_message metoda jest wywoływana przez bloki miejsce docelowego, aby akceptować wiadomości.Wiele typów bloku komunikatu, takich jak unbounded_buffer, wysyła wiadomości tylko do pierwszego obiektu docelowego, który mógłby ją otrzymać.W związku z tym przenosi własność wiadomość do obiektu docelowego.Typy innych bloków komunikatów, takich jak concurrency::overwrite_buffer, oferują wiadomości do każdego z bloków jego miejsca docelowego.W związku z tym overwrite_buffer tworzy kopię wiadomości dla każdego z jego elementów docelowych.
The reserve_message, consume_message, release_message, and resume_propagation metody umożliwiają blokom komunikatów wzięcia udziału w wiadomości rezerwacji.Miejsce docelowe blokuje połączenia reserve_message metody, gdy muszą zaoferować komunikat lub zarezerwować komunikat do późniejszego użytku.Po zarezerwowaniu wiadomości przez blok docelowy, może on wywołać metodę consume_message, aby zużyć tę wiadomość lub metodę release_message, aby anulować rezerwację.Podobnie jak w przypadku metody accept_message wykonanie consume_message może przenieść prawo własności wiadomości lub zwrócić kopię wiadomości.Po zużyciu lub zwolnieniu zarezerwowanej wiadomości przez blok docelowy, środowisko uruchomieniowe wywołuje metodę resume_propagation.Zazwyczaj ta metoda nadal propaguje wiadomość, począwszy od następnej wiadomości w kolejce.
Wywołanie środowiska uruchomieniowego propagate_message metody asynchronicznej przenosi wiadomości z innego bloku do bieżącego.send_message Metoda przypomina propagate_message, z tym wyjątkiem, że synchronicznie, zamiast asynchronicznie, wysyła komunikat do bloków docelowych.Domyślna implementacja z send_message odrzuca wszystkie wiadomości przychodzące.Środowisko wykonawcze nie wymaga jednej z tych metod, jeśli wiadomość nie przejdzie funkcji opcjonalnych filtru skojarzonego z blokiem docelowym.Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Bloki komunikatów asynchronicznych.
[U góry]
Definiowanie obiektu bufor_priorytetowy klasy
priority_buffer klasa jest niestandardowym typem bloku wiadomości, która porządkuje wiadomości przychodzące najpierw według priorytetu, a następnie według kolejności otrzymywania wiadomości.priority_buffer klasa przypomina concurrency::unbounded_buffer klasy, ponieważ posiada kolejkę wiadomości, a także ponieważ działa jako źródło i miejsce docelowe bloku komunikatów i może mieć wiele źródeł i wiele elementów docelowych.Jednakże unbounded_buffer opiera propagację wiadomości tylko na kolejności, w której otrzymuje wiadomości od jej źródeł.
priority_buffer klasa otrzymuje komunikaty typu std::tuple , które zawierają PriorityType i Type elementy.PriorityType odnosi się do typu, który posiada priorytet każdej wiadomości; Type odnosi się do części danych wiadomości.priority_buffer Klasy wysyła komunikaty typu Type.priority_buffer klasa zarządza także dwoma kolejkami wiadomości: std::priority_queue obiektu dla wiadomości przychodzących i std::queue obiektu dla wiadomości wychodzących.Porządkowanie wiadomości według priorytetu jest przydatne, gdy priority_buffer obiekt otrzymuje wiele wiadomości jednocześnie lub kiedy otrzymuje wiele wiadomości zanim jakiekolwiek komunikaty są odczytywane przez konsumentów.
Oprócz siedmiu metod, które klasa pochodząca z propagator_block musi implementować, klasa priority_buffer również zastępuje metody link_target_notification i send_message.priority_buffer Klasa definiuje również dwie metody pomocnika publicznego, enqueue i dequeueoraz metody pomocnika prywatnego, propagate_priority_order.
Poniższa procedura opisuje sposób implementacji priority_buffer klasy.
Aby zdefiniować klasę priority_buffer
Utwórz plik nagłówka C++ i nadaj mu nazwę priority_buffer.h.Alternatywnie można użyć istniejącego pliku nagłówka, który jest częścią projektu.
W priority_buffer.h dodaj następujący kod.
#pragma once #include <agents.h> #include <queue>
W obszarze nazw std, zdefiniuj specjalizacje std::less i std::greater, które działają na obiektach concurrency::message.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer klasa przechowuje message obiekty w priority_queue obiekcie.Specjalizacje tego typu włączają kolejkę priorytetów posortowanych wiadomości według ich priorytetu.Priorytet jest pierwszym elementem tuple obiektu.
W obszarze nazw concurrencyex, zadeklaruj klasę priority_buffer.
namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Klasa priority_buffer pochodzi od propagator_block.W związku z tym może zarówno wysyłać i odbierać wiadomości.priority_buffer klasa może mieć wiele elementów docelowych, które odbierają komunikaty typu Type.Może też mieć wiele źródeł, które wysyłają wiadomości typu tuple<PriorityType, Type>.
W części private klasy priority_buffer dodaj następujące zmienne członkowskie.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
priority_queue obiekt zawiera wiadomości przychodzące; queue obiekt zawiera wiadomości wychodzące.Obiekt priority_buffer może odbierać wiele wiadomości równocześnie; obiekt critical_section synchronizuje dostęp do kolejki komunikatów wejściowych.
W części private określ konstruktor kopiujący i operator przypisania.Zapobiega to priority_queue przypisaniu obiektów.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
W sekcji public zdefiniuj konstruktory, które są wspólne dla wielu typów bloku komunikatu.Również określa destruktor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
W sekcji public określ metody enqueue i dequeue.Te metody pomocnika zapewniają alternatywny sposób wysyłania wiadomości do i odbierania wiadomości z priority_buffer obiektu.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
W sekcji protected określ metodę propagate_to_any_targets.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets Metoda przesyła wiadomości, który znajdują się na wierzchu kolejki wejściowej do wyjścia i rozprzestrzeniają wszystkie wiadomości w kolejce danych wyjściowych.
W sekcji protected określ metodę accept_message.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Kiedy blick docelowy wywołuje accept_message metodę, priority_buffer klasa przenosi własności wiadomości do pierwszego bloku docelowego, która go akceptuje. (Przypomina to zachowanie unbounded_buffer).
W sekcji protected określ metodę reserve_message.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
priority_buffer klasy zezwala, aby blok docelowy zarezerwował komunikat, gdy dostarczony identyfikator wiadomości pasuje do identyfikatora wiadomości, który znajduje się na początku kolejki.Innymi słowy, obiekt docelowy może zarezerwować wiadomość, jeśli obiekt priority_buffer nie otrzymał jeszcze dodatkowej wiadomości i nie rozprzestrzenił się jeszcze na bieżącą wiadomość.
W sekcji protected określ metodę consume_message.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Blok docelowy wywołuje consume_message, aby przenieść prawo własności wiadomości zastrzeżonej.
W sekcji protected określ metodę release_message.
// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Blok docelowy wywołuje release_message, aby anulować swoje zastrzeżenia do wiadomości.
W sekcji protected określ metodę resume_propagation.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Wywołania runtime resume_propagation po bloku docelowym zużywa lub zwalnia zarezerwowaną wiadomość.Ta metoda propaguje wszelkie wiadomości z kolejki wyjściowej.
W sekcji protected określ metodę link_target_notification.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor zmienna członka jest zdefiniowany przez klasę podstawową source_block.Ta zmienna członka wskazuje blok docelowy, jeśli taki posiada rezerwację wiadomość, która znajduje się na wierzchu kolejki wyjściowej.Wywołania środowiska uruchomieniowego link_target_notification kiedy nowy obiekt docelowy jest połączony z priority_buffer obiektem.Ta metoda rozdaje komunikaty, które znajdują się w kolejce wyjścia, jeśli żadne miejsce docelowe nie posiada rezerwacji.
W sekcji private określ metodę propagate_priority_order.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Ta metoda propaguje wszystkie wiadomości z kolejki wyjściowej.Każda wiadomość w kolejce jest oferowana do każdego bloku docelowego aż jeden z bloków docelowych zaakceptuje wiadomość.priority_buffer klasa zachowuje kolejności wiadomości wychodzących.W związku z tym pierwsza wiadomości w kolejce wyjściowej musi zostać zaakceptowana przez bloku docelowy, zanim ta metoda da inne wiadomości do bloków docelowych.
W sekcji protected określ metodę propagate_message.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message metoda umożliwia priority_buffer klasie działania jako odbiorcom wiadomości lub celom.Ta metoda odbiera komunikat, który jest oferowany przez zapewniony blok źródłowy i wstawia tę wiadomość w kolejce priorytetu.propagate_message Metoda następnie asynchronicznie wysyła wszystkie wyjściowe komunikaty do bloków docelowych.
Środowisko wykonawcze wywołuje tę metodę podczas wywoływania concurrency::asend funkcji lub gdy blok komunikatów jest podłączony do innych bloków komunikatów.
W sekcji protected określ metodę send_message.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
The send_message metoda przypomina propagate_message.Jednak wysyła wiadomości danych wyjściowych synchronicznie zamiast asynchronicznie.
Środowisko wykonawcze wywołuje tę metodę podczas synchronicznej operacji, na przykład wywołując concurrency::send funkcję.
priority_buffer klasa zawiera konstruktora przeciążeń, który jest typowy w wielu typów komunikatów bloku.Niektóre Konstruktory przeciążeń przybierają concurrency::Scheduler lub concurrency::ScheduleGroup obiekty, które umożliwiają blok komunikatów, które mają być zarządzane przez harmonogram zadań szczególnych.Inne Konstruktory przeładowania mają funkcję filtru.Funkcje filtrowania umożliwiają blokom wiadomości akceptowanie lub odrzucanie wiadomość na podstawie ładunku.Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Bloki komunikatów asynchronicznych.Aby uzyskać więcej informacji dotyczących harmonogramów zadań, zobacz Harmonogram zadań (współbieżność środowiska wykonawczego).
Ponieważ klasa priority_buffer zamawia wiadomości według priorytetu, a następnie według kolejności, w której wiadomości są odbierane, klasa ta jest najbardziej użyteczna po odebraniu wiadomości asynchronicznie, na przykład podczas wywoływania funkcji concurrency::asend lub gdy blok komunikatów jest podłączony do innych bloków komunikatów.
[U góry]
Kompletny przykład
Poniższy przykład pokazuje kompletną definicję klasy priority_buffer.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Następujący przykład wykonuje jednocześnie wiele asend i concurrency::receive operacji na priority_buffer obiekcie.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Ten przykład generuje następujące przykładowe wyniki.
priority_buffer klasy zszereguje wiadomości po raz pierwszy według priorytetu, a następnie według kolejności, w której otrzymuje wiadomości.W tym przykładzie, wiadomości o większym priorytecie liczbowym są wstawiane do przodu kolejki.
[U góry]
Kompilowanie kodu
Kopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej definicję klasy priority_buffer w pliku o nazwie priority_buffer.h, a program badań w pliku o nazwie priority_buffer.cpp, a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Zobacz też
Koncepcje
Bloki komunikatów asynchronicznych
Funkcje przekazywania komunikatów