Názorný postup: Vytváření vlastní Message Block
Tento dokument popisuje, jak vytvořit vlastní zpráva typ bloku, který objednávky příchozích zpráv podle priority.
Ačkoli typy bloku předdefinovaných zpráv široký rozsah funkcí, můžete vytvořit vlastní typ bloku zprávy a přizpůsobit požadavkům vaší aplikace.Popis typů bloku předdefinovaných zpráv poskytovaných asynchronní agenti knihovny v Asynchronní bloků zprávy.
Požadavky
Před zahájením tohoto postupu, přečtěte si následující dokumenty:
Oddíly
Tento návod obsahuje následující oddíly:
Navrhování vlastní Message Block
Definování priority_buffer třídy
Kompletní příklad
Navrhování vlastní Message Block
Blokuje zprávy účastnit aktu odesílání a přijímání zpráv.Blok zprávy, které odesílá zprávy je označován jako zdrojového bloku.Blok zprávy, která přijímá zprávy je označován jako cílového bloku.Blok zprávy odesílá a přijímá zprávy je označován jako bloku Šiřitel.Abstraktní třída používá knihovna agenti concurrency::ISource představovat zdroj bloky a abstraktní třída concurrency::ITarget představující cíl bloky.Blok zprávy typy aktu jako zdroje odvozen od ISource; blok zprávy typy aktu jako cíle odvozují z ITarget.
Přestože lze odvodit zprávy typu bloku přímo z ISource a ITarget, agenti knihovny definuje tři základní třídy, které provádějí mnoho funkcí, které jsou společné pro všechny typy zpráv blok, například zpracování chyb a zpráv připojení blokuje společně způsobem bezpečné souběžnosti.Concurrency::source_block je odvozen z třídy ISource a ostatní bloky odesílá zprávy.Concurrency::target_block je odvozen z třídy ITarget a přijímá zprávy z dalších bloků.Concurrency::propagator_block je odvozen z třídy ISource a ITarget a odesílá zprávy do jiné bloky a přijímá zprávy z dalších bloků.Doporučujeme použít tyto tři základní třídy zpracovávat údaje infrastruktury tak, aby se mohli zaměřit na chování message block.
source_block, target_block, A propagator_block třídy jsou šablony, které jsou parametrizované–uživatelé na typ, který spravuje připojení nebo propojení mezi zdrojovou a cílovou bloky a na typu, který spravuje zpracování zpráv.Agenti knihovny definuje dva typy, které provádět správu odkaz concurrency::single_link_registry a concurrency::multi_link_registry.single_link_registry Třídy umožňuje propojení jednoho zdrojového nebo cílového bloku zprávy.multi_link_registry Třídy umožňuje blokovat zprávy propojit více zdrojů nebo více cílů.Knihovna agenti definuje jednu třídu, která provádí správu zpráva concurrency::ordered_message_processor.ordered_message_processor Třídy umožňuje bloků zprávy ke zpracování zpráv v pořadí, ve kterém je obdrží.
Chcete-li lépe pochopit, jak souvisí bloků zprávy jejich zdroje a cíle, zvažte následující příklad.Tento příklad ukazuje prohlášení concurrency::transformer třídy.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer Je odvozen z třídy propagator_blocka proto působí jako obou zdrojového bloku a cílového bloku.Přijímá zprávy typu _Input a odešle zprávy typu _Output.transformer Třídy určuje single_link_registry jako správce odkaz pro cílové bloky a multi_link_registry jako správce propojení zdroje bloky.Proto transformer objekt může obsahovat maximálně jeden cíl a neomezený počet zdrojů.
Třídy, který je odvozen od source_block musí implementovat metody šest: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, a resume_propagation.Třídy, který je odvozen od target_block musí implementovat propagate_message metoda a volitelně lze implementovat send_message metoda.Vyplývající z propagator_block je funkčně ekvivalentní vyplývající z obou source_block a target_block.
propagate_to_any_targets Je volána metoda modulem runtime synchronně nebo asynchronně zpracovávat všechny příchozí zprávy a šíří všechny odchozí zprávy.accept_message Je volána metoda podle cílové bloků pro příjem zpráv.Mnoho typů blok zprávy jako unbounded_buffer, zprávy odeslat pouze první cíl, který by přijímat.Proto převede vlastnictví zprávy k cíli.Typy jiných blok zprávy, například concurrency::overwrite_buffer, nabídnout každému jeho cíl bloky zpráv.Proto overwrite_buffer pro každou jeho cíle vytvoří kopii zprávy.
reserve_message, consume_message, release_message, A resume_propagation povolení metody bloků zprávy účastnit zprávy rezervace.Blokuje cíl volání reserve_message metody jsou nabízeny zprávy a nutné rezervovat zprávu pro pozdější použití.Po rezervy cílový blok zprávy, můžete volat consume_message metody zpracovávat zprávy nebo release_message metoda zrušit rezervaci.Stejně jako accept_message metody, provádění consume_message můžete převést vlastnictví zprávy nebo vrátit kopii zprávy.Po cílového bloku spotřebovává nebo uvolní vyhrazených zpráv, runtime volá resume_propagation metoda.Tato metoda obvykle pokračuje v šíření zpráv, počínaje další zprávu ve frontě.
Runtime volání propagate_message metody asynchronně přenosu zprávy z jiného bloku do aktuální.send_message Metody se podobá propagate_message, synchronně, namísto asynchronně, odesílá zprávy do cílové bloky.Výchozí implementace send_message odmítne všechny příchozí zprávy.Běhový modul nevolá kterékoli z těchto metod Pokud zpráva neprojde přidružený cílový blok filtru volitelné funkce.Další informace o filtrech zprávy, viz Asynchronní bloků zprávy.
Top
Definování priority_buffer třídy
priority_buffer Třídy je typ bloku vlastní zprávu, která objednávky příchozí zprávy nejprve podle priority a podle pořadí, ve kterém jsou zprávy přijímány.priority_buffer Se podobá třídě concurrency::unbounded_buffer třídy, protože obsahuje fronty zpráv a také protože pracuje jako zdroj a cíl blok zprávy a může mít více zdrojů a více cílů.Však unbounded_buffer zpráva základů šíření pouze na objednávku, obdrží zprávy z jeho zdrojů.
priority_buffer Třídy přijímá zprávy typu std::tuple , obsahující PriorityType a Type prvky.PriorityTypeodkazuje na typ, který má prioritu jednotlivých zpráv; Typeodkazuje na data část zprávy.priority_buffer Třídy odesílá zprávy typu Type.priority_buffer Třídy také spravuje dvě fronty zpráv: std::priority_queue objekt pro příchozí zprávy a std::queue objektu pro odchozí zprávy.Pořadí podle priority zprávy je užitečné, pokud priority_buffer objektu současně přijímá více zpráv nebo při přijetí zprávy více než spotřebitelé jsou číst všechny zprávy.
Vedle sedm metod, třídy, která pochází z propagator_block musí implementovat priority_buffer třídy také lokální změny link_target_notification a send_message metod.priority_buffer Třída rovněž definuje dvě metody pomocné veřejné enqueue a dequeuea soukromé pomocné metody, propagate_priority_order.
Následující postup popisuje, jak implementovat priority_buffer třídy.
Definovat třídy priority_buffer
Vytvořte soubor C++ záhlaví s názvem priority_buffer.h.Můžete také použít existující soubor záhlaví, který je součástí projektu.
V priority_buffer.h, přidejte následující kód.
#pragma once #include <agents.h> #include <queue>
V std oboru názvů, definovat specializace z std::less a std::greater , působit na concurrency::message objektů.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer Třídy úložiště message objekty v priority_queue objektu.Povolit tyto specializace typ fronty prioritu řazení zpráv podle jejich priority.Priorita je prvním prvkem tuple objektu.
V concurrency oboru názvů, deklarovat priority_buffer třídy.
namespace concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer Je odvozen z třídy propagator_block.Proto je možné jak odesílat a přijímat zprávy.priority_buffer Třída může mít více cílů, které jsou zprávy typu Type.Můžete mít také více zdrojů, které odesílají zprávy typu tuple<PriorityType, Type>.
V private část priority_buffer třídy, přidejte následující proměnné členů.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
priority_queue Objekt obsahuje příchozí zprávy; queue objekt obsahuje odchozí zprávy.A priority_buffer objektu můžete přijímat více současně; critical_section objektu synchronizuje přístup k frontě vstupní zprávy.
V private oddílu, definovat Kopírovat konstruktor a operátor přiřazení.Tím se zabrání priority_queue objekty z právě Přiřaditelné.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
V public oddíl, definovat konstruktory, které jsou společné mnoha typů blok zprávy.Definujte se objekt.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
V public oddíl, definovat metody enqueue a dequeue.Tyto pomocné metody poskytují alternativní způsob zprávy odesílat a přijímat zprávy z priority_buffer objektu.
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
V protected oddíl, definovat propagate_to_any_targets metoda.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets Metoda převede zprávu, která je na přední straně vstupní fronty do výstupní fronty a šíří všechny zprávy ve frontě výstup.
V protected oddíl, definovat accept_message metoda.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Při volání cílového bloku accept_message metodou, priority_buffer třídu vlastnictví zprávy přenese do první blok cíl, který ji přijímá.(Toto chování se podobá unbounded_buffer.)
V protected oddíl, definovat reserve_message metoda.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
priority_buffer Třídy umožňuje cílového bloku rezervovat zprávu, pokud odpovídá identifikátor zprávy zadaný identifikátor zprávy, která je na začátek fronty.Jinými slovy, cíl můžete rezervovat zprávy Pokud priority_buffer objekt ještě neobdržel další zprávy a má ještě nerozšíří mimo aktuální.
V protected oddíl, definovat consume_message metoda.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Volá cílového bloku consume_message převést vlastnictví zprávu, která je vyhrazena.
V protected oddíl, definovat release_message metoda.
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Volá cílového bloku release_message zrušení rezervace na zprávu.
V protected oddíl, definovat resume_propagation metoda.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Volání runtime resume_propagation po cílového bloku spotřebovává nebo uvolní vyhrazených zpráv.Tato metoda se rozšíří mimo všechny zprávy, které jsou ve frontě výstup.
V protected oddíl, definovat link_target_notification metoda.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor Členské proměnné je definována základní třídou source_block.Tato proměnná členské bodů na cílový blok případné rezervace na zprávu, která je na přední straně výstupní fronty drží.Volání runtime link_target_notification při nový cíl je propojen priority_buffer objektu.Tato metoda se rozšíří mimo všechny zprávy, které jsou ve frontě výstupní, pokud žádný cíl drží rezervace.
V private oddíl, definovat propagate_priority_order metoda.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Tato metoda se šíří všechny zprávy z fronty výstupu.Všechny zprávy ve frontě nabídnuty každý cílový blok dokud jeden z bloků cíl přijímá zprávy.priority_buffer Třídy zachovává pořadí odchozích zpráv.Proto první zprávu ve frontě výstupu musí uznat cílového bloku před tato metoda nabízí všechny zprávy do cílové bloky.
V protected oddíl, definovat propagate_message metoda.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message Metoda umožňuje priority_buffer třídy jako příjemce zprávy nebo cíl.Tato metoda obdrží zprávu, která nabízejí poskytnutého zdrojového bloku a vloží do fronty priority zprávy.propagate_message Metoda poté asynchronně odešle všechny výstupní zprávy Cíl bloky.
Runtime volá tuto metodu při volání concurrency::asend funkci nebo při připojení blok zprávy do jiných bloků zprávy.
V protected oddíl, definovat send_message metoda.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message Metody se podobá propagate_message.Avšak odešle výstup zprávy synchronně namísto asynchronně.
Runtime volá v průběhu operace Odeslat synchronní, například při volání této metody concurrency::send funkce.
priority_buffer Třída obsahuje konstruktor přetížení, které jsou obvyklé v mnoha typech blok zprávy.Některé konstruktor přetížení vzít concurrency::Scheduler nebo concurrency::ScheduleGroup objekty, které umožňují blokovat zprávy spravovat specifické úlohy plánovačem.Přijmout další přetížení konstruktoru funkce filtru.Povolit funkce filtr blokuje zprávu přijmout nebo odmítnout zprávu na základě své datové části.Další informace o filtrech zprávy, viz Asynchronní bloků zprávy.Další informace o plánovače úloh, viz Plánovač úloh (souběžnosti Runtime).
Protože priority_buffer třída objednávky podle priority zprávy a potom podle pořadí, ve kterém jsou zprávy přijímány, této třídy je nejužitečnější při obdržení zprávy asynchronně, například při volání concurrency::asend funkci nebo při připojení blok zprávy do jiných bloků zprávy.
Top
Kompletní příklad
Následující příklad ukazuje úplnou definici priority_buffer třídy.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Následující příklad provádí souběžně počet asend a concurrency::receive operace priority_buffer objektu.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Tento příklad vytvoří následující ukázkový výstup.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
priority_buffer Třída objednávky zprávy nejprve podle priority a podle pořadí, ve kterém ji přijímá zprávy.V tomto příkladu jsou vloženy zprávy s větší číselnou prioritu zpředu fronty.
Top
Probíhá kompilace kódu
Příklad kódu kopírovat a vložit v projektu Visual Studio nebo vložit definici priority_buffer třídy v souboru s názvem priority_buffer.h a testovací program v souboru s názvem priority_buffer.cpp a spusťte následující příkaz v okně příkazového řádku Visual Studio.
cl.exe /EHsc priority_buffer.cpp