Exemplarische Vorgehensweise: Erstellen eines benutzerdefinierten Nachrichtenblocks
In diesem Dokument wird beschrieben, wie ein benutzerdefinierter Nachrichtenblocktyp erstellt wird, um eingehende Nachrichten nach Priorität zu sortieren.
Obwohl die integrierten Nachrichtenblocktypen eine breite Palette von Funktionen bereitstellen, können Sie auch eigene Nachrichtenblocktypen erstellen und anpassen, um die Anforderungen Ihrer Anwendung zu erfüllen. Eine Beschreibung der integrierten Nachrichtenblocktypen, die von der Asynchronen Agents-Bibliothek bereitgestellt werden, finden Sie unter "Asynchrone Nachrichtenblöcke".
Voraussetzungen
Lesen Sie die folgenden Dokumente, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen:
Abschnitte
Diese exemplarische Vorgehensweise enthält folgende Abschnitte:
Entwerfen eines benutzerdefinierten Nachrichtenblocks
Nachrichtenblöcke sind am Senden und Empfangen von Nachrichten beteiligt. Ein Nachrichtenblock, der Nachrichten sendet, wird als Quellblock bezeichnet. Ein Nachrichtenblock, der Nachrichten empfängt, wird als Zielblock bezeichnet. Ein Nachrichtenblock, der Nachrichten sendet und empfängt, wird als Verteilungsblock bezeichnet. Die Agents-Bibliothek verwendet die abstrakte Klassenkoncurrency ::ISource zum Darstellen von Quellblöcken und der abstrakten Klassenkoncurrency ::ITarget zur Darstellung von Zielblöcken. Nachrichtenblocktypen, die als Quelle dienen, werden von der ISource
-Klasse abgeleitet, während Nachrichtenblocktypen, die als Ziel dienen, von der ITarget
-Klasse abgeleitet werden.
Der Nachrichtenblocktyp kann prinzipiell unmittelbar von ISource
und ITarget
abgeleitet werden. Die Agents Library definiert jedoch drei Basisklassen, deren Funktionalität weitestgehend der aller Nachrichtenblocktypen entspricht. Beispiel: parallelitätssicheres Behandeln von Fehlern und parallelitätssicheres Verbinden von Nachrichtenblöcken. Die Parallelität::source_block Klasse wird von anderen Blöcken abgeleitet ISource
und sendet Nachrichten an andere Blöcke. Die Parallelität::target_block Klasse wird von anderen Blöcken abgeleitet ITarget
und empfängt Nachrichten. Die Parallelität::p ropagator_block-Klasse wird von anderen Blöcken abgeleitet ISource
und ITarget
sendet Nachrichten an andere Blöcke und empfängt Nachrichten von anderen Blöcken. Es wird empfohlen, Infrastrukturdetails mit diesen drei Basisklassen zu behandeln, sodass Sie sich auf das Verhalten des Nachrichtenblocks konzentrieren können.
Die Klassen source_block
, target_block
und propagator_block
sind Vorlagen, die auf der Grundlage eines Typs parametrisiert werden, der die Verbindungen oder Links zwischen Quell- und Zielblöcken verwaltet, sowie auf Grundlage eines Typs, der die Verarbeitung von Nachrichten verwaltet. Die Agents-Bibliothek definiert zwei Typen, die die Verknüpfungsverwaltung ausführen, Parallelität::single_link_registry und Parallelität::multi_link_registry. Die single_link_registry
-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit einer Quelle oder einem Ziel. Die multi_link_registry
-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit mehreren Quellen oder mehreren Zielen. Die Agentsbibliothek definiert eine Klasse, die die Nachrichtenverwaltung durchführt, Parallelität::ordered_message_processor. Die ordered_message_processor
-Klasse ermöglicht Nachrichtenblöcken die Verarbeitung von Nachrichten in der Reihenfolge ihres Empfangs.
Im folgenden Beispiel wird die Beziehung zwischen Nachrichtenblöcken sowie Quellen und Zielen veranschaulicht. Dieses Beispiel zeigt die Deklaration der Parallelitätsklasse::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Die transformer
-Klasse wird von propagator_block
abgeleitet und fungiert daher als Quell- sowie als Zielblock. Sie akzeptiert Nachrichten vom Typ _Input
und sendet Nachrichten vom Typ _Output
. Die transformer
-Klasse gibt single_link_registry
als Link-Manager für alle Zielblöcke und multi_link_registry
als Link-Manager für alle Quellblöcke an. Aus diesem Grund kann ein transformer
-Objekt ein Ziel sowie eine unbegrenzte Anzahl von Quellen haben.
Eine Klasse, von source_block
der abgeleitet wird, muss sechs Methoden implementieren: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message und resume_propagation. Eine Klasse, die von der target_block
abgeleitet wird, muss die propagate_message-Methode implementieren und optional die send_message-Methode implementieren. Ableitungen von propagator_block
sowie von source_block
und target_block
sind funktional äquivalent.
Die propagate_to_any_targets
-Methode wird von der Laufzeit aufgerufen, um alle eingehenden Nachrichten synchron oder asynchron zu verarbeiten und alle ausgehenden Nachrichten weiterzugeben. Die accept_message
-Methode wird von Zielblöcken aufgerufen, um Nachrichten zu akzeptieren. Viele Nachrichtenblocktypen wie unbounded_buffer
senden Nachrichten nur an das erste Ziel, das diese empfangen würde. Daher wird der Besitz der Nachricht auf das Ziel übertragen. Andere Nachrichtenblocktypen, z . B. Parallelität::overwrite_buffer, bieten Nachrichten für jeden ihrer Zielblöcke an. overwrite_buffer
erstellt daher eine Kopie der Nachricht für alle diesbezüglichen Ziele.
Mit den Methoden reserve_message
, consume_message
, release_message
und resume_propagation
können Nachrichtenblöcke an der Reservierung von Nachrichten teilnehmen. Zielblöcke rufen die reserve_message
-Methode auf, wenn eine Nachricht für sie bereitgestellt wird, die zur späteren Verwendung reserviert werden muss. Nach dem Reservieren einer Nachricht durch den Zielblock kann dieser die consume_message
-Methode aufrufen, um die Nachricht zu verarbeiten, oder die release_message
-Methode, um die Reservierung abzubrechen. Analog zur accept_message
-Methode kann die Implementierung von consume_message
den Besitz der Nachricht übertragen oder eine Kopie der Nachricht zurückgeben. Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die resume_propagation
-Methode von der Laufzeit aufgerufen. Diese Methode setzt die Nachrichtenweitergabe i. d. R. mit der nächsten Nachricht in der Warteschlange fort.
Die propagate_message
-Methode wird von der Laufzeit aufgerufen, um eine Nachricht asynchron von einem anderen Block zum aktuellen zu übertragen. Die send_message
-Methode ähnelt der propagate_message
-Methode, sendet die Nachrichten im Unterschied zu dieser jedoch synchron an die Zielblöcke. Die Standardimplementierung von send_message
weist alle eingehenden Nachrichten zurück. Die Laufzeit ruft keine der Methoden auf, wenn von der Nachricht nicht die optionale Filterfunktion übergeben wird, die dem Zielblock zugeordnet ist. Weitere Informationen zu Nachrichtenfiltern finden Sie unter "Asynchrone Nachrichtenblöcke".
Definieren der priority_buffer Klasse
Die priority_buffer
-Klasse ist ein benutzerdefinierter Nachrichtenblocktyp, der eingehende Meldungen zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs sortiert. Die priority_buffer
Klasse ähnelt der Parallelität::unbounded_buffer Klasse, da sie eine Warteschlange mit Nachrichten enthält, und auch weil sie sowohl als Quelle als auch als Zielnachrichtenblock fungiert und sowohl mehrere Quellen als auch mehrere Ziele aufweisen kann. unbounded_buffer
legt als Kriterium für die Weitergabe von Nachrichten jedoch nur die Reihenfolge ihres Empfangs aus den Quellen zugrunde.
Die priority_buffer
Klasse empfängt Nachrichten vom Typ "std::tuple", die enthalten und Type
Elemente enthaltenPriorityType
. PriorityType
verweist auf den Typ, der die Priorität einer Nachricht angibt; Type
verweist auf den Datenteil der Nachricht. Die priority_buffer
-Klasse sendet Nachrichten vom Typ Type
. Die priority_buffer
Klasse verwaltet auch zwei Nachrichtenwarteschlangen: ein std::p riority_queue-Objekt für eingehende Nachrichten und ein std::queue-Objekt für ausgehende Nachrichten. Das Sortieren von Nachrichten nach der Priorität ist hilfreich, wenn ein priority_buffer
-Objekt mehrere Nachrichten gleichzeitig oder bevor diese von Consumern gelesen werden empfängt.
Zusätzlich zu den sieben Methoden, die von einer Klasse implementiert werden müssen, die von propagator_block
abgeleitet wird, überschreibt die priority_buffer
-Klasse die noch die link_target_notification
-Methode und die send_message
-Methode. Die priority_buffer
-Klasse definiert außerdem zwei öffentliche Hilfsmethoden (, enqueue
und dequeue
) sowie eine private Hilfsmethode ( propagate_priority_order
).
Im folgenden Verfahren wird beschrieben, wie die priority_buffer
-Klasse implementiert wird.
So definieren Sie die priority_buffer-Klasse
Erstellen Sie eine C++-Headerdatei, und nennen Sie sie
priority_buffer.h
. Sie können auch eine bestehende Headerdatei verwenden, die Teil Ihres Projekts ist.Fügen Sie in
priority_buffer.h
, fügen Sie den folgenden Code hinzu.#pragma once #include <agents.h> #include <queue>
Definieren Sie im
std
Namespace Spezialisierungen von std::less und std::greater , die auf Parallelitätsobjekte::message reagieren.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Die
priority_buffer
-Klasse speichertmessage
-Objekte in einempriority_queue
-Objekt. Mit diesen Typspezialisierungen können die Nachrichten von der Prioritätswarteschlange anhand ihrer Priorität sortiert werden. Die Priorität ist das erste Element destuple
-Objekts.Deklarieren Sie die
concurrencyex
-Klasse impriority_buffer
-Namespace.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Die
priority_buffer
-Klasse wird vonpropagator_block
abgeleitet. Sie kann daher Meldungen senden und empfangen. Diepriority_buffer
-Klasse mehrere Ziele aufweisen, die Nachrichten vom TypType
empfangen. Sie kann außerdem mehrere Quellen aufweisen, die Nachrichten vom Typtuple<PriorityType, Type>
senden.Fügen Sie im
private
-Abschnitt derpriority_buffer
-Klasse die folgenden Membervariablen hinzu.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
Das
priority_queue
-Objekt enthält eingehende Nachrichten, dasqueue
-Objekt enthält ausgehende Nachrichten. Einpriority_buffer
-Objekt kann mehrere Nachrichten gleichzeitig empfangen. Dascritical_section
-Objekt synchronisiert den Zugriff auf die Warteschlange für eingehende Nachrichten.Definieren Sie den Kopierkonstruktor und den Zuweisungsoperator im Abschnitt
private
. Dadurch wird verhindert, dasspriority_queue
-Objekte zugewiesen werden können.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Definieren Sie im
public
-Abschnitt die Konstruktoren, die in zahlreichen Nachrichtenblocktypen verwendet werden. Definieren Sie auch den Destruktor.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Definieren Sie im
public
-Abschnitt dieenqueue
-Methode und diedequeue
-Methode. Diese Hilfsmethoden bieten eine alternative Möglichkeit, Nachrichten an einpriority_buffer
-Objekt zu senden und von diesem zu empfangen.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Definieren Sie die
protected
-Methode impropagate_to_any_targets
-Abschnitt.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Die
propagate_to_any_targets
-Methode überträgt die Nachricht, die sich in der Eingabewarteschlange an erster Stelle befindet, an die Ausgabewarteschlange, und gibt alle Nachrichten an die Ausgabewarteschlange weiter.Definieren Sie die
protected
-Methode imaccept_message
-Abschnitt.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Wenn die
accept_message
-Methode von einem Zielblock aufgerufen wird, wird der Besitz der Nachricht von derpriority_buffer
-Klasse auf den ersten Zielblock übertragen, der diesen akzeptiert. (Dieses Verhalten ist vergleichbar mitunbounded_buffer
).Definieren Sie die
protected
-Methode imreserve_message
-Abschnitt.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
Die
priority_buffer
-Klasse ermöglicht einem Zielblock, eine Nachricht zu reservieren, wenn der Bezeichner der bereitgestellten Nachricht mit dem Bezeichner der Nachricht übereinstimmt, die an erster Position in der Warteschlange steht. Anders ausgedrückt kann die Nachricht von einem Ziel reserviert werden, wenn vompriority_buffer
-Objekt noch keine weitere Nachricht empfangen und die aktuelle Nachricht noch nicht weitergegeben wurde.Definieren Sie die
protected
-Methode imconsume_message
-Abschnitt.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Die
consume_message
-Methode wird von einem Zielblock aufgerufen, um den Besitz der reservierten Methode zu übertragen.Definieren Sie die
protected
-Methode imrelease_message
-Abschnitt.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Die
release_message
-Methode wird von einem Zielblock aufgerufen, um die Reservierung einer Nachricht abzubrechen.Definieren Sie die
protected
-Methode imresume_propagation
-Abschnitt.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die
resume_propagation
-Methode von der Laufzeit aufgerufen. Diese Methode gibt alle Nachrichten weiter, die sich in der Ausgabewarteschlange befinden.Definieren Sie die
protected
-Methode imlink_target_notification
-Abschnitt.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
Die Membervariable
_M_pReservedFor
wird von der Basisklassesource_block
definiert. Diese Membervariable zeigt ggf. auf den Zielblock mit der Reservierung für die Nachricht, die sich an erster Stelle in der Warteschlange befindet.link_target_notification
wird von der Laufzeit aufgerufen, wenn ein neues Ziel mit dempriority_buffer
-Objekt verknüpft wird. Diese Methode gibt alle Nachrichten in der Ausgabewarteschlange weiter, wenn kein Ziel eine Reservierung aufweist.Definieren Sie die
private
-Methode impropagate_priority_order
-Abschnitt.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Diese Methode gibt alle Nachrichten von der Ausgabewarteschlange weiter. Jede Nachricht in der Warteschlange wird für alle Zielblöcke bereitgestellt, bis einer der Zielblöcke die Meldung akzeptiert. Die
priority_buffer
-Klasse behält die Reihenfolge der ausgehenden Nachrichten bei. Daher muss die erste Nachricht in der Ausgabewarteschlange von einem Zielblock akzeptiert werden, bevor eine andere Meldung von dieser Methode für die Zielblöcke bereitgestellt wird.Definieren Sie die
protected
-Methode impropagate_message
-Abschnitt.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Die
propagate_message
-Methode ermöglicht derpriority_buffer
-Klasse als Nachrichtenempfänger oder -ziel fungieren. Diese Methode empfängt die vom angegebenen Quellblock bereitgestellte Nachricht und fügt sie in die Prioritätswarteschlange ein. Anschließend werden alle Ausgabenachrichten von derpropagate_message
-Methode asynchron an die Zielblöcke gesendet.Die Laufzeit ruft diese Methode auf, wenn Sie die Funktion "concurrency::asend " aufrufen oder wenn der Nachrichtenblock mit anderen Nachrichtenblöcken verbunden ist.
Definieren Sie die
protected
-Methode imsend_message
-Abschnitt.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Die
send_message
-Methode ähnelt derpropagate_message
-Methode. Im Unterschied zu dieser sendet sie die Ausgabemeldungen jedoch synchron.Die Laufzeit ruft diese Methode während eines synchronen Sendevorgangs auf, z. B. wenn Sie die Parallelität::send-Funktion aufrufen.
Die priority_buffer
-Klasse enthält Konstruktorüberladungen, die in vielen Nachrichtenblocktypen verwendet werden. Einige Konstruktorüberladungen übernehmen parallele::Scheduler - oder Parallelitätsobjekte::ScheduleGroup-Objekte , mit denen der Nachrichtenblock von einem bestimmten Aufgabenplaner verwaltet werden kann. Andere Konstruktorüberladungen übernehmen eine Filterfunktion. Filterfunktionen ermöglichen Nachrichtenblöcken das Annehmen oder Ablehnen von Nachrichten anhand der Nutzlast. Weitere Informationen zu Nachrichtenfiltern finden Sie unter "Asynchrone Nachrichtenblöcke". Weitere Informationen zu Aufgabenplanern finden Sie unter "Task Scheduler".
Da die priority_buffer
Klasse Nachrichten nach Priorität und dann nach der Reihenfolge anordnet, in der Nachrichten empfangen werden, ist diese Klasse am nützlichsten, wenn sie Nachrichten asynchron empfängt, z. B. wenn Sie die Parallelität::asend-Funktion aufrufen oder wenn der Nachrichtenblock mit anderen Nachrichtenblöcken verbunden ist.
Vollständiges Beispiel
Im folgenden Beispiel wird die vollständige Definition der priority_buffer
-Klasse veranschaulicht.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Im folgenden Beispiel werden gleichzeitig eine Reihe von Und Parallelitätsvorgängenasend
::receive-Vorgänge für ein priority_buffer
Objekt ausgeführt.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Dieses Beispiel erzeugt die folgende Beispielausgabe.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Die priority_buffer
-Klasse ordnet die Nachrichten zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs. In diesem Beispiel werden Nachrichten mit höherer numerischer Priorität am Anfang der Warteschlange eingefügt.
Kompilieren des Codes
Kopieren Sie den Beispielcode, fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie die Definition der Klasse in eine Datei ein, die priority_buffer
benannt priority_buffer.h
ist, und das Testprogramm in einer Benannten priority_buffer.cpp
Datei, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.
cl.exe /EHsc priority_buffer.cpp
Siehe auch
Exemplarische Vorgehensweisen für die Concurrency Runtime
Asynchrone Nachrichtenblöcke
Funktionen zum Übergeben von Nachrichten