Tutorial: Crear un bloque de mensajes personalizado
En este documento se describe cómo crear un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes por prioridad.
Aunque los tipos integrados de bloques de mensajes proporciona una amplia gama de funcionalidad, puede crear su propio tipo de bloque de mensajes y personalizarlo para satisfacer los requisitos de la aplicación. Para obtener una descripción de los tipos de bloques de mensajes integrados proporcionados por la Biblioteca de agentes asincrónicos, vea Bloques de mensajes asincrónicos.
Requisitos previos
Lea los documentos siguientes antes de iniciar este tutorial:
Secciones
Este tutorial contiene las siguientes secciones:
Diseñar un bloque de mensajes personalizado
Definir la clase priority_buffer
Ejemplo completo
Diseñar un bloque de mensajes personalizado
Los bloques de mensajes participan en el acto de enviar y recibir mensajes. Un bloque de mensajes que envía mensajes se conoce como un bloque de origen. Un bloque de mensajes que recibe mensajes se conoce como un bloque de destino. Un bloque de mensajes que envía y recibe mensajes se conoce como un bloque propagador. La biblioteca de agentes usa la clase abstracta Concurrency::ISource para representar bloques de origen y la clase abstracta Concurrency::ITarget para representar bloques de destino. Los tipos de bloques de mensajes que actúan como orígenes derivan de ISource; los tipos de bloques de mensajes que actúan como destinos derivan de ITarget.
Aunque puede derivar el tipo de bloque de mensajes directamente de ISource y ITarget, la biblioteca de agentes define tres clases base que realizan gran parte de la funcionalidad común a todos los tipos de bloques de mensajes; por ejemplo, control de errores y conexión de los bloques de mensajes de manera segura para simultaneidad. La clase Concurrency::source_block deriva de ISource y envía mensajes a otros bloques. La clase Concurrency::target_block deriva de ITarget y recibe mensajes de otros bloques. La clase Concurrency::propagator_block deriva de ISource e ITarget y envía mensajes a otros bloques y recibe mensajes de otros bloques. Se recomienda usar estas tres clases base para controlar los detalles de infraestructura de modo que se pueda centrar en el comportamiento del bloque de mensajes.
Las clases source_block, target_block y propagator_block son plantillas que se parametrizan en un tipo que administra las conexiones, o vínculos, entre los bloques de origen y de destino y en un tipo que administra cómo se procesan los mensajes. La biblioteca de agentes define dos tipos que realizan administración de vínculos: Concurrency::single_link_registry y Concurrency::multi_link_registry. La clase single_link_registry permite vincular un bloque de mensajes a un origen o a un destino. La clase multi_link_registry permite vincular un bloque de mensajes a varios orígenes o a varios destinos. La biblioteca de agentes define una clase que realiza administración de mensajes, Concurrency::ordered_message_processor. La clase ordered_message_processor permite que los bloques de mensajes procesen los mensajes en el orden en que se reciben.
Para entender mejor cómo se relacionan los bloques de mensajes con sus orígenes y destinos, considere el ejemplo siguiente. En este ejemplo se muestra la declaración de la clase Concurrency::transformer.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La clase transformer se deriva de propagator_block y, por tanto, actúa como bloque de origen y como bloque de destino. Acepta mensajes de tipo _Input y envía mensajes de tipo _Output. La clase transformer especifica single_link_registry como administrador de vínculos para los bloques de destino y multi_link_registry como administrador de vínculos para los bloques de origen. Por tanto, un objeto transformer puede tener hasta un destino y un número ilimitado de orígenes.
Una clase derivada de source_block debe implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message y resume_propagation. Una clase derivada de target_block debe implementar el método propagate_message y puede implementar opcionalmente el método send_message. Derivar de propagator_block es funcionalmente equivalente a la derivación de source_block y target_block.
El runtime llama al método propagate_to_any_targets para procesar de forma sincrónica o asincrónica los mensajes entrantes y propagar los mensajes salientes. Los bloques de destino llaman al método accept_message para aceptar mensajes. Muchos tipos de bloques de mensajes, como unbounded_buffer, envían mensajes solo al primer destino que los recibiría. Por tanto, transfiere la propiedad del mensaje al destino. Otros tipos de bloques de mensajes, como Concurrency::overwrite_buffer, ofrecen mensajes a cada uno de sus bloques de destino. Por tanto, overwrite_buffer crea una copia del mensaje para cada uno de sus destinos.
Los métodos reserve_message, consume_message, release_message y resume_propagation permiten a los bloques de mensajes participar en la reserva de mensajes. Los bloques de destino llaman al método reserve_message cuando se les ofrece un mensaje y tienen que reservar el mensaje para su uso posterior. Después de que un bloque de destino reserva un mensaje, puede llamar al método consume_message para usar ese mensaje o al método release_message para cancelar la reserva. Como sucede con el método accept_message, la implementación de consume_message puede transferir la propiedad del mensaje o devolver una copia del mensaje. Después de que un bloque de destino usa o libera un mensaje reservado, el runtime llama al método resume_propagation. Normalmente, este método continúa la propagación de mensajes, comenzando por el siguiente mensaje de la cola.
El runtime llama al método propagate_message para transferir de forma asincrónica un mensaje de otro bloque al actual. El método send_message es similar a propagate_message, excepto en que envía de forma sincrónica, en lugar de asincrónica, el mensaje a los bloques de destino. La implementación predeterminada de send_message rechaza todos los mensajes entrantes. El runtime no llama a ninguno de estos métodos si el mensaje no supera la función opcional de filtro asociada al bloque de destino. Para obtener más información sobre los filtros de mensajes, vea Bloques de mensajes asincrónicos.
[Ir al principio]
Definir la clase priority_buffer
La clase priority_buffer es un tipo de bloque de mensajes personalizado que ordena los mensajes entrantes primero por prioridad y, a continuación, en el orden en que se reciben los mensajes. La clase priority_buffer es similar a la clase Concurrency::unbounded_buffer porque contiene una cola de mensajes, y también porque actúa como bloque de mensajes de origen y de destino, y puede tener varios orígenes y varios destinos. Sin embargo, unbounded_buffer basa la propagación de mensaje solo en el orden en que recibe mensajes de sus orígenes.
La clase priority_buffer recibe mensajes de tipo std::tuple que contienen elementos PriorityType y Type. PriorityType se refiere al tipo que contiene la prioridad de cada mensaje; Type se refiere a la parte de datos del mensaje. La clase priority_buffer envía mensajes de tipo Type. La clase priority_buffer también administra dos colas de mensajes: un objeto std::priority_queue para los mensajes entrantes y un objeto std::queue para los mensajes salientes. Ordenar los mensajes por prioridad es útil cuando un objeto priority_buffer recibe varios mensajes simultáneamente o cuando recibe varios mensajes antes de que los consumidores lean cualquier mensaje.
Además de los siete métodos que una clase derivada de propagator_block debe implementar, la clase priority_buffer también invalida los métodos link_target_notification y send_message. La clase priority_buffer también define dos métodos auxiliares públicos, enqueue y dequeue, y un método auxiliar privado, propagate_priority_order.
En el procedimiento siguiente se describe cómo implementar la clase priority_buffer.
Para definir la clase priority_buffer
Cree el archivo de encabezado de C++ y asígnele el nombre priority_buffer.h. O bien, puede usar un archivo de encabezado existente que forme parte del proyecto.
En priority_buffer.h, agregue el código siguiente.
#pragma once #include <agents.h> #include <queue>
En el espacio de nombres std, defina especializaciones de std::less y std::greater que actúen sobre objetos Concurrency::message.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<Concurrency::message<tuple<PriorityType,Type>>*> { typedef Concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<Concurrency::message<tuple<PriorityType, Type>>*> { typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La clase priority_buffer almacena objetos message en un objeto priority_queue. Estas especializaciones de tipo permiten a la cola de prioridad ordenar los mensajes según su prioridad. La prioridad es el primer elemento del objeto tuple.
En el espacio de nombres Concurrency, declare la clase priority_buffer.
namespace Concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La clase priority_buffer se deriva de propagator_block. Por tanto, puede enviar y recibir mensajes. La clase priority_buffer puede tener varios destinos que reciben mensajes de tipo Type. También puede tener varios orígenes que envían mensajes de tipo tuple<PriorityType, Type>.
En la sección private de la clase priority_buffer, agregue las variables miembro siguientes.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
El objeto priority_queue contiene mensajes entrantes; el objeto queue contiene mensajes salientes. Un objeto priority_buffer puede recibir varios mensajes simultáneamente; el objeto critical_section sincroniza el acceso a la cola de mensajes entrantes.
En la sección private, defina el constructor de copias y el operador de asignación. Esto impide que los objetos priority_queue sean asignables.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
En la sección public, defina los constructores que son comunes a muchos tipos de bloques de mensajes. Defina también el destructor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
En la sección public, defina los métodos enqueue y dequeue. Estos métodos auxiliares proporcionan una manera alternativa de enviar mensajes a y recibir mensajes de un objeto priority_buffer.
// Sends an item to the message block. bool enqueue(Type const& item) { return Concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
En la sección protected, defina el método propagate_to_any_targets.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
El método propagate_to_any_targets transfiere a la cola de salida el mensaje que está al principio de la cola de entrada y propaga todos los mensajes de la cola de salida.
En la sección protected, defina el método accept_message.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Cuando un bloque de destino llama al método accept_message, la clase priority_buffer transfiere la propiedad del mensaje al primer bloque de destino que lo acepta. (Esto es similar al comportamiento de unbounded_buffer.)
En la sección protected, defina el método reserve_message.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La clase priority_buffer permite a un bloque de destino reservar un mensaje cuando el identificador de mensaje proporcionado coincide con el identificador de mensaje que está en el principio de la cola. Es decir, un destino puede reservar el mensaje si el objeto priority_buffer aún no ha recibido un mensaje adicional y aún no ha propagado el actual.
En la sección protected, defina el método consume_message.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un bloque de destino llama a consume_message para transferir la propiedad del mensaje reservado.
En la sección protected, defina el método release_message.
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un bloque de destino llama a release_message para cancelar la reserva un mensaje.
En la sección protected, defina el método resume_propagation.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
El runtime llama a resume_propagation después de que un bloque de destino use o libere un mensaje reservado. Este método propaga cualquier mensaje que esté en la cola de salida.
En la sección protected, defina el método link_target_notification.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La variable miembro _M_pReservedFor está definida por la clase base, source_block. Esta variable miembro apunta al bloque de destino, si existe, que mantiene una reserva del mensaje que está al principio de la cola de salida. El runtime llama a link_target_notification cuando un nuevo destino se vincula al objeto priority_buffer. Este método propaga cualquier mensaje que está en la cola de salida si ningún destino mantiene una reserva.
En la sección private, defina el método propagate_priority_order.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Este método propaga todos los mensajes de la cola de salida. Todos los mensajes de la cola se ofrecen a todos los bloques de destino hasta que uno de los bloques de destino acepta el mensaje. La clase priority_buffer conserva el orden de los mensajes salientes. Por tanto, un bloque de destino debe aceptar el primer mensaje de la cola de salida antes de que este método ofrezca ningún otro mensaje a los bloques de destino.
En la sección protected, defina el método propagate_message.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
El método propagate_message permite que la clase priority_buffer actúe como receptor de mensajes o destino. Este método recibe el mensaje ofrecido por el bloque de origen proporcionado e inserta ese mensaje en la cola de prioridad. El método propagate_message envía de forma asincrónica todos los mensajes de salida a los bloques de destino.
El runtime llama a este método cuando se llama a la función Concurrency::asend o cuando el bloque de mensajes está conectado a otros bloques de mensajes.
En la sección protected, defina el método send_message.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
El método send_message es similar a propagate_message. Sin embargo, envía los mensajes de salida sincrónicamente en lugar de hacerlo de forma asincrónica.
El runtime llama a este método durante una operación sincrónica de envío, como cuando se llama a la función Concurrency::send.
La clase priority_buffer contiene sobrecargas del constructor que son típicas en muchos tipos de bloques de mensajes. Algunas sobrecargas de constructor toman objetos Concurrency::Scheduler o Concurrency::ScheduleGroup, que permiten que un programador de tareas específico administre el bloque de mensajes. Otras sobrecargas del constructor toman una función de filtro. Las funciones de filtro permiten a los bloques de mensajes aceptar o rechazar un mensaje en función de su carga. Para obtener más información sobre los filtros de mensajes, vea Bloques de mensajes asincrónicos. Para obtener más información sobre los programadores de tareas, vea Programador de tareas (Runtime de simultaneidad).
Puesto que la clase priority_buffer ordena los mensajes por prioridad y, a continuación, por el orden en que se reciben mensajes, esta clase es más útil cuando los mensajes de forma asincrónica, por ejemplo, cuando se llama a la función Concurrency::asend o cuando el bloque de mensajes está conectado a otros bloques de mensajes.
[Ir al principio]
Ejemplo completo
En el ejemplo siguiente se muestra la definición completa de la clase priority_buffer.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*>
{
typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*>
{
typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace Concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return Concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
En el ejemplo siguiente se realiza simultáneamente una serie de operaciones asend y Concurrency::receive sobre un objeto priority_buffer.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace Concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Este ejemplo genera la siguiente salida de ejemplo.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
La clase priority_buffer ordena los mensajes primero por prioridad y, a continuación, por el orden en que recibe los mensajes. En este ejemplo, los mensajes con mayor prioridad numérica se insertan al principio de la cola.
[Ir al principio]
Compilar el código
Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o pegue la definición de la clase priority_buffer en un archivo denominado priority_buffer.h y el programa de prueba en un archivo denominado priority_buffer.cpp y, a continuación, ejecute el siguiente comando en una ventana de símbolo del sistema de Visual Studio 2010.
cl.exe /EHsc priority_buffer.cpp
Vea también
Conceptos
Tutoriales del Runtime de simultaneidad