チュートリアル: カスタム メッセージ ブロックの作成
ここでは、受信メッセージを優先順位に従って並べるカスタム メッセージ ブロックの型を作成する方法について説明します。
組み込みのメッセージ ブロックの型には幅広い機能が備わっていますが、独自のメッセージ ブロックの型を作成して、アプリケーションの要件を満たすようにカスタマイズすることもできます。 非同期エージェント ライブラリに用意されている組み込みのメッセージ ブロックの型については、「非同期メッセージ ブロック」をご覧ください。
前提条件
このチュートリアルを開始する前に、次のドキュメントを参照してください。
セクション
このチュートリアルは、次のセクションで構成されています。
カスタム メッセージ ブロックの設計
メッセージ ブロックは、メッセージの送受信処理に参加します。 メッセージを送信するメッセージ ブロックは ソース ブロック と呼ばれます。 メッセージを受信するメッセージ ブロックは ターゲット ブロック と呼ばれます。 メッセージを送受信するメッセージ ブロックは 伝達子ブロック と呼ばれます。 エージェント ライブラリは、concurrency::ISource 抽象クラスを使用してソース ブロックを表し、concurrency::ITarget 抽象クラスを使用してターゲット ブロックを表します。 ソースとして機能するメッセージ ブロックの型は ISource
から派生します。ターゲットとして機能するメッセージ ブロックの型は ITarget
から派生します。
メッセージ ブロックの型は ISource
および ITarget
から直接派生させることもできますが、エージェント ライブラリには、メッセージ ブロックのすべての型に共通の大部分の機能を実行する 3 つの基底クラスが定義されています。これらの基底クラスによって、エラーの処理やメッセージ ブロックの接続などの操作がコンカレンシー セーフに行われます。 concurrency::source_block クラスは ISource
から派生し、メッセージを他のブロックに送信します。 concurrency::target_block クラスは ITarget
から派生し、他のブロックからメッセージを受信します。 concurrency::propagator_block クラスは ISource
および ITarget
から派生し、他のブロックとの間でメッセージを送受信します。 メッセージ ブロックの動作に焦点を合わせることができるように、インフラストラクチャの細部の処理にはこれらの 3 つの基底クラスを使用することをお勧めします。
source_block
、target_block
、および propagator_block
の各クラスはテンプレートであり、ソース ブロックとターゲット ブロック間の接続 (リンク) を管理する型、およびメッセージの処理方法を管理する型でパラメーター化されます。 エージェント ライブラリには、リンクの管理を行う 2 つの型 concurrency::single_link_registry および concurrency::multi_link_registry が定義されています。 single_link_registry
クラスは、メッセージ ブロックを 1 つのソースまたは 1 つのターゲットにリンクできるようにします。 multi_link_registry
クラスは、メッセージ ブロックを複数のソースまたは複数のターゲットにリンクできるようにします。 エージェント ライブラリには、メッセージの管理を行う 1 つのクラス concurrency::ordered_message_processor が定義されています。 ordered_message_processor
クラスは、メッセージ ブロックでメッセージを受信順に処理できるようにします。
メッセージ ブロックとソースおよびターゲットとの相互関係をより深く理解できるように、次の例を考えてみてください。 この例は、concurrency::transformer クラスの宣言を示しています。
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer
クラスは propagator_block
から派生するため、ソース ブロックとしてもターゲット ブロックとしても機能します。 _Input
型のメッセージを受け入れ、_Output
型のメッセージを送信します。 transformer
クラスは、single_link_registry
をターゲット ブロックのリンク マネージャーとして指定し、multi_link_registry
をソース ブロックのリンク マネージャーとして指定します。 したがって、transformer
オブジェクトで許容されるターゲットは 1 つだけですが、ソースの数に制限はありません。
source_block
から派生するクラスは、propagate_to_any_targets、accept_message、reserve_message、consume_message、release_message、および resume_propagation の 6 つのメソッドを実装する必要があります。 target_block
から派生するクラスは、propagate_message メソッドを実装する必要があり、必要に応じて send_message メソッドを実装できます。 propagator_block
からの派生は、source_block
および target_block
からの派生と機能的には同等です。
propagate_to_any_targets
メソッドは、受信メッセージを非同期的または同期的に処理し、送信メッセージを伝達するためにランタイムによって呼び出されます。 accept_message
メソッドは、メッセージを受け入れるためにターゲット ブロックによって呼び出されます。 unbounded_buffer
などのメッセージ ブロックの型の多くは、最初にメッセージを受信するターゲットにのみメッセージを送信します。 したがって、メッセージの所有権はそのターゲットに譲渡されます。 concurrency::overwrite_buffer などの他のメッセージ ブロックの型は、メッセージを各ターゲット ブロックに提供します。 したがって、overwrite_buffer
は各ターゲット用にメッセージのコピーを作成します。
reserve_message
、consume_message
、release_message
、および resume_propagation
メソッドは、メッセージ ブロックがメッセージの予約に参加できるようにします。 ターゲット ブロックは、提供されたメッセージを予約して後で使用できるようにする場合に、reserve_message
メソッドを呼び出します。 メッセージを受信したターゲット ブロックは、consume_message
メソッドを呼び出してそのメッセージを処理するか、release_message
メソッドを呼び出して予約を取り消すことができます。 accept_message
メソッドと同様に、consume_message
の実装では、メッセージの所有権を譲渡するか、メッセージのコピーを返すことができます。 ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは resume_propagation
メソッドを呼び出します。 通常、このメソッドは、キュー内の次のメッセージからメッセージ伝達を続行します。
ランタイムは、propagate_message
メソッドを呼び出して、別のブロックから現在のブロックにメッセージを非同期的に転送します。 send_message
メソッドは、非同期的にではなく同期的にメッセージをターゲット ブロックに送信する点を除いて、propagate_message
と似ています。 send_message
の既定の実装では、すべての受信メッセージが拒否されます。 ターゲット ブロックに関連付けられているオプションのフィルター関数をメッセージが通過しない場合、ランタイムはどちらのメソッドも呼び出しません。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」をご覧ください。
[トップ]
priority_buffer クラスの定義
priority_buffer
クラスは、受信メッセージを優先順位に従って並べてから、メッセージの受信順に並べるカスタム メッセージ ブロックの型です。 priority_buffer
クラスは、メッセージのキューを保持するという点と、ソース メッセージ ブロックとしてもターゲット メッセージ ブロックとしても機能し、どちらの場合も複数のソースおよび複数のターゲットが許容されるという点で、concurrency::unbounded_buffer クラスと似ています。 ただし、unbounded_buffer
でのメッセージの伝達順は、必ずソースからのメッセージの受信順になります。
priority_buffer
クラスは、PriorityType
要素と Type
要素を含む std::tuple 型のメッセージを受信します。 PriorityType
は各メッセージの優先順位を保持する型を表し、Type
はメッセージのデータ部分を表します。 priority_buffer
クラスは、Type
型のメッセージを送信します。 priority_buffer
クラスは、受信メッセージ用の std::priority_queue オブジェクトと送信メッセージ用の std::queue オブジェクトの 2 つのメッセージ キューの管理も行います。 priority_buffer
オブジェクトが複数のメッセージを同時に受信する場合、またはコンシューマーがまだメッセージを読み取っていないときに複数のメッセージを受信する場合、メッセージを優先順位に従って並べ替えると便利です。
propagator_block
クラスでは、priority_buffer
の派生クラスで実装する必要のある 7 つのメソッドに加えて、link_target_notification
メソッドと send_message
メソッドもオーバーライドします。 priority_buffer
クラスでは、2 つのパブリック ヘルパー メソッド (enqueue
および dequeue
) と 1 つのプライベート ヘルパー メソッド (propagate_priority_order
) も定義します。
次の手順では、priority_buffer
クラスを実装する方法について説明します。
priority_buffer クラスを定義するには
C++ ヘッダーファイルを作成し、
priority_buffer.h
という名前を付けます。 または、プロジェクトに含まれる既存のヘッダー ファイルを使用することもできます。priority_buffer.h
に次のコードを追加します。#pragma once #include <agents.h> #include <queue>
std
名前空間で、std::less および std::greater の特殊化を定義して、concurrency::message オブジェクトで動作するようにします。namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer
クラスは、message
オブジェクトにpriority_queue
オブジェクトを格納します。 このような型の特殊化によって、優先順位キューでメッセージが優先順位に従って並べ替えられるようになります。 優先順位は、tuple
オブジェクトの最初の要素です。concurrencyex
名前空間で、priority_buffer
クラスを宣言します。namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer
クラスは、propagator_block
から派生したものです。 したがって、メッセージを送信することも受信することもできます。priority_buffer
クラスは、Type
型のメッセージを受信する複数のターゲットを持つことができます。 さらに、tuple<PriorityType, Type>
型のメッセージを送信する複数のソースを持つことができます。private
クラスのpriority_buffer
セクションに、次のメンバー変数を追加します。// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
priority_queue
オブジェクトは受信メッセージを保持し、queue
オブジェクトは送信メッセージを保持します。priority_buffer
オブジェクトは、複数のメッセージを同時に受信できます。critical_section
オブジェクトは、入力メッセージのキューへのアクセスを同期します。private
セクションで、コピー コンストラクターと代入演算子を定義します。 これにより、priority_queue
オブジェクトが割り当て可能になるのを防ぎます。// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public
セクションで、多くのメッセージ ブロックの型に共通のコンストラクターを定義します。 デストラクターも定義します。// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public
セクションで、enqueue
メソッドとdequeue
メソッドを定義します。 これらのヘルパー メソッドによって、priority_buffer
オブジェクトとの間でメッセージを送受信する別の手段が提供されます。// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected
セクションで、propagate_to_any_targets
メソッドを定義します。// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets
メソッドは、入力キューの先頭にあるメッセージを出力キューに転送し、出力キュー内のすべてのメッセージを伝達します。protected
セクションで、accept_message
メソッドを定義します。// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
ターゲット ブロックから
accept_message
メソッドが呼び出されたとき、priority_buffer
クラスは、メッセージを最初に受け入れるターゲット ブロックにそのメッセージの所有権を譲渡します (この動作はunbounded_buffer
の動作と似ています)。protected
セクションで、reserve_message
メソッドを定義します。// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
priority_buffer
クラスは、指定されたメッセージ識別子とキューの先頭にあるメッセージの識別子が一致する場合に、ターゲット ブロックでメッセージを予約できるようにします。 つまり、priority_buffer
オブジェクトがまだ追加のメッセージを受信しておらず、現在のメッセージも伝達していない場合、ターゲットはメッセージを予約できます。protected
セクションで、consume_message
メソッドを定義します。// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
ターゲット ブロックは、予約したメッセージの所有権を譲渡するために
consume_message
を呼び出します。protected
セクションで、release_message
メソッドを定義します。// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
ターゲット ブロックは、メッセージの予約を取り消すために
release_message
を呼び出します。protected
セクションで、resume_propagation
メソッドを定義します。// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは
resume_propagation
を呼び出します。 このメソッドは、出力キュー内のすべてのメッセージを伝達します。protected
セクションで、link_target_notification
メソッドを定義します。// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor
メンバー変数は、source_block
基底クラスによって定義されます。 このメンバー変数は、出力キューの先頭にあるメッセージの予約を保持しているターゲット ブロック (存在する場合) を指します。 新しいターゲットがlink_target_notification
オブジェクトにリンクされると、ランタイムはpriority_buffer
を呼び出します。 このメソッドは、ターゲットが予約を保持していない場合に、出力キュー内のすべてのメッセージを伝達します。private
セクションで、propagate_priority_order
メソッドを定義します。// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
このメソッドは、出力キュー内のすべてのメッセージを伝達します。 ターゲット ブロックの 1 つがメッセージを受け入れるまで、キュー内の各メッセージが各ターゲット ブロックに提供されます。
priority_buffer
クラスは、送信メッセージの順序を保持しています。 そこで、このメソッドでは、出力キュー内の最初のメッセージがいずれかのターゲット ブロックによって受け入れられるまで、他のメッセージをターゲット ブロックに提供しないようにします。protected
セクションで、propagate_message
メソッドを定義します。// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message
メソッドは、priority_buffer
クラスがメッセージの受信側 (つまりターゲット) として機能するようにします。 このメソッドは、指定されたソース ブロックから提供されたメッセージを受信し、そのメッセージを優先順位キューに挿入します。 その後、propagate_message
メソッドは、すべての出力メッセージをターゲット ブロックに非同期的に送信します。ランタイムは、concurrency::asend 関数が呼び出されたとき、またはメッセージ ブロックが他のメッセージ ブロックに接続されたときに、このメソッドを呼び出します。
protected
セクションで、send_message
メソッドを定義します。// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message
メソッドはpropagate_message
と似ています。 ただし、このメソッドは、非同期的にではなく同期的に出力メッセージを送信します。ランタイムは、concurrency::send 関数が呼び出された場合など、同期送信操作中にこのメソッドを呼び出します。
priority_buffer
クラスには、多くのメッセージ ブロックの型に共通のコンストラクター オーバーロードが含まれています。 一部のコンストラクター オーバーロードは、concurrency::Scheduler オブジェクトまたは concurrency::ScheduleGroup オブジェクトを受け取ります。これらのオブジェクトを使用すると、特定のタスク スケジューラでメッセージ ブロックを管理できます。 フィルター関数を受け取るコンストラクター オーバーロードもあります。 フィルター関数を使用すると、メッセージ ブロックでのメッセージの受け入れまたは拒否をメッセージ ペイロードに基づいて行うことができます。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」をご覧ください。 タスク スケジューラの詳細については、「タスク スケジューラ」をご覧ください。
priority_buffer
クラスはメッセージを優先順位に従って並べてから受信順に並べるため、concurrency::asend 関数を呼び出した場合や、メッセージ ブロックが他のメッセージ ブロックに接続された場合など、メッセージを非同期的に受信する場合に非常に役立ちます。
[トップ]
完全な例
次の例では、priority_buffer
クラスの完全な定義を示します。
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
次の例では、priority_buffer
オブジェクトに対して多くの asend
操作と concurrency::receive 操作を同時に実行します。
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
この例では、次のサンプル出力が生成されます。
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
priority_buffer
クラスは、メッセージを優先順位に従って並べてから、メッセージの受信順に並べます。 この例では、優先順位を示す数値が大きいメッセージほど、キューの先頭近くに挿入されています。
[トップ]
コードのコンパイル
プログラム例をコピーし、それを Visual Studio プロジェクトに貼り付けるか、priority_buffer
クラスの定義を priority_buffer.h
という名前のファイルに、テスト プログラムを priority_buffer.cpp
という名前のファイルにそれぞれ貼り付け、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドを実行します。
cl.exe /EHsc priority_buffer.cpp