Procedura dettagliata: creazione di un blocco dei messaggi personalizzato
In questo documento viene descritto come creare un tipo di blocco di messaggi personalizzato che consente di ordinare i messaggi in arrivo in base alla priorità.
Sebbene i tipi di blocchi di messaggi incorporati forniscano una vasta gamma di funzionalità, è possibile creare un tipo di blocco di messaggi e personalizzarlo per soddisfare i requisiti dell'applicazione.Per una descrizione dei tipi di blocchi di messaggi incorporati forniti dalla libreria di agenti asincroni, vedere Blocchi dei messaggi asincroni.
Prerequisiti
Prima di iniziare questa procedura dettagliata, leggere i documenti riportati di seguito.
Sezioni
In questa procedura dettagliata sono contenute le sezioni seguenti:
Progettazione di un blocco di messaggi personalizzato
Definizione della classe priority_buffer
Esempio completo
Progettazione di un blocco di messaggi personalizzato
I blocchi di messaggi vengono utilizzati nelle operazioni di invio e ricezione dei messaggi.Un blocco di messaggi che invia messaggi è definito blocco di origine.Un blocco di messaggi che riceve messaggi è definito blocco di destinazione.Un blocco di messaggi che invia e riceve messaggi è definito blocco di propagazione.La libreria di agenti utilizza la classe astratta concurrency::ISource per rappresentare i blocchi di codice sorgente e la classe astratta concurrency::ITarget per rappresentare i blocchi di destinazione.I tipi di blocchi dei messaggi che fungono da origini derivano da ISource, mentre i tipi di blocchi dei messaggi che fungono da destinazioni derivano da ITarget.
Sebbene sia possibile derivare il tipo di blocchi di messaggi direttamente da ISource e da ITarget, la libreria di agenti definisce tre classi di base che eseguono molte delle funzionalità comuni a tutti i tipi di blocchi di messaggi, ad esempio la gestione degli errori e la connessione dei blocchi di messaggi in una modalità indipendente dalla concorrenza.Il concurrency::source_block la classe deriva da ISource e invia messaggi di altri blocchi.Il concurrency::target_block la classe deriva da ITarget e riceve messaggi da altri blocchi.Il concurrency::propagator_block la classe deriva da ISource e ITarget e invia messaggi ad altri blocchi e riceve i messaggi da altri blocchi.È consigliabile utilizzare queste tre classi di base per gestire i dettagli dell'infrastruttura in modo da potersi concentrare sul comportamento del blocco di messaggi.
Le classi source_block, target_block e propagator_block sono modelli contenenti i parametri per il tipo che gestisce le connessioni o i collegamenti tra i blocchi di origine e di destinazione e per il tipo che gestisce la modalità di elaborazione dei messaggi.La libreria di agenti definisce due tipi che consentono di eseguire la gestione del collegamento, concurrency::single_link_registry e concurrency::multi_link_registry.La classe single_link_registry consente di collegare un blocco di messaggi a una sola origine o a una sola destinazione.La classe multi_link_registry consente di collegare un blocco di messaggi a più origini o a più destinazioni.La libreria di agenti definisce una classe che esegue la gestione dei messaggi, concurrency::ordered_message_processor.La classe ordered_message_processor consente ai blocchi di messaggi di elaborare i messaggi nell'ordine in cui vengono ricevuti.
Per comprendere meglio la correlazione tra i blocchi di messaggi e le relative origini e destinazioni, si consideri l'esempio seguente.In questo esempio viene illustrata la dichiarazione del concurrency::transformer classe.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
La classe transformer deriva da propagator_block e pertanto funge sia da blocco di origine che da blocco di destinazione.Accetta messaggi di tipo _Input e invia messaggi di tipo _Output.La classe transformer specifica single_link_registry come gestore collegamenti per tutti i blocchi di destinazione e multi_link_registry come gestore collegamenti per tutti blocchi di origine.Un oggetto transformer può contenere pertanto una destinazione e un numero illimitato di origini.
Una classe che deriva da source_block deve implementare sei metodi: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation.Una classe che deriva da target_block deve implementare il metodo propagate_message e facoltativamente può implementare il metodo send_message.Dal punto di vista funzionale, la derivazione da propagator_block equivale alla derivazione da source_block e target_block.
Il metodo propagate_to_any_targets viene chiamato dal runtime per elaborare tutti i messaggi in modalità sincrona o asincrona e per propagare all'esterno tutti i messaggi in uscita.Il metodo accept_message viene chiamato dai blocchi di destinazione per accettare i messaggi.Molti tipi di blocchi di messaggi, ad esempio unbounded_buffer, inviano i messaggi solo alla prima destinazione che lo riceve,trasferendo pertanto la proprietà del messaggio alla destinazione.Tipi di blocco altri messaggi, ad esempio concurrency::overwrite_buffer, offrono i messaggi a ciascuno dei relativi blocchi di destinazione.overwrite_buffer crea pertanto una copia del messaggio per ciascuna delle relative destinazioni.
I metodi reserve_message, consume_message, release_message e resume_propagation consentono ai blocchi di messaggi di partecipare alla prenotazione dei messaggi.I blocchi di destinazione chiamano il metodo reserve_message quando a essi viene offerto un messaggio e devono prenotare il messaggio per un utilizzo successivo.Dopo la prenotazione di un messaggio, un blocco di destinazione può chiamare il metodo consume_message per utilizzare il messaggio o il metodo release_message per annullare la prenotazione.Analogamente a quanto avviene con il metodo accept_message, l'implementazione di consume_message può trasferire la proprietà del messaggio o restituire una copia del messaggio.Dopo che un blocco di destinazione ha utilizzato o rilasciato un messaggio prenotato, il runtime chiama il metodo resume_propagation.In genere, questo metodo continua la propagazione dei messaggi, a partire dal messaggio successivo della coda.
Il runtime chiama il metodo propagate_message per trasferire in modo asincrono un messaggio da un altro blocco a quello corrente.Il metodo send_message è analogo a propagate_message, ad eccezione del fatto che invia il messaggio ai blocchi di destinazione in modo sincrono anziché in modo asincrono.L'implementazione predefinita di send_message rifiuta tutti i messaggi in arrivo.Il runtime non chiama uno di questi metodi se il messaggio non passa la funzione facoltativa di filtro associata al blocco di destinazione.Per ulteriori informazioni sui filtri dei messaggi, vedere Blocchi dei messaggi asincroni.
Top
Definizione della classe priority_buffer
La classe priority_buffer è un tipo di blocco di messaggi personalizzato che consente di ordinare i messaggi in arrivo prima in base alla priorità e poi in base all'ordine di ricezione dei messaggi.Il priority_buffer classe è simile al concurrency::unbounded_buffer classe perché contiene una coda di messaggi e inoltre perché funge da un'origine e un blocco del messaggio di destinazione e può disporre di più origini e destinazioni multiple.Tuttavia, unbounded_buffer basa la propagazione dei messaggi solo sull'ordine di ricezione dei messaggi dalle relative origini.
La classe priority_buffer riceve i messaggi di tipo std::tuple che contendono gli elementi PriorityType e Type.PriorityType si riferisce al tipo che contiene la priorità di ogni messaggio, mentre Type si riferisce alla parte di dati del messaggio.La classe priority_buffer invia messaggi di tipo Type.La classe priority_buffer gestisce inoltre due code di messaggi: un oggetto std::priority_queue per i messaggi in arrivo e un oggetto std::queue per i messaggi in uscita.L'ordinamento dei messaggi in base alla priorità è utile quando un oggetto priority_buffer riceve contemporaneamente più messaggi o quando riceve più messaggi prima che tutti i messaggi vengano letti dai consumer.
Oltre ai sette metodi che una classe che deriva da propagator_block deve implementare, la classe priority_buffer esegue inoltre l'override dei metodi send_message e link_target_notification.La classe priority_buffer definisce anche due metodi di supporto pubblici, enqueue e dequeue, e un metodo di supporto privato, propagate_priority_order.
Nella procedura seguente viene descritto come implementare la classe priority_buffer.
Per definire la classe priority_buffer
Creare un file di intestazione C++ e denominarlo priority_buffer.h.In alternativa, è possibile utilizzare un file di intestazione esistente che fa parte del progetto.
In priority_buffer.h aggiungere il codice seguente.
#pragma once #include <agents.h> #include <queue>
Nel std spazio dei nomi, definire specializzazioni di std::less e std::greater che agiscono su concurrency::message oggetti.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
La classe priority_buffer archivia gli oggetti message in un oggetto priority_queue.Queste specializzazioni dei tipi consentono alla coda delle priorità di ordinare i messaggi in base alla priorità.La priorità è il primo elemento dell'oggetto tuple.
Nello spazio dei nomi concurrency dichiarare la classe priority_buffer.
namespace concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
La classe priority_buffer deriva da propagator_block.Pertanto può sia inviare che ricevere messaggi.La classe priority_buffer può avere più destinazioni che ricevono messaggi di tipo Type.Può inoltre avere più origini che inviano messaggi di tipo tuple<PriorityType, Type>.
Nella sezione private della classe priority_buffer aggiungere le variabili membro seguenti.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
L'oggetto priority_queue contiene i messaggi in arrivo, mentre l'oggetto queue contiene i messaggi in uscita.Un oggetto priority_buffer può ricevere contemporaneamente più messaggi, mentre l'oggetto critical_section sincronizza l'accesso alla coda dei messaggi di input.
Nella sezione private definire il costruttore di copia e l'operatore di assegnazione.In questo modo si evita agli oggetti priority_queue di poter essere assegnati.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Nella sezione public definire i costruttori comuni a molti tipi di blocchi di messaggi.Definire anche il distruttore.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Nella sezione public definire i metodi enqueue e dequeue.Questi metodi di supporto costituiscono un modo alternativo per inviare e ricevere messaggi da un oggetto priority_buffer.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Nella sezione protected definire il metodo propagate_to_any_targets.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Il metodo propagate_to_any_targets trasferisce il messaggio che si trova all'inizio della coda di input alla coda di output e propaga all'esterno tutti i messaggi della coda di output.
Nella sezione protected definire il metodo accept_message.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Quando un blocco di destinazione chiama il metodo accept_message, la classe priority_buffer trasferisce la proprietà del messaggio al primo blocco di destinazione che lo accetta.Questo comportamento è analogo a quello di unbounded_buffer.
Nella sezione protected definire il metodo reserve_message.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
La classe priority_buffer consente a un blocco di destinazione di prenotare un messaggio quando l'identificatore del messaggio fornito corrisponde all'identificatore del messaggio che si trova all'inizio della coda.In altre parole, è possibile prenotare il messaggio di destinazione se il priority_buffer oggetto non ha ancora ricevuto un ulteriore messaggio e non è ancora propagata da quella corrente.
Nella sezione protected definire il metodo consume_message.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Un blocco di destinazione chiama consume_message per trasferire la proprietà del messaggio prenotato.
Nella sezione protected definire il metodo release_message.
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Un blocco di destinazione chiama release_message per annullare la prenotazione di un messaggio.
Nella sezione protected definire il metodo resume_propagation.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Il runtime chiama il metodo resume_propagation dopo che un blocco di destinazione ha utilizzato o rilasciato un messaggio prenotato.Questo metodo propaga all'esterno tutti i messaggi contenuti nella coda di output.
Nella sezione protected definire il metodo link_target_notification.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
La variabile membro _M_pReservedFor viene definita dalla classe di base, source_block.Questa variabile membro punta al blocco di destinazione, se presente, che contiene una prenotazione del messaggio che si trova all'inizio della coda di output.Il runtime chiama link_target_notification quando una nuova destinazione viene collegata all'oggetto priority_buffer.Questo metodo propaga all'esterno tutti i messaggi presenti nella coda di output se nessuna destinazione contiene una prenotazione.
Nella sezione private definire il metodo propagate_priority_order.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Questo metodo propaga all'esterno tutti i messaggi dalla coda di output.Ogni messaggio della coda viene offerto a ciascun blocco di destinazione finché uno dei blocchi di destinazione non accetta il messaggio.La classe priority_buffer conserva l'ordine dei messaggi in uscita.È pertanto necessario che il primo messaggio della coda di output venga accettato da un blocco di destinazione prima che il metodo offra un altro messaggio ai blocchi di destinazione.
Nella sezione protected definire il metodo propagate_message.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Il metodo propagate_message consente alla classe priority_buffer di fungere da destinatario del messaggio o da destinazione.Questo metodo riceve il messaggio offerto dal blocco di origine fornito e inserisce il messaggio nella coda delle priorità.Il metodo propagate_message invia quindi in modo asincrono tutti i messaggi di output ai blocchi di destinazione.
Common Language runtime chiama questo metodo quando si chiama il concurrency::asend funzione o altri blocchi di messaggio quando è connesso il blocco del messaggio.
Nella sezione protected definire il metodo send_message.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Il metodo send_message è analogo a propagate_message,con la differenza che invia i messaggi di output in modo sincrono anziché in modo asincrono.
Common Language runtime chiama questo metodo durante un'operazione di invio sincrono, ad esempio quando si chiama il concurrency::send funzione.
La classe priority_buffer contiene gli overload del costruttore che sono tipici di molti tipi di blocchi di messaggi.Alcuni costruttore overloads take concurrency::Scheduler o concurrency::ScheduleGroup gli oggetti, che consentono il blocco di messaggi gestiti da un'utilità di pianificazione di attività specifiche.Altri overload del costruttore accettano una funzione di filtro.Le funzioni di filtro consentono ai blocchi di messaggi di accettare o rifiutare un messaggio in base al relativo payload.Per ulteriori informazioni sui filtri dei messaggi, vedere Blocchi dei messaggi asincroni.Per ulteriori informazioni sulle utilità di pianificazione, vedere Utilità di pianificazione (runtime di concorrenza).
Poiché il priority_buffer classe ordini i messaggi in base alla priorità e quindi dall'ordine in cui i messaggi vengono ricevuti, questa classe è particolarmente utile quando riceve i messaggi in modo asincrono, ad esempio, quando si chiama il concurrency::asend funzione o altri blocchi di messaggio quando è connesso il blocco del messaggio.
Top
Esempio completo
Nell'esempio seguente viene illustrata la definizione completa della classe priority_buffer.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Nell'esempio riportato di seguito consente di eseguire simultaneamente un numero di asend e concurrency::receive le operazioni su un priority_buffer oggetto.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Questo esempio produce l'output seguente:
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
La classe priority_buffer ordina i messaggi prima in base alla priorità e poi in base all'ordine di ricezione dei messaggi.In questo esempio i messaggi con priorità numerica più alta vengono inseriti all'inizio della coda.
Top
Compilazione del codice
Copiare il codice di esempio e incollarlo in un progetto di Visual Studio o incollare la definizione del priority_buffer classe in un file denominato priority_buffer.h e il programma di test in un file denominato priority_buffer.cpp e quindi eseguire il comando riportato di seguito in una finestra del prompt dei comandi di Visual Studio.
cl.exe /EHsc priority_buffer.cpp
Vedere anche
Concetti
Blocchi dei messaggi asincroni
Funzioni di passaggio dei messaggi