비동기 에이전트 라이브러리의 모범 사례
이 문서에서는 비동기 에이전트 라이브러리를 효과적으로 사용하는 방법을 설명합니다. 에이전트 라이브러리는 거친 데이터 흐름 및 파이프라인 작업을 위해 행위자 기반 프로그래밍 모델 및 In-Process 메시지를 전달합니다.
에이전트 라이브러리에 대한 자세한 내용은 비동기 에이전트 라이브러리를 참조 하세요.
섹션
이 문서는 다음 섹션으로 구성됩니다.
에이전트를 사용하여 상태 격리
에이전트 라이브러리는 비동기 메시지 전달 메커니즘을 통해 격리된 구성 요소를 연결할 수 있도록 하여 공유 상태에 대한 대안을 제공합니다. 비동기 에이전트는 내부 상태를 다른 구성 요소와 격리할 때 가장 효과적입니다. 상태를 격리하여 여러 구성 요소는 일반적으로 공유 데이터에 대해 작동하지 않습니다. 상태 격리를 사용하면 공유 메모리에 대한 경합이 줄어들기 때문에 애플리케이션의 크기를 조정할 수 있습니다. 또한 상태 격리는 구성 요소가 공유 데이터에 대한 액세스를 동기화할 필요가 없으므로 교착 상태 및 경합 상태의 가능성을 줄입니다.
일반적으로 에이전트 클래스의 섹션 또는 protected
섹션에 데이터 멤버 private
를 보관하고 메시지 버퍼를 사용하여 상태 변경 내용을 전달하여 에이전트의 상태를 격리합니다. 다음 예제에서는 동시성::agent에서 파생되는 클래스를 보여줍니다basic_agent
. 클래스는 basic_agent
두 개의 메시지 버퍼를 사용하여 외부 구성 요소와 통신합니다. 하나의 메시지 버퍼는 들어오는 메시지를 보유합니다. 다른 메시지 버퍼는 나가는 메시지를 보유합니다.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrieves the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
에이전트 를 정의하고 사용하는 방법에 대한 전체 예제는 연습: 에이전트 기반 애플리케이션 만들기 및 연습: 데이터 흐름 에이전트 만들기를 참조하세요.
[맨 위로 이동]
제한 메커니즘을 사용하여 데이터 파이프라인의 메시지 수 제한
동시성::unbounded_buffer 같은 많은 메시지 버퍼 형식은 무제한의 메시지를 보유할 수 있습니다. 메시지 생산자가 소비자가 이러한 메시지를 처리할 수 있는 것보다 더 빨리 데이터 파이프라인에 메시지를 보내는 경우 애플리케이션은 메모리 부족 또는 메모리 부족 상태를 입력할 수 있습니다. 제한 메커니즘(예: 세마포)을 사용하여 데이터 파이프라인에서 동시에 활성화된 메시지 수를 제한할 수 있습니다.
다음 기본 예제에서는 세마포를 사용하여 데이터 파이프라인의 메시지 수를 제한하는 방법을 보여 줍니다. 데이터 파이프라인은 동시성::wait 함수를 사용하여 최소 100밀리초가 걸리는 작업을 시뮬레이션합니다. 보낸 사람은 소비자가 해당 메시지를 처리할 수 있는 것보다 더 빠르게 메시지를 생성하므로 이 예제에서는 애플리케이션이 활성 메시지 수를 제한할 수 있도록 클래스를 정의 semaphore
합니다.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
이 개체는 semaphore
파이프라인이 최대 두 개의 메시지를 동시에 처리하도록 제한합니다.
이 예제의 생산자는 소비자에게 상대적으로 적은 메시지를 보냅니다. 따라서 이 예제에서는 잠재적인 메모리 부족 또는 메모리 부족 상태를 보여 주지 않습니다. 그러나 이 메커니즘은 데이터 파이프라인에 상대적으로 많은 수의 메시지가 포함된 경우에 유용합니다.
이 예제에서 사용되는 세마포 클래스를 만드는 방법에 대한 자세한 내용은 방법: 컨텍스트 클래스를 사용하여 협조적 세마포 구현을 참조 하세요.
[맨 위로 이동]
데이터 파이프라인에서 세분화된 작업 수행 안 함
에이전트 라이브러리는 데이터 파이프라인에서 수행하는 작업이 상당히 세분화된 경우에 가장 유용합니다. 예를 들어 한 애플리케이션 구성 요소는 파일 또는 네트워크 연결에서 데이터를 읽고 경우에 따라 해당 데이터를 다른 구성 요소로 보낼 수 있습니다. 에이전트 라이브러리가 메시지를 전파하는 데 사용하는 프로토콜은 메시지 전달 메커니즘이 PPL(병렬 패턴 라이브러리)에서 제공하는 작업 병렬 구문보다 오버헤드가 더 많이 발생합니다. 따라서 데이터 파이프라인에서 수행하는 작업이 이 오버헤드를 상쇄할 만큼 충분히 긴지 확인합니다.
데이터 파이프라인은 태스크가 세분화된 경우 가장 효과적이지만 데이터 파이프라인의 각 단계에서 작업 그룹 및 병렬 알고리즘과 같은 PPL 구문을 사용하여 보다 세분화된 작업을 수행할 수 있습니다. 각 처리 단계에서 세분화된 병렬 처리를 사용하는 거친 데이터 네트워크의 예는 연습: 이미지 처리 네트워크 만들기를 참조하세요.
[맨 위로 이동]
값으로 큰 메시지 페이로드 전달 안 함
경우에 따라 런타임은 한 메시지 버퍼에서 다른 메시지 버퍼로 전달하는 모든 메시지의 복사본을 만듭니다. 예를 들어 동시성::overwrite_buffer 클래스는 각 대상에 수신하는 모든 메시지의 복사본을 제공합니다. 또한 런타임은 동시성::send 및 concurrency::receive와 같은 메시지 전달 함수를 사용하여 메시지를 쓰고 메시지 버퍼에서 메시지를 읽을 때 메시지 데이터의 복사본을 만듭니다. 이 메커니즘은 공유 데이터에 동시에 쓸 위험을 제거하는 데 도움이 되지만 메시지 페이로드가 상대적으로 클 때 메모리 성능이 저하될 수 있습니다.
큰 페이로드가 있는 메시지를 전달할 때 포인터 또는 참조를 사용하여 메모리 성능을 향상시킬 수 있습니다. 다음 예제에서는 값별로 큰 메시지를 전달하는 것과 동일한 메시지 유형에 포인터를 전달하는 방법을 비교합니다. 이 예제에서는 두 가지 에이전트 형식 producer
을 정의하고 consumer
개체에 대해 message_data
작동합니다. 이 예제에서는 생산자가 여러 message_data
개체를 소비자에게 보내는 데 필요한 시간과 생산자 에이전트가 개체에 대한 여러 포인터를 소비자에게 보내는 데 필요한 시간을 비교합니다 message_data
.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
이 예제에서는 다음 샘플 출력을 생성합니다.
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
포인터를 사용하는 버전은 런타임이 생산자에서 소비자에게 전달하는 모든 message_data
개체의 전체 복사본을 만들 필요가 없으므로 더 나은 성능을 발휘합니다.
[맨 위로 이동]
소유권이 정의되지 않은 경우 데이터 네트워크에서 shared_ptr 사용
메시지 전달 파이프라인 또는 네트워크를 통해 포인터로 메시지를 보내는 경우 일반적으로 네트워크 맨 앞에 있는 각 메시지에 대한 메모리를 할당하고 네트워크 끝에 해당 메모리를 해제합니다. 이 메커니즘은 자주 잘 작동하지만 사용하기 어렵거나 사용할 수 없는 경우가 있습니다. 예를 들어 데이터 네트워크에 여러 엔드 노드가 포함된 경우를 고려합니다. 이 경우 메시지의 메모리를 해제할 명확한 위치가 없습니다.
이 문제를 해결하기 위해 여러 구성 요소에서 포인터를 소유할 수 있도록 하는 메커니즘(예 : std::shared_ptr)을 사용할 수 있습니다. 리소스를 소유하는 최종 shared_ptr
개체가 제거되면 리소스도 해제됩니다.
다음 예제에서는 여러 메시지 버퍼 간에 포인터 값을 공유하는 방법을 shared_ptr
보여 줍니다. 이 예제에서는 동시성::overwrite_buffer 개체를 세 개의 동시성::call 개체에 연결합니다. 클래스는 overwrite_buffer
각 대상에 메시지를 제공합니다. 데이터 네트워크의 끝에는 여러 명의 데이터 소유자가 있으므로 이 예제에서는 각 call
개체가 메시지의 소유권을 공유하도록 설정하는 데 사용합니다shared_ptr
.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
이 예제에서는 다음 샘플 출력을 생성합니다.
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
참고 항목
동시성 런타임 유용한 정보
비동기 에이전트 라이브러리
연습: 에이전트 기반 애플리케이션 만들기
연습: 데이터 흐름 에이전트 만들기
연습: 이미지 처리 네트워크 만들기
병렬 패턴 라이브러리의 유용한 정보
동시성 런타임의 유용한 일반 정보