Procédure pas à pas : création d'un bloc de message personnalisé
Ce document décrit comment créer un type de bloc de message personnalisé qui classe les messages entrants par priorité.
Bien que les types de blocs de messages intégrés fournissent une gamme de fonctionnalités étendue, vous pouvez créer votre propre type de bloc de message et le personnaliser pour répondre aux besoins de votre application. Pour obtenir une description des types de blocs de messages intégrés fournis par la bibliothèque d'agents asynchrones, consultez Blocs de messages asynchrones.
Composants requis
Lisez les documents suivants avant de démarrer cette procédure pas-à-pas :
Sections
Cette procédure pas-à-pas contient les sections suivantes :
Conception d'un bloc de message personnalisé
Définition de la classe priority_buffer
Exemple complet
Conception d'un bloc de message personnalisé
Les blocs de messages participent à l'envoi et à la réception de messages. Un bloc de message qui envoie des messages est appelé bloc source. Un bloc de message qui reçoit des messages est appelé bloc cible. Un bloc de message qui envoie et reçoit des messages est appelé bloc propagateur. La bibliothèque d'agents utilise la classe abstraite concurrency::ISource pour représenter des blocs sources et la classe abstraite concurrency::ITarget pour représenter des blocs cibles. Les types de blocs de messages qui jouent le rôle de sources dérivent d'ISource ; les types de blocs de messages qui jouent le rôle de cibles dérivent d' ITarget.
Bien que vous puissiez dériver votre type de bloc de message directement de ISource et de ITarget, la bibliothèque d'agents définit trois classes de base qui exécutent une grande partie de la fonctionnalité commune à tous les types de messages de blocs, par exemple, la gestion des erreurs et l'interconnexion de blocs de messages de façon sécurisée du point de vue de l'accès concurrentiel. La classe concurrency::source_block dérive de ISource et envoie des messages à d'autres blocs. La classe concurrency::target_block dérive de ITarget et reçoit des messages d'autres blocs. La classe concurrency::propagator_block dérive de ISource et de ITarget et envoie des messages et en reçoit d'autres blocs. Nous vous conseillons d'utiliser ces trois classes de base pour gérer les détails de l'infrastructure afin que vous puissiez vous concentrer sur le comportement de votre bloc de message.
Les classes source_block, target_block et propagator_block sont des modèles qui sont paramétrables sur un type qui gère les connexions, également appelées liens, entre les blocs sources et cibles, et sur un type qui gère la façon dont les messages sont traités. La bibliothèque d'agents définit deux types qui procèdent à la gestion des liens, concurrency::single_link_registry et Concurrency::multi_link_registry. La classe single_link_registry active un bloc de message qui sera lié à une source ou à une cible. La classe multi_link_registry active un bloc de message qui sera lié à plusieurs sources ou à plusieurs cibles. La bibliothèque d'agents définit une classe qui exécute la gestion des messages, concurrency::ordered_message_processor. La classe ordered_message_processor permet aux blocs de messages de traiter les messages dans l'ordre dans lequel ils les reçoivent.
Pour mieux comprendre la relation entre les blocs de messages et leurs sources et leurs cibles, examinons l'exemple suivant. Cet exemple montre la déclaration de la classe concurrency::transformer.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La classe transformer dérive de propagator_block. Par conséquent, elle joue à la fois le rôle de bloc source et de bloc cible. Elle reçoit des messages de type _Input et envoie des messages de type _Output. La classe transformer spécifie single_link_registry en tant que gestionnaire de lien pour tous les blocs cibles et multi_link_registry en tant que gestionnaire de lien pour tous les blocs sources. Par conséquent, un objet transformer peut contenir une cible et un nombre illimité de sources.
Une classe qui dérive de source_block doit implémenter six méthodes : propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message et resume_propagation. Une classe qui dérive de target_block doit implémenter la méthode propagate_message et peut éventuellement implémenter la méthode send_message. Au niveau du fonctionnement, dériver de propagator_block équivaut à dériver de source_block et de target_block.
La méthode propagate_to_any_targets est appelée par le runtime pour traiter de manière asynchrone ou synchrone tous les messages entrants et propager tous les messages de sortie. La méthode accept_message est appelée par les blocs cibles pour recevoir des messages. De nombreux types de blocs de messages, comme unbounded_buffer, n'envoient des messages qu'à la première cible qui les reçoit. Dans ce cas, la propriété du message est transférée à la cible. D'autres types de blocs de messages, comme concurrency::overwrite_buffer, offrent des messages à chacun de leurs blocs cibles. Par conséquent, overwrite_buffer crée une copie du message pour chacune de ses cibles.
Les méthodes reserve_message, consume_message, release_message et resume_propagation permettent aux blocs de messages de participer à la réservation des messages. Les blocs cibles appellent la méthode reserve_message lorsqu'un message leur est proposé et doivent réserver ce message pour l'utiliser ultérieurement. Une fois le message réservé, le bloc cible peut appeler la méthode consume_message pour utiliser ce message ou la méthode release_message pour annuler la réservation. Comme pour la méthode accept_message, l'implémentation de consume_message peut transférer la propriété du message ou retourner une copie du message. Une fois que le message réservé est consommé ou que sa réservation est annulée par le bloc cible, le runtime appelle la méthode resume_propagation. En général, cette méthode poursuit la propagation des messages, en commençant par le message suivant dans la file d'attente.
Le runtime appelle la méthode propagate_message pour transférer de façon asynchrone un message d'un autre bloc au bloc actuel. La méthode send_message ressemble à propagate_message, à la différence qu'elle envoie de façon synchrone, et non pas asynchrone, le message aux blocs cibles. L'implémentation par défaut de send_message rejette tous les messages entrants. Le runtime n'appelle aucune de ces méthodes si le message ne passe pas la fonction de filtre facultative associée au bloc cible. Pour plus d'informations sur les filtres de messages, consultez Blocs de messages asynchrones.
[Premières]
Définition de la classe priority_buffer
La classe priority_buffer est un type de bloc de message personnalisé qui classe les messages entrants par priorité, puis par ordre de réception. La classe priority_buffer ressemble à la classe concurrency::unbounded_buffer car elle contient une file d'attente de messages, elle joue le rôle de bloc de message source et cible et elle peut comporter plusieurs sources et plusieurs cibles. Toutefois, unbounded_buffer fait reposer la propagation des messages uniquement sur l'ordre dans lequel elle reçoit des messages de ses sources.
La classe priority_buffer reçoit des messages de type std::tuple qui comportent des éléments PriorityType et Type. PriorityType fait référence au type qui conserve la priorité de chaque message. Type fait référence à la partie données du message. La classe priority_buffer envoie des messages de type Type. La classe priority_buffer gère également deux files d'attente : un objet std::priority_queue pour les messages entrants et un objet std::queue pour les messages sortants. Classer les messages par priorité s'avère utile lorsqu'un objet priority_buffer reçoit simultanément plusieurs messages ou lorsqu'il reçoit des messages avant qu'ils ne soient lus par les consommateurs.
Outre les sept méthodes qui doivent être implémentées par une classe qui dérive de propagator_block, la classe priority_buffer substitue également les méthodes link_target_notification et send_message. La classe priority_buffer définit également deux méthodes d'assistance publiques, enqueue et dequeue, et une méthode d'assistance privée, propagate_priority_order.
La procédure suivante décrit comment implémenter la classe priority_buffer.
Pour définir la classe priority_buffer
Créez le fichier d'en-tête C++ et nommez-le priority_buffer.h. Vous pouvez également utiliser un fichier d'en-tête existant qui fait partie de votre projet.
Dans priority_buffer.h, ajoutez le code suivant.
#pragma once #include <agents.h> #include <queue>
Dans l'espace de noms std, définissez les spécialisations de std::less et de std::greater, qui agissent sur les objets concurrency::message.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La classe priority_buffer stocke les objets message dans un objet priority_queue. Ces spécialisations de type permettent à la file d'attente de priorité déterminée de trier les messages en fonction de leur priorité. La priorité est le premier élément de l'objet tuple.
Dans l'espace de noms concurrencyex, déclarez la classe priority_buffer.
namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La classe priority_buffer dérive de propagator_block. Par conséquent, elle peut envoyer et recevoir des messages. La classe priority_buffer peut avoir plusieurs cibles qui reçoivent des messages de type Type. Elle peut également comporter plusieurs sources qui envoient des messages de type tuple<PriorityType, Type>.
Dans la section private de la classe priority_buffer, ajoutez les variables membres suivantes.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
L'objet priority_queue contient les messages entrants. L'objet queue contient les messages sortants. Un objet priority_buffer peut recevoir plusieurs messages simultanément. L'objet critical_section synchronise l'accès des messages d'entrée à la file d'attente.
Dans la section private, définissez le constructeur de copie et l'opérateur d'assignation. Cela empêche les objets priority_queue de pouvoir être assignés.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Dans la section public, définissez les constructeurs communs à de nombreux types de blocs de messages. Définissez également le destructeur.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Dans la section public, définissez les méthodes enqueue et dequeue. Ces méthodes d'assistance fournissent un autre moyen d'envoyer des messages à des objets priority_buffer et d'en recevoir de leur part.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Dans la section protected, définissez la méthode propagate_to_any_targets.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
La méthode propagate_to_any_targets transfère le message qui est en tête de la file d'attente d'entrée vers la file d'attente de sortie et propage tous les messages de la file d'attente de sortie.
Dans la section protected, définissez la méthode accept_message.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Lorsqu'un bloc cible appelle la méthode accept_message, la classe priority_buffer transfère la propriété du message au premier bloc cible qui le reçoit. (Ce comportement ressemble à celui de unbounded_buffer)
Dans la section protected, définissez la méthode reserve_message.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La classe priority_buffer permet à un bloc cible de réserver un message lorsque l'identificateur de message fourni correspond à l'identificateur de message du début de la file d'attente. En d'autres termes, une cible peut réserver le message si l'objet priority_buffer n'a pas encore reçu de message supplémentaire et qu'il n'a pas encore propagé le message actuel.
Dans la section protected, définissez la méthode consume_message.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un bloc cible appelle consume_message pour transférer la propriété du message qui a été réservé.
Dans la section protected, définissez la méthode release_message.
// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un bloc cible appelle release_message pour annuler la réservation d'un message.
Dans la section protected, définissez la méthode resume_propagation.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Une fois que le message réservé est consommé ou que sa réservation est annulée par le bloc cible, le runtime appelle resume_propagation. Cette méthode propage tous les messages présents dans la file d'attente de sortie.
Dans la section protected, définissez la méthode link_target_notification.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La variable membre _M_pReservedFor est définie par la classe de base, source_block. Cette variable membre pointe vers le bloc cible, s'il y en a un, qui conserve la réservation du message présent au début de la file d'attente de sortie. Le runtime appelle link_target_notification lorsqu'une nouvelle cible est associée à l'objet priority_buffer. Cette méthode propage tous les messages présents dans la file d'attente de sortie si aucune cible ne conserve de réservation.
Dans la section private, définissez la méthode propagate_priority_order.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Cette méthode propage tous les messages de la file d'attente de sortie. Chaque message de la file d'attente est envoyé à chaque bloc cible, jusqu'à ce que l'un des blocs cibles accepte le message. La classe priority_buffer conserve l'ordre des messages sortants. Par conséquent, le premier message de la file d'attente de sortie doit être accepté par un bloc cible avant que cette méthode offre un autre message aux blocs cibles.
Dans la section protected, définissez la méthode propagate_message.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
La méthode propagate_message permet à la classe priority_buffer de jouer le rôle de cible ou de récepteur de message. Cette méthode reçoit le message qui est offert par le bloc source fourni et insère ce message dans la file d'attente de priorité. La méthode propagate_message envoie ensuite de façon asynchrone tous les messages sortants vers les blocs cibles.
Le runtime appelle cette méthode lorsque vous appelez la fonction concurrency::asend ou lorsque le bloc de message est connecté à d'autres blocs de message.
Dans la section protected, définissez la méthode send_message.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
La méthode send_message ressemble à propagate_message. Toutefois, elle envoie les messages sortants de façon synchrone, et non asynchrone.
Le runtime appelle cette méthode pendant une opération d'envoi synchrone, comme lorsque vous appelez la fonction concurrency::send.
La classe priority_buffer contient des surcharges de constructeur qui sont courantes dans de nombreux types de blocs de messages. Certaines surcharges de constructeur acceptent les objets concurrency::Scheduler ou concurrency::ScheduleGroup, qui permettent au bloc de message d'être géré par un planificateur de tâches spécifique. D'autres surcharges de constructeur acceptent une fonction de filtre. Les fonctions de filtre permettent aux blocs de messages d'accepter ou de rejeter un message en fonction de sa charge. Pour plus d'informations sur les filtres de messages, consultez Blocs de messages asynchrones. Pour plus d'informations sur les planificateurs de tâches, consultez Planificateur de tâches (runtime d'accès concurrentiel).
Étant donné que la classe priority_buffer ordonne les messages par priorité puis par ordre de réception, cette classe s'avère très utile lorsqu'elle reçoit les messages de manière asynchrone, par exemple lorsque vous appelez la fonction concurrency::asend ou lorsque le bloc de message est connecté à d'autres blocs de message.
[Premières]
Exemple complet
L'exemple suivant illustre la définition complète de la classe priority_buffer.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
L'exemple suivant exécute simultanément plusieurs opérations asend et concurrency::receive sur un objet priority_buffer.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Cet exemple génère l'exemple de sortie suivant.
La classe priority_buffer classe les messages par priorité, puis par ordre de réception. Dans cet exemple, les messages dont la priorité numérique est supérieure sont insérés à l'avant de la file d'attente.
[Premières]
Compilation du code
Copiez l'exemple de code et collez-le dans un projet Visual Studio, ou collez la définition de la classe priority_buffer dans un fichier nommé priority_buffer.h et le programme de test dans un fichier nommé priority_buffer.cpp. Exécutez ensuite la commande suivante dans une fenêtre d'invite de commandes de Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Voir aussi
Concepts
Fonctions de passage de messages
Autres ressources
Procédures pas à pas relatives au runtime d'accès concurrentiel