Jak: wdrożenia różnych wzorców producentów i konsumentów
W tym temacie opisano sposób implementacji deseń producentów i konsumentów w aplikacji.W tym deseniu producenta wysyła wiadomości do bloku komunikatu i konsumentów czyta wiadomości z tego bloku.
Temat ilustruje dwa scenariusze.W pierwszym scenariuszu konsument musi otrzymać każdy producent wysyła wiadomość.W drugim scenariuszu konsumenta okresowo sonduje dla danych i nie jest więc każdy komunikat.
Przykłady zarówno w tym temacie przesy³aæ agentów, bloki komunikatów i funkcje służące do przekazywania wiadomości wiadomości od producenta do konsumenta.Agent producent używa concurrency::send funkcji pisać wiadomości, aby concurrency::ITarget obiektu.Agent konsumenta używa concurrency::receive funkcji do odczytu wiadomości z concurrency::ISource obiektu.Zarówno agenci, przytrzymaj wartość wskaźnikowe do koordynowania końca przetwarzania.
Aby uzyskać więcej informacji na temat agentów asynchronicznych, zobacz Agenci asynchroniczne.Aby uzyskać więcej informacji na temat funkcji przekazywania wiadomości i bloków komunikatów zobacz Asynchroniczne blokuje wiadomości i Funkcji przekazywania wiadomości.
Przykład
W tym przykładzie agenta producent wysyła serii numerów do agenta konsumenta.Konsument otrzymuje każdego z tych numerów i oblicza średnią ich.Aplikacja zapisuje średnią konsoli.
W tym przykładzie concurrency::unbounded_buffer obiekt, aby umożliwić producenta do wiadomości w kolejce.unbounded_buffer Klasy implementuje ITarget i ISource tak, aby producent i konsumenta, można wysyłać i odbierać wiadomości do i z udostępniony bufor.send i receive funkcje koordynowanie zadań rozmnożeniowego danych od producenta do konsumenta.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
Ten przykład generuje następujące wyniki.
The average is 50.
W tym przykładzie agenta producent wysyła serię giełdowe agenta konsumenta.Agent konsumenta okresowo odczytuje bieżącej oferty i drukuje je do konsoli.
W tym przykładzie podobny do poprzedniego, z wyjątkiem, że używa concurrency::overwrite_buffer obiekt, aby umożliwić producent udostępnianie jednej wiadomości konsumenta.Jak w poprzednim przykładzie overwrite_buffer klasy implementuje ITarget i ISource tak, aby producent i konsument może działać w wiadomości udostępniony bufor.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
Ten przykład generuje następujące przykładowe dane wyjściowe.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
W odróżnieniu od z unbounded_buffer obiektu, receive funkcja nie usuwa wiadomości z overwrite_buffer obiektu.Jeśli konsument odczytuje z buforu wiadomości więcej niż jeden raz przed producenta zastępuje tę wiadomość, odbiornik uzyskuje tę samą wiadomość co czas.
Kompilowanie kodu
Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wkleić go w pliku o nazwie producenta consumer.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia usługi programu Visual Studio.
cl.exe /EHsc producer-consumer.cpp
Zobacz też
Koncepcje
Biblioteka agentów asynchroniczne