方法: さまざまなプロデューサー/コンシューマー パターンを実装する
ここでは、アプリケーションにプロデューサー/コンシューマー パターンを実装する方法について説明します。このパターンでは、プロデューサーがメッセージをメッセージ ブロックに送信し、コンシューマーがそのブロックからメッセージを読み取ります。
このトピックでは、2 つのシナリオに従って説明します。最初のシナリオでは、コンシューマーはプロデューサーが送信した各メッセージを受信する必要があります。2 番目のシナリオでは、コンシューマーは定期的にデータをポーリングします。そのため、各メッセージを受信する必要はありません。
このトピックのどちらの例も、エージェント、メッセージ ブロック、およびメッセージ パッシング関数を使用して、メッセージをプロデューサーからコンシューマーに転送します。プロデューサーのエージェントを使用して、 concurrency::send にメッセージを書き込む関数を concurrency::ITarget オブジェクト。消費者エージェントを使用して、 concurrency::receive からのメッセージを読み取る機能、 concurrency::ISource オブジェクト。どちらのエージェントにも、処理の終了を調整するための sentinel 値が保持されます。
非同期エージェントの詳細については、「非同期エージェント」を参照してください。メッセージ ブロックおよびメッセージ パッシング関数の詳細については、「非同期メッセージ ブロック」および「メッセージ パッシング関数」を参照してください。
使用例
この例では、プロデューサー エージェントは一連の数字をコンシューマー エージェントに送信します。コンシューマーはこれらの各数字を受け取り、その平均を計算します。アプリケーションはその平均をコンソールに出力します。
次の使用例を使用して、 concurrency::unbounded_buffer 、プロデューサー、キューのメッセージを有効にするのには、[オブジェクト。unbounded_buffer クラスに、ITarget および ISource を実装して、プロデューサーおよびコンシューマーと共有バッファーとの間でメッセージを送受信できるようにします。send 関数および receive 関数で、プロデューサーからコンシューマーにデータを伝達するタスクを調整します。
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
この例を実行すると、次の出力が生成されます。
The average is 50.
この例では、プロデューサー エージェントは一連の株式相場をコンシューマー エージェントに送信します。コンシューマー エージェントは定期的に現在の相場を読み取り、それをコンソールに出力します。
それを使用する以外は前の仕事では、次の使用例のような concurrency::overwrite_buffer プロデューサーがコンシューマーとの 1 つのメッセージを共有するを有効にするオブジェクト。前の例と同様に、overwrite_buffer クラスに、ITarget および ISource を実装して、プロデューサーおよびコンシューマーが共有メッセージ バッファーを操作できるようにします。
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
この例では、次のサンプル出力が生成されます。
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
unbounded_buffer オブジェクトとは異なり、receive 関数によって、overwrite_buffer オブジェクトからメッセージが削除されることはありません。プロデューサーがメッセージを上書きする前に、コンシューマーがメッセージ バッファーから 2 回以上読み取りを行った場合、コンシューマーは毎回同じメッセージを取得します。
コードのコンパイル
コード例をコピーして、Visual Studio プロジェクトでは、貼り付けるまたはという名前のファイルに貼り付けてプロデューサー consumer.cpp と、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドを実行します。
cl.exe /EHsc producer-consumer.cpp