Condividi tramite


Procedura dettagliata: creazione di un blocco dei messaggi personalizzato

Questo documento descrive come creare un tipo di blocco di messaggi personalizzato che ordina i messaggi in arrivo in base alla priorità.

Sebbene i tipi di blocchi di messaggi predefiniti forniscano un'ampia gamma di funzionalità, è possibile creare un tipo di blocco di messaggi personalizzato e personalizzarlo per soddisfare i requisiti dell'applicazione. Per una descrizione dei tipi di blocchi di messaggi predefiniti forniti dalla libreria degli agenti asincroni, vedere Blocchi di messaggi asincroni.

Prerequisiti

Leggere i documenti seguenti prima di iniziare questa procedura dettagliata:

Sezioni

Questa procedura dettagliata contiene le sezioni seguenti:

Progettazione di un blocco di messaggi personalizzato

I blocchi di messaggi partecipano all'azione di invio e ricezione di messaggi. Un blocco di messaggi che invia messaggi è noto come blocco di origine. Un blocco di messaggi che riceve messaggi è noto come blocco di destinazione. Un blocco di messaggi che invia e riceve messaggi è noto come blocco propagatore. La libreria agenti usa la classe astratta concurrency::ISource per rappresentare i blocchi di origine e la classe astratta concurrency::ITarget per rappresentare i blocchi di destinazione. I tipi di blocco di messaggi che fungono da origini derivano da ISource; tipi di blocchi di messaggi che fungono da destinazioni derivano da ITarget.

Anche se è possibile derivare il tipo di blocco di messaggi direttamente da ISource e ITarget, la libreria agenti definisce tre classi di base che eseguono gran parte delle funzionalità comuni a tutti i tipi di blocco di messaggi, ad esempio, la gestione degli errori e la connessione dei blocchi di messaggi in modo indipendente dalla concorrenza. La classe concurrency::source_block deriva da ISource e invia messaggi ad altri blocchi. La classe concurrency::target_block deriva da ITarget e riceve messaggi da altri blocchi. La classe concurrency::p ropagator_block deriva da ISource e ITarget invia messaggi ad altri blocchi e riceve messaggi da altri blocchi. È consigliabile usare queste tre classi di base per gestire i dettagli dell'infrastruttura in modo che sia possibile concentrarsi sul comportamento del blocco di messaggi.

Le source_blockclassi , target_blocke propagator_block sono modelli con parametri su un tipo che gestisce le connessioni, o i collegamenti, tra blocchi di origine e di destinazione e su un tipo che gestisce la modalità di elaborazione dei messaggi. La libreria agenti definisce due tipi che eseguono la gestione dei collegamenti, concurrency::single_link_registry e concurrency::multi_link_registry. La single_link_registry classe consente di collegare un blocco di messaggi a un'origine o a una destinazione. La multi_link_registry classe consente di collegare un blocco di messaggi a più origini o a più destinazioni. La libreria agenti definisce una classe che esegue la gestione dei messaggi, concurrency::ordered_message_processor. La ordered_message_processor classe consente ai blocchi di messaggi di elaborare i messaggi nell'ordine in cui li riceve.

Per comprendere meglio il modo in cui i blocchi di messaggi sono correlati alle origini e alle destinazioni, considerare l'esempio seguente. Questo esempio mostra la dichiarazione della classe concurrency::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

La transformer classe deriva da propagator_blocke quindi funge da blocco di origine e come blocco di destinazione. Accetta messaggi di tipo _Input e invia messaggi di tipo _Output. La transformer classe specifica single_link_registry come gestore collegamenti per i blocchi di destinazione e multi_link_registry come gestore dei collegamenti per tutti i blocchi di origine. Pertanto, un transformer oggetto può avere fino a una destinazione e un numero illimitato di origini.

Una classe che deriva da source_block deve implementare sei metodi: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message e resume_propagation. Una classe che deriva da target_block deve implementare il metodo propagate_message e può facoltativamente implementare il metodo send_message . La derivazione da propagator_block è funzionalmente equivalente alla derivazione da source_block e target_block.

Il propagate_to_any_targets metodo viene chiamato dal runtime per elaborare in modo asincrono o sincrono tutti i messaggi in arrivo e propagare eventuali messaggi in uscita. Il accept_message metodo viene chiamato dai blocchi di destinazione per accettare messaggi. Molti tipi di blocchi di messaggi, ad esempio unbounded_buffer, inviano messaggi solo alla prima destinazione che la riceverebbe. Pertanto, trasferisce la proprietà del messaggio alla destinazione. Altri tipi di blocchi di messaggi, ad esempio concurrency::overwrite_buffer, offrono messaggi a ognuno dei relativi blocchi di destinazione. Pertanto, overwrite_buffer crea una copia del messaggio per ognuna delle destinazioni.

I reserve_messagemetodi , consume_message, release_messagee resume_propagation consentono ai blocchi di messaggi di partecipare alla prenotazione dei messaggi. I blocchi di destinazione chiamano il reserve_message metodo quando vengono offerti un messaggio e devono riservare il messaggio per usarlo in un secondo momento. Dopo che un blocco di destinazione riserva un messaggio, può chiamare il consume_message metodo per utilizzare tale messaggio o il release_message metodo per annullare la prenotazione. Come per il accept_message metodo , l'implementazione di consume_message può trasferire la proprietà del messaggio o restituire una copia del messaggio. Dopo che un blocco di destinazione utilizza o rilascia un messaggio riservato, il runtime chiama il resume_propagation metodo . In genere, questo metodo continua la propagazione dei messaggi, a partire dal messaggio successivo nella coda.

Il runtime chiama il propagate_message metodo per trasferire in modo asincrono un messaggio da un altro blocco a quello corrente. Il send_message metodo è simile a propagate_message, ad eccezione del fatto che, in modo sincrono, anziché in modo asincrono, invia il messaggio ai blocchi di destinazione. L'implementazione predefinita di send_message rifiuta tutti i messaggi in arrivo. Il runtime non chiama uno di questi metodi se il messaggio non passa la funzione di filtro facoltativa associata al blocco di destinazione. Per altre informazioni sui filtri dei messaggi, vedere Blocchi messaggi asincroni.

[Torna all'inizio]

Definizione della classe priority_buffer

La priority_buffer classe è un tipo di blocco di messaggi personalizzato che ordina i messaggi in arrivo prima per priorità e quindi in base all'ordine in cui vengono ricevuti i messaggi. La priority_buffer classe è simile alla classe concurrency::unbounded_buffer perché contiene una coda di messaggi e anche perché funge sia da origine che da blocco di messaggi di destinazione e può avere più origini e più destinazioni. Tuttavia, unbounded_buffer basa la propagazione dei messaggi solo sull'ordine in cui riceve i messaggi dalle relative origini.

La priority_buffer classe riceve messaggi di tipo std::tuple che contengono PriorityType elementi e Type . PriorityType fa riferimento al tipo che contiene la priorità di ogni messaggio; Type fa riferimento alla parte dei dati del messaggio. La priority_buffer classe invia messaggi di tipo Type. La priority_buffer classe gestisce anche due code di messaggi: un oggetto std::p riority_queue per i messaggi in arrivo e un oggetto std::queue per i messaggi in uscita. L'ordinamento dei messaggi per priorità è utile quando un priority_buffer oggetto riceve più messaggi contemporaneamente o quando riceve più messaggi prima che i messaggi vengano letti dai consumer.

Oltre ai sette metodi da cui deve essere implementata una classe che deriva da propagator_block , la classe esegue anche l'override priority_buffer dei link_target_notification metodi e send_message . La priority_buffer classe definisce anche due metodi helper pubblici, enqueue e dequeuee un metodo helper privato, propagate_priority_order.

La procedura seguente descrive come implementare la priority_buffer classe .

Per definire la classe priority_buffer

  1. Creare un file di intestazione C++ e denominarlo priority_buffer.h. In alternativa, è possibile usare un file di intestazione esistente che fa parte del progetto.

  2. In priority_buffer.haggiungere il codice seguente.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. Nello spazio dei std nomi definire le specializzazioni di std::less e std::greater che agiscono sugli oggetti concurrency::message .

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    La priority_buffer classe archivia message gli oggetti in un priority_queue oggetto . Queste specializzazioni di tipo consentono alla coda di priorità di ordinare i messaggi in base alla priorità. La priorità è il primo elemento dell'oggetto tuple .

  4. Nello spazio dei concurrencyex nomi dichiarare la priority_buffer classe .

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    La classe priority_buffer deriva da propagator_block. Pertanto, può sia inviare che ricevere messaggi. La priority_buffer classe può avere più destinazioni che ricevono messaggi di tipo Type. Può anche avere più origini che inviano messaggi di tipo tuple<PriorityType, Type>.

  5. private Nella sezione della priority_buffer classe aggiungere le variabili membro seguenti.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    L'oggetto priority_queue contiene messaggi in arrivo; l'oggetto queue contiene messaggi in uscita. Un priority_buffer oggetto può ricevere più messaggi contemporaneamente. L'oggetto sincronizza l'accesso critical_section alla coda di messaggi di input.

  6. private Nella sezione definire il costruttore di copia e l'operatore di assegnazione. In questo modo, gli priority_queue oggetti non possono essere assegnati.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public Nella sezione definire i costruttori comuni a molti tipi di blocchi di messaggi. Definire anche il distruttore.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. public Nella sezione definire i metodi enqueue e dequeue. Questi metodi helper forniscono un modo alternativo per inviare e ricevere messaggi da un priority_buffer oggetto .

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. protected Nella sezione definire il propagate_to_any_targets metodo .

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Il propagate_to_any_targets metodo trasferisce il messaggio che si trova all'inizio della coda di input nella coda di output e propaga tutti i messaggi nella coda di output.

  10. protected Nella sezione definire il accept_message metodo .

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Quando un blocco di destinazione chiama il accept_message metodo , la priority_buffer classe trasferisce la proprietà del messaggio al primo blocco di destinazione che lo accetta. Questo è simile al comportamento di unbounded_buffer.

  11. protected Nella sezione definire il reserve_message metodo .

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    La priority_buffer classe consente a un blocco di destinazione di riservare un messaggio quando l'identificatore del messaggio specificato corrisponde all'identificatore del messaggio che si trova all'inizio della coda. In altre parole, una destinazione può riservare il messaggio se l'oggetto priority_buffer non ha ancora ricevuto un messaggio aggiuntivo e non ha ancora propagato quello corrente.

  12. protected Nella sezione definire il consume_message metodo .

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Un blocco di destinazione chiama consume_message per trasferire la proprietà del messaggio che ha riservato.

  13. protected Nella sezione definire il release_message metodo .

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Un blocco di destinazione chiama release_message per annullare la prenotazione a un messaggio.

  14. protected Nella sezione definire il resume_propagation metodo .

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Il runtime chiama resume_propagation dopo che un blocco di destinazione utilizza o rilascia un messaggio riservato. Questo metodo propaga tutti i messaggi presenti nella coda di output.

  15. protected Nella sezione definire il link_target_notification metodo .

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    La _M_pReservedFor variabile membro è definita dalla classe base , source_block. Questa variabile membro punta al blocco di destinazione, se presente, che contiene una prenotazione al messaggio che si trova all'inizio della coda di output. Il runtime chiama link_target_notification quando una nuova destinazione è collegata all'oggetto priority_buffer . Questo metodo propaga tutti i messaggi presenti nella coda di output se nessuna destinazione contiene una prenotazione.

  16. private Nella sezione definire il propagate_priority_order metodo .

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Questo metodo propaga tutti i messaggi dalla coda di output. Ogni messaggio nella coda viene offerto a ogni blocco di destinazione finché uno dei blocchi di destinazione non accetta il messaggio. La priority_buffer classe mantiene l'ordine dei messaggi in uscita. Pertanto, il primo messaggio nella coda di output deve essere accettato da un blocco di destinazione prima che questo metodo offra qualsiasi altro messaggio ai blocchi di destinazione.

  17. protected Nella sezione definire il propagate_message metodo .

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Il propagate_message metodo consente alla priority_buffer classe di agire come ricevitore di messaggi o come destinazione. Questo metodo riceve il messaggio offerto dal blocco di origine fornito e lo inserisce nella coda di priorità. Il propagate_message metodo invia quindi in modo asincrono tutti i messaggi di output ai blocchi di destinazione.

    Il runtime chiama questo metodo quando si chiama la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.

  18. protected Nella sezione definire il send_message metodo .

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Il send_message metodo è simile a propagate_message. Tuttavia, invia i messaggi di output in modo sincrono anziché in modo asincrono.

    Il runtime chiama questo metodo durante un'operazione di invio sincrono, ad esempio quando si chiama la funzione concurrency::send .

La priority_buffer classe contiene overload del costruttore tipici in molti tipi di blocchi di messaggi. Alcuni overload del costruttore accettano oggetti concurrency::Scheduler o concurrency::ScheduleGroup , che consentono di gestire il blocco di messaggi da un'utilità di pianificazione specifica. Altri overload del costruttore accettano una funzione di filtro. Le funzioni di filtro consentono ai blocchi di messaggi di accettare o rifiutare un messaggio in base al relativo payload. Per altre informazioni sui filtri dei messaggi, vedere Blocchi messaggi asincroni. Per altre informazioni sulle utilità di pianificazione delle attività, vedere Utilità di pianificazione.

Poiché la priority_buffer classe ordina i messaggi in base alla priorità e quindi in base all'ordine in cui vengono ricevuti i messaggi, questa classe è più utile quando riceve messaggi in modo asincrono, ad esempio quando si chiama la funzione concurrency::asend o quando il blocco di messaggi è connesso ad altri blocchi di messaggi.

[Torna all'inizio]

Esempio completo

Nell'esempio seguente viene illustrata la definizione completa della priority_buffer classe .

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

L'esempio seguente esegue simultaneamente una serie di asend operazioni concurrency ::receive su un priority_buffer oggetto .

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

In questo esempio viene generato l'output di esempio seguente.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

La priority_buffer classe ordina i messaggi prima per priorità e quindi in base all'ordine in cui riceve i messaggi. In questo esempio, i messaggi con priorità numerica maggiore vengono inseriti verso la parte anteriore della coda.

[Torna all'inizio]

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollare la definizione della priority_buffer classe in un file denominato priority_buffer.h e il programma di test in un file denominato priority_buffer.cpp e quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Vedi anche

Procedure dettagliate del runtime di concorrenza
Blocchi dei messaggi asincroni
Funzioni di passaggio dei messaggi