Instruções passo a passo: criando um bloco de mensagens personalizado
Este documento descreve como criar um tipo de bloco de mensagem personalizado que ordena as mensagens de entrada por prioridade.
Embora os tipos internos do bloco de mensagem forneçam uma ampla variedade de funcionalidade, você pode criar seu próprio tipo de bloco de mensagem e personalizá-lo para atender aos requisitos do aplicativo. Para obter uma descrição dos tipos internos do bloco de mensagem que são fornecidos pela biblioteca assíncrona de agentes, consulte Blocos de mensagens assíncronos.
Pré-requisitos
Leia os seguintes documentos antes de iniciar esta explicação passo a passo:
Seções
Essa explicação passo a passo contém as seguintes seções:
Projetando um Bloco de Mensagem Personalizado
Definindo a Classe priority_buffer
O Exemplo Completo
Projetando um Bloco de Mensagem Personalizado
Os blocos de mensagem participam do ato de enviar e receber mensagens. Um bloco de mensagem que envia mensagens é conhecido como bloco de origem. Um bloco de mensagem recebe mensagens é conhecido como bloco de destino. Um bloco de mensagem que envia e recebe mensagens é conhecido como bloco de propagador. A biblioteca de agentes usa a classe abstrata concurrency::ISource para representar blocos de origem e a classe abstrata concurrency::ITarget para representar blocos de destino. Os tipos de bloco de mensagem que atuam como fontes derivam de ISource; os tipos de bloco de mensagem que atuam como destinos derivam de ITarget.
Embora você possa derivar seu tipo de bloco de mensagem diretamente de ISource e ITarget, a biblioteca de agentes define três classes base que executam grande parte da funcionalidade, que é comum a todos os tipos de bloco de mensagem, por exemplo, tratamento de erros e conexão de blocos de mensagem juntos de maneira segura de simultaneidade. A classe concurrency::source_block deriva de ISource e envia mensagens a outros blocos. A classe concurrency::target_block deriva de ITarget e recebe mensagens de outros blocos. A classe concurrency::propagator_block deriva de ISource e ITarget e envia e recebe mensagens de outros blocos. Recomendamos que você usa essas três classes base para manipular detalhes de infraestrutura de modo que você possa focalizar no comportamento do bloco de mensagem.
As classes source_block, target_block e propagator_block são modelos que são parametrizados em um tipo que gerencia as conexões, ou links, entre os blocos de origem e destino e em um tipo que gerencia como as mensagens são processadas. A biblioteca de agentes define dois tipos que executam o gerenciamento de links, concurrency::single_link_registry e concurrency::multi_link_registry. A classe single_link_registry permite que um bloco de mensagem seja vinculado a uma origem ou a um destino. A classe multi_link_registry permite que um bloco de mensagem seja vinculado às várias origens ou vários destinos. A biblioteca de agentes define uma classe que executa o gerenciamento de mensagens, concurrency::ordered_message_processor. A classe ordered_message_processor permite que blocos de mensagem processem mensagens na ordem em que as recebem.
Para melhor compreender como blocos de mensagem se relacionam às origens e aos destinos, considere o exemplo a seguir. Esse exemplo mostra a declaração da classe concurrency::transformer.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
A classe transformer deriva de propagator_block e, portanto, atua como um bloco de origem e como um bloco de destino. Aceita mensagens de tipo _Input e envia mensagens de tipo _Output. A classe transformer especifica single_link_registry como o gerenciador de links para todos os blocos de destino e multi_link_registry como o gerenciador de links para todos os blocos de origem. Portanto, um objeto transformer pode ter até um destino e um número ilimitado de origens.
Uma classe que deriva de source_block deve implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation. Uma classe que deriva de target_block deve implementar o método propagate_message e pode, opcionalmente, implementar o método send_message. A derivação de propagator_block é funcionalmente equivalente à derivação de source_block e de target_block.
O método propagate_to_any_targets é chamado pelo tempo de execução para processar de modo assíncrono ou síncrono qualquer mensagem de entrada e propagar qualquer mensagem de saída. O método accept_message é chamado por blocos de destino para aceitar mensagens. Muitos tipos de bloco de mensagem, como unbounded_buffer, enviam mensagens somente para o primeiro destino que a receberia. Portanto, ele transfere a propriedade da mensagem ao destino. Outros tipos de bloco de mensagem, como concurrency::overwrite_buffer, oferecem mensagens a cada um de seus blocos de destino. Portanto, overwrite_buffer cria uma cópia da mensagem para cada um dos seus destinos.
Os métodos reserve_message, consume_message, release_message e resume_propagation permitem que blocos de mensagem participem da reserva de mensagem. Blocos de destino chamam o método reserve_message quando recebem uma mensagem e eles precisam reservá-la para uso posterior. Depois que um bloco de destino reserva uma mensagem, pode chamar o método consume_message para consumir aquela mensagem ou o método release_message para cancelar a reserva. Como com o método accept_message , a implementação de consume_message pode transferir a propriedade da mensagem ou retornar uma cópia da mensagem. Depois que um bloco de destino consome ou libera uma mensagem reservada, o tempo de execução chama o método resume_propagation. Normalmente, esse método continua a propagação da mensagem, começando com a mensagem a seguir na fila.
O tempo de execução chama o método propagate_message para transferir de forma assíncrona uma mensagem de outro bloco para o atual. O método send_message é semelhante a propagate_message, exceto pelo fato de que envia a mensagem aos blocos de destino de modo síncrono, e não assíncrono. A implementação padrão de send_message rejeita todas as mensagens de entrada. O tempo de execução não chamará nenhum desses métodos se a mensagem não passar na função opcional de filtro que está associada ao bloco de destino. Para obter mais informações sobre os filtros de mensagem, consulte Blocos de mensagens assíncronos.
[Superior]
Definindo a Classe priority_buffer
A classe priority_buffer é um tipo de bloco de mensagem personalizado que ordena as mensagens de entrada primeiro por prioridade e depois pela ordem em que as mensagens são recebidas. A classe priority_buffer é semelhante à classe concurrency::unbounded_buffer, pois contém uma fila de mensagens e também porque atua como um bloco de mensagem de origem e de destino, podendo ter várias origens e vários destinos. No entanto, unbounded_buffer baseia a propagação de mensagem somente na ordem em que recebe as mensagens de suas fontes.
A classe priority_buffer recebe mensagens do tipo std::tuple que contêm os elementos PriorityType e Type. PriorityType refere-se ao tipo que contém a prioridade de cada mensagem; Type refere-se a parte de dados da mensagem. A classe priority_buffer envia mensagens do tipo Type. A classe priority_buffer também gerencia duas filas de mensagens: um objeto std::priority_queue para mensagens de entrada e um objeto std::queue para mensagens de saída. Ordenar mensagens por prioridade é útil quando um objeto priority_buffer recebe várias mensagens simultaneamente ou quando ele recebe várias mensagens antes de elas serem lidas pelos consumidores.
Além dos sete métodos que uma classe que deriva de propagator_block deve implementar, a classe de priority_buffer também sobrescreve os métodos de link_target_notification e de send_message. A classe priority_buffer também define dois métodos auxiliares públicos, enqueue e dequeue, e um método auxiliar privado, propagate_priority_order.
O procedimento a seguir descreve como implementar a classe priority_buffer.
Para definir a classe priority_buffer
Crie o arquivo de cabeçalho de c++ e denomine-o priority_buffer.h. Como alternativa, é possível usar um arquivo de cabeçalho existente que faz parte do projeto.
Em priority_buffer.h, adicione o código a seguir.
#pragma once #include <agents.h> #include <queue>
No namespace de std, defina as especializações de std::less e de std::greater que atuam em objetos de concurrency::message.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
A classe priority_buffer armazena objetos message em um objeto priority_queue. Essas especializações de tipo permitem que a fila de prioridade classifique mensagens de acordo com a prioridade. A prioridade é o primeiro elemento do objeto tuple.
No namespace concurrencyex, declare a classe de priority_buffer.
namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
A classe priority_buffer é derivada de propagator_block. Portanto, ela pode enviar e receber mensagens. A classe priority_buffer pode ter vários destinos que recebem mensagens do tipo Type. Também pode ter várias fontes que enviam mensagens do tipo tuple<PriorityType, Type>.
Na seção private da classe de priority_buffer, adicione as seguintes variáveis de membro.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
O objeto priority_queue contém mensagens de entrada; o objeto queue contém mensagens de saída. Um objeto priority_buffer pode receber vários mensagens simultaneamente; o objeto critical_section sincroniza acesso à fila de mensagens de entrada.
Na seção private, defina o construtor de impressão e o operador de atribuição. Isso impede que os objetos priority_queue sejam atribuíveis.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Na seção public, defina os construtores que são comuns a muitos tipos de bloco de mensagem. Também define o destruidor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Na seção public, defina os métodos enqueue e dequeue. Esses métodos auxiliares fornecem uma maneira alternativa para enviar mensagens de um objeto priority_buffer, bem como para receber mensagens dele.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Na seção protected, defina o método propagate_to_any_targets.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
O método propagate_to_any_targets transfere a mensagem que está no início da fila de entrada para a fila de saída e propaga todas as mensagens na fila de saída.
Na seção de protected , defina o método accept_message.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Quando um bloco de destino chama o método accept_message, a classe priority_buffer transfere a propriedade da mensagem para o primeiro bloco de destino que a aceita. (Isso se assemelha ao comportamento de unbounded_buffer.)
Na seção de protected , defina o método reserve_message.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
A classe priority_buffer permite que um bloco de destino reserve uma mensagem quando o identificador da mensagem fornecido corresponde ao identificador da mensagem que está no início da fila. Em outras palavras, um destino pode reservar a mensagem se o objeto de priority_buffer ainda não tiver recebido uma mensagem adicional e ainda não tiver propagado a atual.
Na seção de protected , defina o método consume_message.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Um bloco de destino chama consume_message para transferir a propriedade da mensagem que ele reservou.
Na seção de protected , defina o método release_message.
// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Um bloco de destino chama release_message para cancelar sua reserva para uma mensagem.
Na seção de protected , defina o método resume_propagation.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
O tempo de execução chama resume_propagation depois que um bloco de destino consome ou libera uma mensagem reservada. Esse método propaga qualquer mensagem que esteja na fila de saída.
Na seção de protected , defina o método link_target_notification.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
A variável de membro _M_pReservedFor é definido pela classe base source_block. Essa variável de membro aponta para o bloco de destino, se houver, que está mantendo uma reserva para a mensagem que está no início da fila de saída. O tempo de execução chama link_target_notification quando um novo destino é vinculado ao objeto priority_buffer. Esse método propagará todas as mensagens que estão na fila de saída se nenhum destino estiver mantendo uma reserva.
Na seção de private , defina o método propagate_priority_order.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Esse método propaga todas as mensagens da fila de saída. Cada mensagem na fila é oferecida a cada bloco de destino até que um dos blocos de destino aceite a mensagem. A classe priority_buffer preserva a ordem das mensagens de saída. Portanto, a primeira mensagem na fila de saída deve ser aceita por um bloco de destino para que esse método ofereça qualquer outra mensagem aos blocos de destino.
Na seção de protected , defina o método propagate_message.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
O método propagate_message permite que a classe priority_buffer atue como um receptor de mensagens ou destino. Esse método recebe a mensagem que é oferecida pelo bloco de origem fornecido e insere essa mensagem na fila de prioridade. O método propagate_message envia de modo assíncrono todas as mensagens de saída aos blocos de destino.
O tempo de execução chama esse método quando você chama a função concurrency::asend ou quando o bloco de mensagem está conectado a outros blocos de mensagem.
Na seção de protected , defina o método send_message.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
O método send_message é semelhante a propagate_message. No entanto, envia mensagens de saída de forma síncrona em vez de forma assíncrona.
O tempo de execução chama esse método durante uma operação de envio síncrona, como quando você chama a função concurrency::send.
A classe priority_buffer contém sobrecargas de construtor que são comuns em muitos tipos de bloco de mensagem. Algumas sobrecargas do construtor utilizam os objetos concurrency::Scheduler ou concurrency::ScheduleGroup, que permitem que o bloco de mensagens seja gerenciado por um agendador de tarefas específico. Outras sobrecargas do construtor têm função de filtro. As funções de filtro permitem que blocos de mensagem aceitem ou descartem uma mensagem com base em sua carga útil. Para obter mais informações sobre os filtros de mensagem, consulte Blocos de mensagens assíncronos. Para obter mais informações sobre os agendadores de tarefa, consulte Agendador de tarefas (Tempo de Execução de Simultaneidade).
Como a classe de priority_buffer ordena as mensagens por prioridade e pela ordem em que as mensagens são recebidas, essa classe é mais útil quando se recebe mensagens de forma assíncrona, por exemplo, quando você chama a função de simultaneidade:: ou quando o bloco de mensagens é conectado à outros blocos de mensagens.
[Superior]
O Exemplo Completo
O exemplo a seguir mostra a definição completa da classe priority_buffer.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
O exemplo a seguir execução simultaneamente um número de operações asend e concurrency::receive em um objeto priority_buffer.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Esse exemplo gera a seguinte saída de amostra.
A classe priority_buffer ordena as mensagens primeiro por prioridade e depois pela ordem em que recebe as mensagens. Nesse exemplo, as mensagens com maior prioridade numérica são inseridas à frente da fila.
[Superior]
Compilando o código
Copie o código de exemplo e cole-o em um projeto Visual Studio, ou cole a definição de classe de priority_buffer em um arquivo chamado priority_buffer.h e o programa de teste em um arquivo chamado priority_buffer.cpp e, em seguida, execute o seguinte comando em uma janela de prompt de comando do Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Consulte também
Conceitos
Blocos de mensagens assíncronos
Funções de transmissão de mensagem
Outros recursos
Instruções passo a passo do Tempo de Execução de Simultaneidade