Postupy: Implementace různých vzorů typu výrobce-spotřebitel
Toto téma popisuje, jak ve vaší aplikaci implementovat model producenta a příjemce. V tomto vzoru producent odesílá zprávy do bloku zpráv a příjemce čte zprávy z tohoto bloku.
Téma ukazuje dva scénáře. V prvním scénáři musí příjemce obdržet každou zprávu, kterou producent odešle. Ve druhém scénáři se spotřebitel pravidelně dotazuje na data, a proto nemusí přijímat každou zprávu.
Oba příklady v tomto tématu používají agenty, bloky zpráv a funkce předávání zpráv k přenosu zpráv od producenta do příjemce. Agent producenta používá funkci concurrency::send k zápisu zpráv do objektu concurrency::ITarget . Agent příjemce používá funkci concurrency::receive ke čtení zpráv z objektu concurrency::ISource . Oba agenti uchovávají hodnotu sentinelu, která koordinuje konec zpracování.
Další informace o asynchronních agentech naleznete v tématu Asynchronní agenti. Další informace o blocích zpráv a funkcích předávání zpráv naleznete v tématu Asynchronní bloky zpráv a funkce předávání zpráv.
Příklad: Odeslání řady čísel do agenta příjemce
V tomto příkladu agent producenta odešle řadu čísel do agenta příjemce. Příjemce obdrží každé z těchto čísel a vypočítá jejich průměr. Aplikace zapíše průměr do konzoly.
V tomto příkladu se používá objekt concurrency::unbounded_buffer , který producentovi umožní zařazení zpráv do fronty. Třída unbounded_buffer
implementuje ITarget
a ISource
aby producent a příjemce mohli odesílat a přijímat zprávy do a ze sdílené vyrovnávací paměti. receive
Funkce send
koordinuje úlohu šíření dat od producenta do příjemce.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
Tento příklad vytvoří následující výstup.
The average is 50.
Příklad: Odeslání řady akcií do spotřebitelského agenta
V tomto příkladu agent producenta odešle sérii akcií do spotřebitelského agenta. Agent příjemce pravidelně čte aktuální nabídku a vytiskne ji do konzoly.
Tento příklad se podobá předchozímu, s tím rozdílem, že používá souběžnost::overwrite_buffer objekt, aby producent mohl sdílet jednu zprávu se příjemcem. Stejně jako v předchozím příkladu overwrite_buffer
třída implementuje ITarget
a ISource
tak, aby producent a příjemce mohli reagovat na sdílenou vyrovnávací paměť zpráv.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
Tento příklad vytvoří následující ukázkový výstup.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
Na rozdíl od objektu unbounded_buffer
receive
funkce neodebere zprávu z objektu overwrite_buffer
. Pokud příjemce přečte z vyrovnávací paměti zprávy více než jednou před tím, než producent přepíše tuto zprávu, příjemce obdrží stejnou zprávu pokaždé.
Probíhá kompilace kódu
Zkopírujte ukázkový kód a vložte ho do projektu sady Visual Studio nebo ho vložte do pojmenovaného producer-consumer.cpp
souboru a potom v okně příkazového řádku sady Visual Studio spusťte následující příkaz.
cl.exe /EHsc producer-consumer.cpp
Viz také
Knihovna asynchronních agentů
Asynchronní agenti
Asynchronní bloky zpráv
Funkce pro předávání zpráv