Procedura dettagliata: creazione di un blocco dei messaggi personalizzato
Questo documento descrive come creare un tipo di blocco di messaggi personalizzato che ordina i messaggi in arrivo in base alla priorità.
Sebbene i tipi di blocchi di messaggi predefiniti forniscano un'ampia gamma di funzionalità, è possibile creare un tipo di blocco di messaggi personalizzato e personalizzarlo per soddisfare i requisiti dell'applicazione. Per una descrizione dei tipi di blocchi di messaggi predefiniti forniti dalla libreria degli agenti asincroni, vedere Blocchi di messaggi asincroni.
Prerequisiti
Leggere i documenti seguenti prima di iniziare questa procedura dettagliata:
Sezioni
Questa procedura dettagliata contiene le sezioni seguenti:
Progettazione di un blocco di messaggi personalizzato
I blocchi di messaggi partecipano all'azione di invio e ricezione di messaggi. Un blocco di messaggi che invia messaggi è noto come blocco di origine. Un blocco di messaggi che riceve messaggi è noto come blocco di destinazione. Un blocco di messaggi che invia e riceve messaggi è noto come blocco propagatore. La libreria agenti usa la classe astratta concurrency::ISource per rappresentare i blocchi di origine e la classe astratta concurrency::ITarget per rappresentare i blocchi di destinazione. I tipi di blocco di messaggi che fungono da origini derivano da ISource
; tipi di blocchi di messaggi che fungono da destinazioni derivano da ITarget
.
Anche se è possibile derivare il tipo di blocco di messaggi direttamente da ISource
e ITarget
, la libreria agenti definisce tre classi di base che eseguono gran parte delle funzionalità comuni a tutti i tipi di blocco di messaggi, ad esempio, la gestione degli errori e la connessione dei blocchi di messaggi in modo indipendente dalla concorrenza. La classe concurrency::source_block deriva da ISource
e invia messaggi ad altri blocchi. La classe concurrency::target_block deriva da ITarget
e riceve messaggi da altri blocchi. La classe concurrency::p ropagator_block deriva da ISource
e ITarget
invia messaggi ad altri blocchi e riceve messaggi da altri blocchi. È consigliabile usare queste tre classi di base per gestire i dettagli dell'infrastruttura in modo che sia possibile concentrarsi sul comportamento del blocco di messaggi.
Le source_block
classi , target_block
e propagator_block
sono modelli con parametri su un tipo che gestisce le connessioni, o i collegamenti, tra blocchi di origine e di destinazione e su un tipo che gestisce la modalità di elaborazione dei messaggi. La libreria agenti definisce due tipi che eseguono la gestione dei collegamenti, concurrency::single_link_registry e concurrency::multi_link_registry. La single_link_registry
classe consente di collegare un blocco di messaggi a un'origine o a una destinazione. La multi_link_registry
classe consente di collegare un blocco di messaggi a più origini o a più destinazioni. La libreria agenti definisce una classe che esegue la gestione dei messaggi, concurrency::ordered_message_processor. La ordered_message_processor
classe consente ai blocchi di messaggi di elaborare i messaggi nell'ordine in cui li riceve.
Per comprendere meglio il modo in cui i blocchi di messaggi sono correlati alle origini e alle destinazioni, considerare l'esempio seguente. Questo esempio mostra la dichiarazione della classe concurrency::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La transformer
classe deriva da propagator_block
e quindi funge da blocco di origine e come blocco di destinazione. Accetta messaggi di tipo _Input
e invia messaggi di tipo _Output
. La transformer
classe specifica single_link_registry
come gestore collegamenti per i blocchi di destinazione e multi_link_registry
come gestore dei collegamenti per tutti i blocchi di origine. Pertanto, un transformer
oggetto può avere fino a una destinazione e un numero illimitato di origini.
Una classe che deriva da source_block
deve implementare sei metodi: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation. Una classe che deriva da target_block
deve implementare il metodo propagate_message e può facoltativamente implementare il metodo send_message . La derivazione da propagator_block
è funzionalmente equivalente alla derivazione da source_block
e target_block
.
Il propagate_to_any_targets
metodo viene chiamato dal runtime per elaborare in modo asincrono o sincrono tutti i messaggi in arrivo e propagare eventuali messaggi in uscita. Il accept_message
metodo viene chiamato dai blocchi di destinazione per accettare messaggi. Molti tipi di blocchi di messaggi, ad esempio unbounded_buffer
, inviano messaggi solo alla prima destinazione che la riceverebbe. Pertanto, trasferisce la proprietà del messaggio alla destinazione. Altri tipi di blocchi di messaggi, ad esempio concurrency::overwrite_buffer, offrono messaggi a ognuno dei relativi blocchi di destinazione. Pertanto, overwrite_buffer
crea una copia del messaggio per ognuna delle destinazioni.
I reserve_message
metodi , consume_message
, release_message
e resume_propagation
consentono ai blocchi di messaggi di partecipare alla prenotazione dei messaggi. I blocchi di destinazione chiamano il reserve_message
metodo quando vengono offerti un messaggio e devono riservare il messaggio per usarlo in un secondo momento. Dopo che un blocco di destinazione riserva un messaggio, può chiamare il consume_message
metodo per utilizzare tale messaggio o il release_message
metodo per annullare la prenotazione. Come per il accept_message
metodo , l'implementazione di consume_message
può trasferire la proprietà del messaggio o restituire una copia del messaggio. Dopo che un blocco di destinazione utilizza o rilascia un messaggio riservato, il runtime chiama il resume_propagation
metodo . In genere, questo metodo continua la propagazione dei messaggi, a partire dal messaggio successivo nella coda.
Il runtime chiama il propagate_message
metodo per trasferire in modo asincrono un messaggio da un altro blocco a quello corrente. Il send_message
metodo è simile a propagate_message
, ad eccezione del fatto che, in modo sincrono, anziché in modo asincrono, invia il messaggio ai blocchi di destinazione. L'implementazione predefinita di send_message
rifiuta tutti i messaggi in arrivo. Il runtime non chiama uno di questi metodi se il messaggio non passa la funzione di filtro facoltativa associata al blocco di destinazione. Per altre informazioni sui filtri dei messaggi, vedere Blocchi messaggi asincroni.
Definizione della classe priority_buffer
La priority_buffer
classe è un tipo di blocco di messaggi personalizzato che ordina i messaggi in arrivo prima per priorità e quindi in base all'ordine in cui vengono ricevuti i messaggi. La priority_buffer
classe è simile alla classe concurrency::unbounded_buffer perché contiene una coda di messaggi e anche perché funge sia da origine che da blocco di messaggi di destinazione e può avere più origini e più destinazioni. Tuttavia, unbounded_buffer
basa la propagazione dei messaggi solo sull'ordine in cui riceve i messaggi dalle relative origini.
La priority_buffer
classe riceve messaggi di tipo std::tuple che contengono PriorityType
elementi e Type
. PriorityType
fa riferimento al tipo che contiene la priorità di ogni messaggio; Type
fa riferimento alla parte dei dati del messaggio. La priority_buffer
classe invia messaggi di tipo Type
. La priority_buffer
classe gestisce anche due code di messaggi: un oggetto std::p riority_queue per i messaggi in arrivo e un oggetto std::queue per i messaggi in uscita. L'ordinamento dei messaggi per priorità è utile quando un priority_buffer
oggetto riceve più messaggi contemporaneamente o quando riceve più messaggi prima che i messaggi vengano letti dai consumer.
Oltre ai sette metodi da cui deve essere implementata una classe che deriva da propagator_block
, la classe esegue anche l'override priority_buffer
dei link_target_notification
metodi e send_message
. La priority_buffer
classe definisce anche due metodi helper pubblici, enqueue
e dequeue
e un metodo helper privato, propagate_priority_order
.
La procedura seguente descrive come implementare la priority_buffer
classe .
Per definire la classe priority_buffer
Creare un file di intestazione C++ e denominarlo
priority_buffer.h
. In alternativa, è possibile usare un file di intestazione esistente che fa parte del progetto.In
priority_buffer.h
aggiungere il codice seguente.#pragma once #include <agents.h> #include <queue>
Nello spazio dei
std
nomi definire le specializzazioni di std::less e std::greater che agiscono sugli oggetti concurrency::message .namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La
priority_buffer
classe archiviamessage
gli oggetti in unpriority_queue
oggetto . Queste specializzazioni di tipo consentono alla coda di priorità di ordinare i messaggi in base alla priorità. La priorità è il primo elemento dell'oggettotuple
.Nello spazio dei
concurrencyex
nomi dichiarare lapriority_buffer
classe .namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La classe
priority_buffer
deriva dapropagator_block
. Pertanto, può sia inviare che ricevere messaggi. Lapriority_buffer
classe può avere più destinazioni che ricevono messaggi di tipoType
. Può anche avere più origini che inviano messaggi di tipotuple<PriorityType, Type>
.private
Nella sezione dellapriority_buffer
classe aggiungere le variabili membro seguenti.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
L'oggetto
priority_queue
contiene messaggi in arrivo; l'oggettoqueue
contiene messaggi in uscita. Unpriority_buffer
oggetto può ricevere più messaggi contemporaneamente. L'oggetto sincronizza l'accessocritical_section
alla coda di messaggi di input.private
Nella sezione definire il costruttore di copia e l'operatore di assegnazione. In questo modo, glipriority_queue
oggetti non possono essere assegnati.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public
Nella sezione definire i costruttori comuni a molti tipi di blocchi di messaggi. Definire anche il distruttore.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public
Nella sezione definire i metodienqueue
edequeue
. Questi metodi helper forniscono un modo alternativo per inviare e ricevere messaggi da unpriority_buffer
oggetto .// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected
Nella sezione definire ilpropagate_to_any_targets
metodo .// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Il
propagate_to_any_targets
metodo trasferisce il messaggio che si trova all'inizio della coda di input nella coda di output e propaga tutti i messaggi nella coda di output.protected
Nella sezione definire ilaccept_message
metodo .// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Quando un blocco di destinazione chiama il
accept_message
metodo , lapriority_buffer
classe trasferisce la proprietà del messaggio al primo blocco di destinazione che lo accetta. Questo è simile al comportamento diunbounded_buffer
.protected
Nella sezione definire ilreserve_message
metodo .// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La
priority_buffer
classe consente a un blocco di destinazione di riservare un messaggio quando l'identificatore del messaggio specificato corrisponde all'identificatore del messaggio che si trova all'inizio della coda. In altre parole, una destinazione può riservare il messaggio se l'oggettopriority_buffer
non ha ancora ricevuto un messaggio aggiuntivo e non ha ancora propagato quello corrente.protected
Nella sezione definire ilconsume_message
metodo .// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un blocco di destinazione chiama
consume_message
per trasferire la proprietà del messaggio che ha riservato.protected
Nella sezione definire ilrelease_message
metodo .// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un blocco di destinazione chiama
release_message
per annullare la prenotazione a un messaggio.protected
Nella sezione definire ilresume_propagation
metodo .// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Il runtime chiama
resume_propagation
dopo che un blocco di destinazione utilizza o rilascia un messaggio riservato. Questo metodo propaga tutti i messaggi presenti nella coda di output.protected
Nella sezione definire illink_target_notification
metodo .// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La
_M_pReservedFor
variabile membro è definita dalla classe base ,source_block
. Questa variabile membro punta al blocco di destinazione, se presente, che contiene una prenotazione al messaggio che si trova all'inizio della coda di output. Il runtime chiamalink_target_notification
quando una nuova destinazione è collegata all'oggettopriority_buffer
. Questo metodo propaga tutti i messaggi presenti nella coda di output se nessuna destinazione contiene una prenotazione.private
Nella sezione definire ilpropagate_priority_order
metodo .// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Questo metodo propaga tutti i messaggi dalla coda di output. Ogni messaggio nella coda viene offerto a ogni blocco di destinazione finché uno dei blocchi di destinazione non accetta il messaggio. La
priority_buffer
classe mantiene l'ordine dei messaggi in uscita. Pertanto, il primo messaggio nella coda di output deve essere accettato da un blocco di destinazione prima che questo metodo offra qualsiasi altro messaggio ai blocchi di destinazione.protected
Nella sezione definire ilpropagate_message
metodo .// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Il
propagate_message
metodo consente allapriority_buffer
classe di agire come ricevitore di messaggi o come destinazione. Questo metodo riceve il messaggio offerto dal blocco di origine fornito e lo inserisce nella coda di priorità. Ilpropagate_message
metodo invia quindi in modo asincrono tutti i messaggi di output ai blocchi di destinazione.Il runtime chiama questo metodo quando si chiama la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.
protected
Nella sezione definire ilsend_message
metodo .// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Il
send_message
metodo è simile apropagate_message
. Tuttavia, invia i messaggi di output in modo sincrono anziché in modo asincrono.Il runtime chiama questo metodo durante un'operazione di invio sincrono, ad esempio quando si chiama la funzione concurrency::send .
La priority_buffer
classe contiene overload del costruttore tipici in molti tipi di blocchi di messaggi. Alcuni overload del costruttore accettano oggetti concurrency::Scheduler o concurrency::ScheduleGroup , che consentono di gestire il blocco di messaggi da un'utilità di pianificazione specifica. Altri overload del costruttore accettano una funzione di filtro. Le funzioni di filtro consentono ai blocchi di messaggi di accettare o rifiutare un messaggio in base al relativo payload. Per altre informazioni sui filtri dei messaggi, vedere Blocchi messaggi asincroni. Per altre informazioni sulle utilità di pianificazione delle attività, vedere Utilità di pianificazione.
Poiché la priority_buffer
classe ordina i messaggi in base alla priorità e quindi in base all'ordine in cui vengono ricevuti i messaggi, questa classe è più utile quando riceve messaggi in modo asincrono, ad esempio quando si chiama la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.
Esempio completo
Nell'esempio seguente viene illustrata la definizione completa della priority_buffer
classe .
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
L'esempio seguente esegue simultaneamente una serie di asend
operazioni concurrency ::receive su un priority_buffer
oggetto .
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
In questo esempio viene generato l'output di esempio seguente.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
La priority_buffer
classe ordina i messaggi prima per priorità e quindi in base all'ordine in cui riceve i messaggi. In questo esempio, i messaggi con priorità numerica maggiore vengono inseriti verso la parte anteriore della coda.
Compilazione del codice
Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollare la definizione della priority_buffer
classe in un file denominato priority_buffer.h
e il programma di test in un file denominato priority_buffer.cpp
e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Vedi anche
Procedure dettagliate del runtime di concorrenza
Blocchi dei messaggi asincroni
Funzioni di passaggio dei messaggi