방법: 다양한 공급자/소비자 패턴 구현
이 항목에서는 응용 프로그램에서 공급자/소비자 패턴을 구현하는 방법에 대해 설명합니다. 이 패턴에서 공급자는 메시지 블록에 메시지를 보내고 소비자는 해당 블록에서 메시지를 읽습니다.
이 항목에서는 두 가지 시나리오를 보여 줍니다. 첫 번째 시나리오에서 소비자는 공급자가 보내는 각 메시지를 받아야 합니다. 두 번째 시나리오에서 소비자는 주기적으로 데이터를 폴링하므로 각 메시지를 받을 필요가 없습니다.
이 항목에 나오는 두 예제에서는 모두 에이전트, 메시지 블록 및 메시지 전달 함수를 사용하여 공급자가 소비자에게 메시지를 전송합니다. 공급자 에이전트는 concurrency::asend 함수를 사용하여 concurrency::ITarget 개체에 메시지를 작성하고, 소비자 에이전트는 concurrency::receive 함수를 사용하여 concurrency::ISource 개체에서 메시지를 읽습니다. 두 에이전트 모두 처리의 끝을 조정하는 데 사용할 센티널 값을 포함합니다.
비동기 에이전트에 대한 자세한 내용은 비동기 에이전트를 참조하십시오. 메시지 블록 및 메시지 전달 함수에 대한 자세한 내용은 비동기 메시지 블록 및 메시지 전달 함수를 참조하십시오.
예제
이 예제에서 공급자 에이전트는 일련의 숫자를 소비자 에이전트에게 보냅니다. 소비자는 이러한 각 숫자를 받아 평균 값을 계산합니다. 그러면 응용 프로그램에서 평균 값을 콘솔에 씁니다.
이 예제에서는 concurrency::unbounded_buffer 개체를 사용하여 공급자가 메시지를 큐에 넣을 수 있도록 합니다. unbounded_buffer 클래스는 ITarget 및 ISource를 구현하여 공급자와 소비자가 공유 버퍼에 메시지를 보내거나 공유 버퍼에서 메시지를 받을 수 있도록 합니다. send 및 receive 함수는 공급자에서 소비자로 데이터를 전파하는 작업을 조정합니다.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
이 예제의 결과는 다음과 같습니다.
이 예제에서 공급자 에이전트는 일련의 주식 시세를 소비자 에이전트에게 보냅니다. 소비자 에이전트는 현재 시세를 주기적으로 읽고 콘솔에 출력합니다.
이 예제는 이전 예제와 유사하지만 concurrency::overwrite_buffer 개체를 사용하여 공급자가 하나의 메시지를 소비자와 공유할 수 있도록 한다는 점이 다릅니다. 이전 예제와 같이 overwrite_buffer 클래스는 ITarget 및 ISource를 구현하여 공급자와 소비자가 공유 메시지 버퍼에서 동작할 수 있도록 합니다.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.
unbounded_buffer 개체와 달리 receive 함수는 overwrite_buffer 개체에서 메시지를 제거하지 않습니다. 공급자가 메시지를 덮어쓰기 전에 소비자가 해당 메시지 버퍼를 두 번 이상 읽으면 수신자는 매번 같은 메시지를 가져옵니다.
코드 컴파일
예제 코드를 복사하여 Visual Studio 프로젝트 또는 producer-consumer.cpp 파일에 붙여넣고 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행합니다.
cl.exe /EHsc producer-consumer.cpp