연습: 사용자 지정 메시지 블록 만들기
이 문서에서는 들어오는 메시지를 우선 순위별로 정렬하는 사용자 지정 메시지 블록 유형을 만드는 방법을 설명합니다.
기본 제공 메시지 블록 형식은 광범위한 기능을 제공하지만 사용자 고유의 메시지 블록 유형을 만들고 애플리케이션의 요구 사항에 맞게 사용자 지정할 수 있습니다. 비동기 에이전트 라이브러리에서 제공하는 기본 제공 메시지 블록 형식에 대한 설명은 비동기 메시지 블록을 참조 하세요.
필수 조건
이 연습을 시작하기 전에 다음 문서를 읽어보세요.
섹션
이 연습에는 다음과 같은 섹션이 있습니다.
사용자 지정 메시지 블록 디자인
메시지 블록은 메시지를 보내고 받는 동작에 참여합니다. 메시지를 보내는 메시지 블록을 원본 블록이라고 합니다. 메시지를 받는 메시지 블록을 대상 블록이라고 합니다. 메시지를 보내고 받는 메시지 블록을 전파자 블록이라고 합니다. 에이전트 라이브러리는 추상 클래스 동시성::ISource 를 사용하여 원본 블록을 나타내고 추상 클래스 동시성::ITarget 을 사용하여 대상 블록을 나타냅니다. 원본으로 작동하는 메시지 블록 형식은 파생 ISource
됩니다. 대상 역할을 하는 메시지 블록 형식은 파생 ITarget
됩니다.
메시지 블록 형식을 ISource
ITarget
직접 파생시킬 수 있지만 에이전트 라이브러리는 모든 메시지 블록 형식에 공통적인 기능(예: 오류 처리 및 동시성 안전 방식으로 메시지 블록 연결)을 수행하는 세 가지 기본 클래스를 정의합니다. 동시성::source_block 클래스는 ISource
파생되어 다른 블록으로 메시지를 보냅니다. 동시성::target_block 클래스는 다른 블록에서 ITarget
파생되고 메시지를 받습니다. concurrency::p ropagator_block 클래스는 파생 ISource
되어 ITarget
다른 블록으로 메시지를 보내고 다른 블록에서 메시지를 받습니다. 메시지 블록의 동작에 집중할 수 있도록 이러한 세 가지 기본 클래스를 사용하여 인프라 세부 정보를 처리하는 것이 좋습니다.
및 target_block
클래스는 source_block
원본 블록과 propagator_block
대상 블록 간의 연결 또는 링크를 관리하는 형식과 메시지 처리 방법을 관리하는 형식에서 매개 변수가 있는 템플릿입니다. 에이전트 라이브러리는 링크 관리, 동시성::single_link_registry 및 동시성::multi_link_registry 수행하는 두 가지 형식을 정의합니다. 이 single_link_registry
클래스를 사용하면 메시지 블록을 하나의 원본 또는 하나의 대상에 연결할 수 있습니다. 이 multi_link_registry
클래스를 사용하면 메시지 블록을 여러 원본 또는 여러 대상에 연결할 수 있습니다. 에이전트 라이브러리는 메시지 관리, 동시성::ordered_message_processor 수행하는 하나의 클래스를 정의합니다. 이 ordered_message_processor
클래스를 사용하면 메시지 블록이 메시지를 받는 순서대로 메시지를 처리할 수 있습니다.
메시지 블록이 원본 및 대상과 어떻게 관련되어 있는지 더 잘 이해하려면 다음 예제를 고려하세요. 이 예제에서는 동시성::변환기 클래스의 선언을 보여줍니다.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
클래스는 transformer
파생 propagator_block
되므로 소스 블록 및 대상 블록으로 작동합니다. 형식의 메시지를 수락하고 형식 _Input
_Output
의 메시지를 보냅니다. 클래스는 transformer
모든 대상 블록에 대한 링크 관리자로 지정하고 multi_link_registry
소스 블록에 대한 링크 관리자로 지정 single_link_registry
합니다. 따라서 개체에는 transformer
최대 하나의 대상과 무제한의 원본이 있을 수 있습니다.
파생 source_block
되는 클래스는 propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message 및 resume_propagation 6개의 메서드를 구현해야 합니다. 파생 target_block
되는 클래스는 propagate_message 메서드를 구현해야 하며 필요에 따라 send_message 메서드를 구현할 수 있습니다. propagator_block
파생은 기능적으로 둘 다 source_block
target_block
에서 파생되는 것과 동일합니다.
이 propagate_to_any_targets
메서드는 런타임에서 들어오는 메시지를 비동기적으로 또는 동기적으로 처리하고 나가는 메시지를 전파하기 위해 호출됩니다. 이 accept_message
메서드는 대상 블록에서 메시지를 수락하기 위해 호출됩니다. 많은 메시지 블록 유형(예: unbounded_buffer
메시지를 수신할 첫 번째 대상에만 메시지를 보냅니다). 따라서 메시지의 소유권을 대상으로 전송합니다. 동시성::overwrite_buffer 같은 다른 메시지 블록 형식은 각 대상 블록에 메시지를 제공합니다. 따라서 overwrite_buffer
각 대상에 대한 메시지의 복사본을 만듭니다.
reserve_message
, consume_message
, release_message
및 resume_propagation
메서드를 사용하면 메시지 블록이 메시지 예약에 참여할 수 있습니다. 대상 블록은 메시지가 제공되면 메서드를 호출 reserve_message
하고 나중에 사용할 수 있도록 메시지를 예약해야 합니다. 대상 블록이 메시지를 예약한 후 해당 메시지를 사용하는 메서드 또는 release_message
예약을 취소하는 메서드를 호출 consume_message
할 수 있습니다. 메서드와 accept_message
마찬가지로 구현은 메시지의 consume_message
소유권을 이전하거나 메시지의 복사본을 반환할 수 있습니다. 대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임은 메서드를 호출합니다 resume_propagation
. 일반적으로 이 메서드는 큐의 다음 메시지부터 시작하여 메시지 전파를 계속합니다.
런타임은 메서드를 propagate_message
호출하여 다른 블록에서 현재 블록으로 메시지를 비동기적으로 전송합니다. 메서드는 send_message
propagate_message
비동기식이 아니라 동기적으로 대상 블록에 메시지를 보내는 것을 제외하고 유사합니다. 기본 구현은 send_message
들어오는 모든 메시지를 거부합니다. 메시지가 대상 블록과 연결된 선택적 필터 함수를 전달하지 않으면 런타임에서 이러한 메서드 중 하나를 호출하지 않습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조 하세요.
[맨 위로 이동]
priority_buffer 클래스 정의
클래스는 priority_buffer
들어오는 메시지를 우선 순위에 따라 정렬한 다음 메시지를 받는 순서에 따라 정렬하는 사용자 지정 메시지 블록 형식입니다. 클래스는 priority_buffer
메시지 큐를 보유하고 있으며 원본 및 대상 메시지 블록의 역할을 하며 여러 원본과 여러 대상을 모두 가질 수 있기 때문에 동시성::unbounded_buffer 클래스와 유사합니다. 그러나 unbounded_buffer
원본에서 메시지를 수신하는 순서에 대해서만 메시지 전파를 기반으로 합니다.
클래스는 priority_buffer
포함된 PriorityType
요소와 Type
std::tuple 형식의 메시지를 받습니다. PriorityType
는 각 메시지의 우선 순위를 보유하는 형식을 나타냅니다. Type
는 메시지의 데이터 부분을 나타냅니다. 클래스는 priority_buffer
형식 Type
의 메시지를 보냅니다. 또한 이 클래스는 priority_buffer
들어오는 메시지에 대한 std::p riority_queue 개체와 나가는 메시지의 std::queue 개체라는 두 개의 메시지 큐 를 관리합니다. 우선 순위별로 메시지 순서 지정은 개체가 priority_buffer
동시에 여러 메시지를 수신하거나 소비자가 메시지를 읽기 전에 여러 메시지를 수신할 때 유용합니다.
파생 propagator_block
되는 클래스가 구현 priority_buffer
해야 하는 7개의 메서드 외에도 클래스는 및 send_message
메서드를 재정의 link_target_notification
합니다. 또한 클래스는 priority_buffer
두 개의 공용 도우미 메서드 enqueue
와 dequeue
프라이빗 도우미 메서드 propagate_priority_order
를 정의합니다.
다음 절차에서는 클래스를 구현 priority_buffer
하는 방법을 설명합니다.
priority_buffer 클래스를 정의하려면
C++ 헤더 파일을 만들고 이름을 지정합니다
priority_buffer.h
. 또는 프로젝트의 일부인 기존 헤더 파일을 사용할 수 있습니다.에서
priority_buffer.h
다음 코드를 추가합니다.#pragma once #include <agents.h> #include <queue>
네임스페이
std
스에서 동시성::message 개체에 대해 작동하는 std::less 및 std::greater의 특수화를 정의합니다.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
클래스는
priority_buffer
개체에 개체를priority_queue
저장message
합니다. 이러한 형식 특수화를 사용하면 우선 순위 큐가 우선 순위에 따라 메시지를 정렬할 수 있습니다. 우선 순위는 개체의 첫 번째 요소입니다tuple
.네임스페이
concurrencyex
스에서 클래스를 선언합니다priority_buffer
.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer
클래스는propagator_block
에서 파생됩니다. 따라서 메시지를 보내고 받을 수 있습니다. 클래스에는priority_buffer
형식Type
의 메시지를 받는 여러 대상이 있을 수 있습니다. 형식의 메시지를 보내는 여러 원본을 가질 수도 있습니다tuple<PriorityType, Type>
.클래스의
private
priority_buffer
섹션에서 다음 멤버 변수를 추가합니다.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
개체는
priority_queue
들어오는 메시지를 보유합니다. 개체는queue
나가는 메시지를 보유합니다. 개체는priority_buffer
동시에 여러 메시지를 받을 수 있습니다. 개체는critical_section
입력 메시지 큐에 대한 액세스를 동기화합니다.private
섹션에서 복사 생성자 및 대입 연산자를 정의합니다. 이렇게 하면 개체를priority_queue
할당할 수 없습니다.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public
이 섹션에서는 많은 메시지 블록 형식에 공통적인 생성자를 정의합니다. 또한 소멸자를 정의합니다.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public
섹션에서 메서드enqueue
를 정의하고dequeue
. 이러한 도우미 메서드는 개체에서priority_buffer
메시지를 보내고 받는 다른 방법을 제공합니다.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected
섹션에서 메서드를 정의합니다propagate_to_any_targets
.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
이 메서드는
propagate_to_any_targets
입력 큐의 맨 앞에 있는 메시지를 출력 큐로 전송하고 출력 큐의 모든 메시지를 전파합니다.protected
섹션에서 메서드를 정의합니다accept_message
.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
대상 블록이 메서드를
accept_message
호출하면 클래스는priority_buffer
메시지의 소유권을 수락하는 첫 번째 대상 블록으로 전송합니다. (이 동작은 .)의unbounded_buffer
동작과 유사합니다.protected
섹션에서 메서드를 정의합니다reserve_message
.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
클래스는
priority_buffer
제공된 메시지 식별자가 큐 앞에 있는 메시지의 식별자와 일치하는 경우 대상 블록에서 메시지를 예약할 수 있도록 허용합니다. 즉, 개체가 아직 추가 메시지를 받지 못하고 현재 메시지를 전파하지 않은 경우priority_buffer
대상은 메시지를 예약할 수 있습니다.protected
섹션에서 메서드를 정의합니다consume_message
.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
대상 블록은 예약된 메시지의 소유권을 전송하기 위해 호출
consume_message
합니다.protected
섹션에서 메서드를 정의합니다release_message
.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
대상 블록은 메시지에 대한 예약을 취소하기 위해 호출
release_message
합니다.protected
섹션에서 메서드를 정의합니다resume_propagation
.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임이 호출
resume_propagation
됩니다. 이 메서드는 출력 큐에 있는 모든 메시지를 전파합니다.protected
섹션에서 메서드를 정의합니다link_target_notification
.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor
멤버 변수는 기본 클래스source_block
에 의해 정의됩니다. 이 멤버 변수는 출력 큐의 맨 앞에 있는 메시지에 대한 예약을 보유하는 대상 블록(있는 경우)을 가리킵니다. 런타임은 새 대상이 개체에 연결priority_buffer
되면 호출link_target_notification
됩니다. 이 메서드는 예약을 보유하고 있는 대상이 없는 경우 출력 큐에 있는 모든 메시지를 전파합니다.private
섹션에서 메서드를 정의합니다propagate_priority_order
.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
이 메서드는 출력 큐에서 모든 메시지를 전파합니다. 큐의 모든 메시지는 대상 블록 중 하나가 메시지를 수락할 때까지 모든 대상 블록에 제공됩니다. 이 클래스는
priority_buffer
보내는 메시지의 순서를 유지합니다. 따라서 이 메서드가 대상 블록에 다른 메시지를 제공하려면 먼저 대상 블록에서 출력 큐의 첫 번째 메시지를 수락해야 합니다.protected
섹션에서 메서드를 정의합니다propagate_message
.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
이
propagate_message
메서드를 사용하면 클래스가priority_buffer
메시지 수신자 또는 대상으로 작동할 수 있습니다. 이 메서드는 제공된 소스 블록에서 제공하는 메시지를 수신하고 해당 메시지를 우선 순위 큐에 삽입합니다. 그런 다음 메서드는propagate_message
모든 출력 메시지를 대상 블록에 비동기적으로 보냅니다.런타임은 동시성::asend 함수를 호출하거나 메시지 블록이 다른 메시지 블록에 연결된 경우 이 메서드를 호출합니다.
protected
섹션에서 메서드를 정의합니다send_message
.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
메서드는
send_message
다음과 유사합니다propagate_message
. 그러나 비동기식이 아닌 동기적으로 출력 메시지를 보냅니다.런타임은 동시성::send 함수를 호출하는 경우와 같이 동기 송신 작업 중에 이 메서드를 호출합니다.
클래스에는 priority_buffer
많은 메시지 블록 형식에서 일반적인 생성자 오버로드가 포함됩니다. 일부 생성자 오버로드는 동시성::Scheduler 또는 동시성::ScheduleGroup 개체를 사용합니다. 이를 통해 메시지 블록을 특정 작업 스케줄러에서 관리할 수 있습니다. 다른 생성자 오버로드는 필터 함수를 사용합니다. 필터 함수를 사용하면 메시지 블록이 페이로드를 기반으로 메시지를 수락하거나 거부할 수 있습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조 하세요. 작업 스케줄러에 대한 자세한 내용은 작업 스케줄러를 참조 하세요.
클래스는 priority_buffer
메시지를 우선 순위에 따라 정렬한 다음 메시지를 받는 순서에 따라 정렬하기 때문에 이 클래스는 메시지를 비동기적으로 수신할 때 가장 유용합니다(예: 동시성::asend 함수를 호출하거나 메시지 블록이 다른 메시지 블록에 연결된 경우).
[맨 위로 이동]
전체 예제
다음 예제에서는 클래스의 priority_buffer
전체 정의를 보여줍니다.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
다음 예제에서는 개체에 대해 여러 asend
가지 동시성::receive 작업을 동시에 수행합니다 priority_buffer
.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
이 예제에서는 다음 샘플 출력을 생성합니다.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
클래스는 priority_buffer
먼저 우선 순위에 따라 메시지를 정렬한 다음 메시지를 받는 순서로 메시지를 정렬합니다. 이 예제에서는 숫자 우선 순위가 더 큰 메시지가 큐의 맨 앞으로 삽입됩니다.
[맨 위로 이동]
코드 컴파일
예제 코드를 복사하여 Visual Studio 프로젝트에 붙여넣거나 이름이 지정된 priority_buffer.h
파일에 클래스 정의 priority_buffer
와 테스트 프로그램을 priority_buffer.cpp
붙여넣은 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.
cl.exe /EHsc priority_buffer.cpp