Condividi tramite


Procedura dettagliata: creazione di un blocco dei messaggi personalizzato

In questo documento viene descritto come creare un tipo di blocco di messaggi personalizzato che consente di ordinare i messaggi in arrivo in base alla priorità.

Sebbene i tipi di blocchi di messaggi incorporati forniscano una vasta gamma di funzionalità, è possibile creare un tipo di blocco di messaggi e personalizzarlo per soddisfare i requisiti dell'applicazione. Per una descrizione dei tipi di blocchi di messaggi incorporati forniti dalla libreria di agenti asincroni, vedere Blocchi dei messaggi asincroni.

Prerequisiti

Prima di iniziare questa procedura dettagliata, leggere i documenti riportati di seguito.

Sezioni

In questa procedura dettagliata sono contenute le sezioni seguenti:

  • Progettazione di un blocco di messaggi personalizzato

  • Definizione della classe priority_buffer

  • Esempio completo

Progettazione di un blocco di messaggi personalizzato

I blocchi di messaggi vengono utilizzati nelle operazioni di invio e ricezione dei messaggi. Un blocco di messaggi che invia messaggi è definito blocco di origine. Un blocco di messaggi che riceve messaggi è definito blocco di destinazione. Un blocco di messaggi che invia e riceve messaggi è definito blocco di propagazione. La libreria di agenti utilizza la classe astratta concurrency::ISource per rappresentare i blocchi di origine e la classe astratta concurrency::ITarget per rappresentare i blocchi di destinazione. I tipi di blocchi dei messaggi che fungono da origini derivano da ISource, mentre i tipi di blocchi dei messaggi che fungono da destinazioni derivano da ITarget.

Sebbene sia possibile derivare il tipo di blocchi di messaggi direttamente da ISource e da ITarget, la libreria di agenti definisce tre classi di base che eseguono molte delle funzionalità comuni a tutti i tipi di blocchi di messaggi, ad esempio la gestione degli errori e la connessione dei blocchi di messaggi in una modalità indipendente dalla concorrenza. La classe concurrency::source_block deriva da ISource e invia i messaggi ad altri blocchi. La classe concurrency::target_block deriva da ITarget e riceve i messaggi da altri blocchi. La classe concurrency::propagator_block deriva da ISource e ITarget e invia i messaggi ad altri blocchi e li riceve da altri blocchi. È consigliabile utilizzare queste tre classi di base per gestire i dettagli dell'infrastruttura in modo da potersi concentrare sul comportamento del blocco di messaggi.

Le classi source_block, target_block e propagator_block sono modelli contenenti i parametri per il tipo che gestisce le connessioni o i collegamenti tra i blocchi di origine e di destinazione e per il tipo che gestisce la modalità di elaborazione dei messaggi. La libreria di agenti definisce due tipi che eseguono la gestione dei collegamenti, concurrency::single_link_registry e concurrency::multi_link_registry. La classe single_link_registry consente di collegare un blocco di messaggi a una sola origine o a una sola destinazione. La classe multi_link_registry consente di collegare un blocco di messaggi a più origini o a più destinazioni. La libreria di agenti definisce una sola classe che esegue la gestione dei messaggi, concurrency::ordered_message_processor. La classe ordered_message_processor consente ai blocchi di messaggi di elaborare i messaggi nell'ordine in cui vengono ricevuti.

Per comprendere meglio la correlazione tra i blocchi di messaggi e le relative origini e destinazioni, si consideri l'esempio seguente. Nell'esempio viene illustrata la dichiarazione della classe concurrency::transformer.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

La classe transformer deriva da propagator_block e pertanto funge sia da blocco di origine che da blocco di destinazione. Accetta messaggi di tipo _Input e invia messaggi di tipo _Output. La classe transformer specifica single_link_registry come gestore collegamenti per tutti i blocchi di destinazione e multi_link_registry come gestore collegamenti per tutti blocchi di origine. Un oggetto transformer può contenere pertanto una destinazione e un numero illimitato di origini.

Una classe che deriva da source_block deve implementare sei metodi: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation. Una classe che deriva da target_block deve implementare il metodo propagate_message e facoltativamente può implementare il metodo send_message. Dal punto di vista funzionale, la derivazione da propagator_block equivale alla derivazione da source_block e target_block.

Il metodo propagate_to_any_targets viene chiamato dal runtime per elaborare tutti i messaggi in modalità sincrona o asincrona e per propagare all'esterno tutti i messaggi in uscita. Il metodo accept_message viene chiamato dai blocchi di destinazione per accettare i messaggi. Molti tipi di blocchi di messaggi, ad esempio unbounded_buffer, inviano i messaggi solo alla prima destinazione che lo riceve, trasferendo pertanto la proprietà del messaggio alla destinazione. Altri tipi di blocchi di messaggi, ad esempio concurrency::overwrite_buffer, offrono i messaggi a ciascuno dei relativi blocchi di destinazione. overwrite_buffer crea pertanto una copia del messaggio per ciascuna delle relative destinazioni.

I metodi reserve_message, consume_message, release_message e resume_propagation consentono ai blocchi di messaggi di partecipare alla prenotazione dei messaggi. I blocchi di destinazione chiamano il metodo reserve_message quando a essi viene offerto un messaggio e devono prenotare il messaggio per un utilizzo successivo. Dopo la prenotazione di un messaggio, un blocco di destinazione può chiamare il metodo consume_message per utilizzare il messaggio o il metodo release_message per annullare la prenotazione. Analogamente a quanto avviene con il metodo accept_message, l'implementazione di consume_message può trasferire la proprietà del messaggio o restituire una copia del messaggio. Dopo che un blocco di destinazione ha utilizzato o rilasciato un messaggio prenotato, il runtime chiama il metodo resume_propagation. In genere, questo metodo continua la propagazione dei messaggi, a partire dal messaggio successivo della coda.

Il runtime chiama il metodo propagate_message per trasferire in modo asincrono un messaggio da un altro blocco a quello corrente. Il metodo send_message è analogo a propagate_message, ad eccezione del fatto che invia il messaggio ai blocchi di destinazione in modo sincrono anziché in modo asincrono. L'implementazione predefinita di send_message rifiuta tutti i messaggi in arrivo. Il runtime non chiama uno di questi metodi se il messaggio non passa la funzione facoltativa di filtro associata al blocco di destinazione. Per ulteriori informazioni sui filtri dei messaggi, vedere Blocchi dei messaggi asincroni.

[Top]

Definizione della classe priority_buffer

La classe priority_buffer è un tipo di blocco di messaggi personalizzato che consente di ordinare i messaggi in arrivo prima in base alla priorità e poi in base all'ordine di ricezione dei messaggi. La classe priority_buffer è analoga alla classe concurrency::unbounded_buffer perché utilizza una coda di messaggi e anche perché funge da blocco di messaggi sia di origine che di destinazione e può avere sia più origini che più destinazioni. Tuttavia, unbounded_buffer basa la propagazione dei messaggi solo sull'ordine di ricezione dei messaggi dalle relative origini.

La classe priority_buffer riceve i messaggi di tipo std::tuple che contendono gli elementi PriorityType e Type. PriorityType si riferisce al tipo che contiene la priorità di ogni messaggio, mentre Type si riferisce alla parte di dati del messaggio. La classe priority_buffer invia messaggi di tipo Type. La classe priority_buffer gestisce inoltre due code di messaggi: un oggetto std::priority_queue per i messaggi in arrivo e un oggetto std::queue per i messaggi in uscita. L'ordinamento dei messaggi in base alla priorità è utile quando un oggetto priority_buffer riceve contemporaneamente più messaggi o quando riceve più messaggi prima che tutti i messaggi vengano letti dai consumer.

Oltre ai sette metodi che una classe che deriva da propagator_block deve implementare, la classe priority_buffer esegue inoltre l'override dei metodi send_message e link_target_notification. La classe priority_buffer definisce anche due metodi di supporto pubblici, enqueue e dequeue, e un metodo di supporto privato, propagate_priority_order.

Nella procedura seguente viene descritto come implementare la classe priority_buffer.

Per definire la classe priority_buffer

  1. Creare un file di intestazione C++ e denominarlo priority_buffer.h. In alternativa, è possibile utilizzare un file di intestazione esistente che fa parte del progetto.

  2. In priority_buffer.h aggiungere il codice seguente.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. Nello spazio dei nomi std definire le specializzazioni di std::less e std::greater che agiscono sugli oggetti concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    La classe priority_buffer archivia gli oggetti message in un oggetto priority_queue. Queste specializzazioni dei tipi consentono alla coda delle priorità di ordinare i messaggi in base alla priorità. La priorità è il primo elemento dell'oggetto tuple.

  4. Nello spazio dei nomi concurrencyex dichiarare la classe priority_buffer.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    La classe priority_buffer deriva da propagator_block. Pertanto può sia inviare che ricevere messaggi. La classe priority_buffer può avere più destinazioni che ricevono messaggi di tipo Type. Può inoltre avere più origini che inviano messaggi di tipo tuple<PriorityType, Type>.

  5. Nella sezione private della classe priority_buffer aggiungere le variabili membro seguenti.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    L'oggetto priority_queue contiene i messaggi in arrivo, mentre l'oggetto queue contiene i messaggi in uscita. Un oggetto priority_buffer può ricevere contemporaneamente più messaggi, mentre l'oggetto critical_section sincronizza l'accesso alla coda dei messaggi di input.

  6. Nella sezione private definire il costruttore di copia e l'operatore di assegnazione. In questo modo si evita agli oggetti priority_queue di poter essere assegnati.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. Nella sezione public definire i costruttori comuni a molti tipi di blocchi di messaggi. Definire anche il distruttore.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Nella sezione public definire i metodi enqueue e dequeue. Questi metodi di supporto costituiscono un modo alternativo per inviare e ricevere messaggi da un oggetto priority_buffer.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Nella sezione protected definire il metodo propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Il metodo propagate_to_any_targets trasferisce il messaggio che si trova all'inizio della coda di input alla coda di output e propaga all'esterno tutti i messaggi della coda di output.

  10. Nella sezione protected definire il metodo accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Quando un blocco di destinazione chiama il metodo accept_message, la classe priority_buffer trasferisce la proprietà del messaggio al primo blocco di destinazione che lo accetta. Questo comportamento è analogo a quello di unbounded_buffer.

  11. Nella sezione protected definire il metodo reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    La classe priority_buffer consente a un blocco di destinazione di prenotare un messaggio quando l'identificatore del messaggio fornito corrisponde all'identificatore del messaggio che si trova all'inizio della coda. In altre parole, una destinazione può prenotare il messaggio se l'oggetto priority_buffer non ha ancora ricevuto un ulteriore messaggio e non ha ancora propagato all'esterno quello corrente.

  12. Nella sezione protected definire il metodo consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Un blocco di destinazione chiama consume_message per trasferire la proprietà del messaggio prenotato.

  13. Nella sezione protected definire il metodo release_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Un blocco di destinazione chiama release_message per annullare la prenotazione di un messaggio.

  14. Nella sezione protected definire il metodo resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Il runtime chiama il metodo resume_propagation dopo che un blocco di destinazione ha utilizzato o rilasciato un messaggio prenotato. Questo metodo propaga all'esterno tutti i messaggi contenuti nella coda di output.

  15. Nella sezione protected definire il metodo link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    La variabile membro _M_pReservedFor viene definita dalla classe di base, source_block. Questa variabile membro punta al blocco di destinazione, se presente, che contiene una prenotazione del messaggio che si trova all'inizio della coda di output. Il runtime chiama link_target_notification quando una nuova destinazione viene collegata all'oggetto priority_buffer. Questo metodo propaga all'esterno tutti i messaggi presenti nella coda di output se nessuna destinazione contiene una prenotazione.

  16. Nella sezione private definire il metodo propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Questo metodo propaga all'esterno tutti i messaggi dalla coda di output. Ogni messaggio della coda viene offerto a ciascun blocco di destinazione finché uno dei blocchi di destinazione non accetta il messaggio. La classe priority_buffer conserva l'ordine dei messaggi in uscita. È pertanto necessario che il primo messaggio della coda di output venga accettato da un blocco di destinazione prima che il metodo offra un altro messaggio ai blocchi di destinazione.

  17. Nella sezione protected definire il metodo propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Il metodo propagate_message consente alla classe priority_buffer di fungere da destinatario del messaggio o da destinazione. Questo metodo riceve il messaggio offerto dal blocco di origine fornito e inserisce il messaggio nella coda delle priorità. Il metodo propagate_message invia quindi in modo asincrono tutti i messaggi di output ai blocchi di destinazione.

    Il runtime chiama questo metodo quando viene chiamata la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.

  18. Nella sezione protected definire il metodo send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Il metodo send_message è analogo a propagate_message, con la differenza che invia i messaggi di output in modo sincrono anziché in modo asincrono.

    Il runtime chiama questo metodo durante un'operazione di invio sincrona, ad esempio quando viene chiamata la funzione concurrency::send.

La classe priority_buffer contiene gli overload del costruttore che sono tipici di molti tipi di blocchi di messaggi. Alcuni overload del costruttore accettano oggetti concurrency::Scheduler o concurrency::ScheduleGroup che consentono al blocco di messaggi di essere gestito da un'utilità di pianificazione specifica. Altri overload del costruttore accettano una funzione di filtro. Le funzioni di filtro consentono ai blocchi di messaggi di accettare o rifiutare un messaggio in base al relativo payload. Per ulteriori informazioni sui filtri dei messaggi, vedere Blocchi dei messaggi asincroni. Per ulteriori informazioni sulle utilità di pianificazione, vedere Utilità di pianificazione (runtime di concorrenza).

Poiché la classe priority_buffer ordina i messaggi prima in base alla priorità e poi in base all'ordine di ricezione dei messaggi, questa classe è molto utile quando riceve i messaggi in modo asincrono, ad esempio quando si chiama la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.

[Top]

Esempio completo

Nell'esempio seguente viene illustrata la definizione completa della classe priority_buffer.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Nell'esempio seguente vengono eseguite contemporaneamente diverse operazioni asend e concurrency::receive su un oggetto priority_buffer.

// priority_buffer.cpp 
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h" 

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations 
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Questo esempio produce l'output seguente:

  

La classe priority_buffer ordina i messaggi prima in base alla priorità e poi in base all'ordine di ricezione dei messaggi. In questo esempio i messaggi con priorità numerica più alta vengono inseriti all'inizio della coda.

[Top]

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto Visual Studio, o incollare la definizione della classe priority_buffer in un file chiamato priority_buffer.h e il programma di test in un file chiamato priority_buffer.cpp , quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Vedere anche

Concetti

Blocchi dei messaggi asincroni

Funzioni di passaggio dei messaggi

Altre risorse

Procedure dettagliate del runtime di concorrenza