Comment : implémenter divers modèles de producteur-consommateur
Cette rubrique décrit comment implémenter le modèle de producteur-consommateur dans votre application. Dans ce modèle, le producteur envoie des messages à un bloc de messages et le consommateur lit les messages à partir de ce bloc.
La rubrique illustre deux scénarios. Dans le premier scénario, le consommateur doit recevoir chaque message envoyé par le producteur. Dans le deuxième scénario, le consommateur vérifie périodiquement la présence de données, et par conséquent n'est pas obligé de recevoir chaque message.
Les deux exemples de cette rubrique utilisent des agents, des blocs de messages et des fonctions de passage de messages pour transmettre les messages du producteur au consommateur. L'agent producteur utilise la fonction Concurrency::send pour écrire des messages sur un objet Concurrency::ITarget. L'agent du consommateur utilise la fonction Concurrency::receive pour lire les messages d'un objet Concurrency::ISource. Les deux agents conservent une valeur de sentinelle pour coordonner la fin du traitement.
Pour plus d'informations sur les agents asynchrones, consultez Agents asynchrones. Pour plus d'informations sur les blocs de messages et les fonctions de passage de messages, consultez Blocs de messages asynchrones et Fonctions de passage de messages.
Exemple
Dans cet exemple, le agent producteur envoie une série de nombres à l'agent consommateur. Le consommateur reçoit chacun de ces nombres et calcule leur moyenne. L'application écrit la moyenne dans la console.
Cet exemple utilise un objet Concurrency::unbounded_buffer pour permettre au producteur de mettre les messages en file d'attente. La classe unbounded_buffer implémente ITarget et ISource afin que le producteur et le consommateur puissent envoyer et recevoir des messages vers et à partir d'une mémoire tampon partagée. Les fonctions receive et send coordonnent la tâche de propagation des données du producteur au consommateur.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace Concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
Cet exemple génère la sortie suivante :
The average is 50.
Dans cet exemple, l'agent producteur envoie une série de cotations boursières à l'agent consommateur. L'agent consommateur lit périodiquement la cotation actuelle et l'imprime sur la console.
Cet exemple ressemble au précédent, mais il utilise un objet Concurrency::overwrite_buffer pour permettre au producteur de partager un message avec le consommateur. Comme dans l'exemple précédent, la classe overwrite_buffer implémente ITarget et ISource afin que le producteur et le consommateur puissent agir sur un tampon de messages partagé.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace Concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (quotes.begin(), quotes.end(), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
Concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
Concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
Cet exemple génère l'exemple de sortie suivant.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
Contrairement à ce qui se produit avec un objet unbounded_buffer, la fonction receive ne supprime pas le message de l'objet overwrite_buffer. Si le consommateur lit à partir du tampon de messages à plusieurs reprises avant que le producteur ne remplace ce message, le récepteur obtient le même message à chaque fois.
Compilation du code
Copiez l'exemple de code et collez-le dans un projet Visual Studio, ou collez-le dans un fichier nommé producer-consumer.cpp puis exécutez la commande suivante dans une fenêtre d'invite de commandes Visual Studio 2010.
cl.exe /EHsc producer-consumer.cpp