Freigeben über


Gewusst wie: Implementieren verschiedener Producer-Consumer-Muster

In diesem Thema wird die Implementierung des Producer-Consumer-Musters in der Anwendung beschrieben. Bei diesem Muster sendet der Producer Nachrichten an einen Nachrichtenblock, während der Consumer Nachrichten aus diesem Block ausliest.

In diesem Thema werden zwei Szenarien beschrieben. Im ersten Szenario muss der Consumer alle Nachrichten empfangen, die der Producer sendet. Im zweiten Szenario ruft der Consumer in regelmäßigen Abständen Daten ab und muss daher nicht jede Nachricht empfangen.

In beiden Beispielen in diesem Thema werden Nachrichten mithilfe von Agents, Nachrichtenblöcken und Nachrichtenübergabefunktionen vom Producer an den Consumer übertragen. Der Produzent-Agent verwendet die Funktion "concurrency::send " zum Schreiben von Nachrichten in ein Parallelcurrency::ITarget-Objekt . Der Consumer-Agent verwendet die Funktion "concurrency::receive" zum Lesen von Nachrichten aus einem Parallelitätsobjekt::ISource.The consumer agent uses the concurrency::receive function to read messages from a concurrency::ISource object. Beide Agents enthalten einen Sentinelwert, um das Ende der Verarbeitung zu koordinieren.

Weitere Informationen zu asynchronen Agents finden Sie unter "Asynchrone Agents". Weitere Informationen zu Nachrichtenblöcken und Nachrichtenübergabefunktionen finden Sie unter "Asynchrone Nachrichtenblöcke " und "Nachrichtenübergabefunktionen".

Beispiel: Senden von Nummernreihen an Consumer-Agent

In diesem Beispiel sendet der Producer-Agent eine Reihe von Zahlen an den Consumer-Agent. Diese werden vom Consumer empfangen und ihr Durchschnitt wird berechnet. Anschließend wird der Durchschnitt in die Konsole geschrieben.

In diesem Beispiel wird ein Parallelitätsobjekt::unbounded_buffer verwendet, um dem Produzenten die Warteschlange von Nachrichten zu ermöglichen. Die unbounded_buffer-Klasse implementiert ITarget und ISource, damit Producer und Consumer Nachrichten über einen gemeinsamen Puffer senden und empfangen können. Die Funktionen send und receive koordinieren die Aufgabe, die Daten vom Producer an den Consumer weiterzugeben.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Folgende Ergebnisse werden zurückgegeben:

The average is 50.

Beispiel: Senden einer Reihe von Aktienkursen an Verbrauchervertreter

In diesem Beispiel sendet der Producer-Agent eine Reihe von Aktienkursen an den Consumer-Agent. Der Consumer-Agent liest den aktuellen Kurs in regelmäßigen Abständen und gibt ihn an der Konsole aus.

Dieses Beispiel ähnelt dem vorherigen, mit der Ausnahme, dass es ein Parallelitätsobjekt::overwrite_buffer verwendet, damit der Produzent eine Nachricht für den Verbraucher freigeben kann. Wie im vorherigen Beispiel implementiert die overwrite_buffer-Klasse ITarget und ISource, sodass Producer und Consumer einen gemeinsamen Nachrichtenpuffer nutzen können.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Dieses Beispiel erzeugt die folgende Beispielausgabe.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Anders als bei einem unbounded_buffer-Objekt entfernt die receive-Funktion die Nachricht nicht aus dem overwrite_buffer-Objekt. Wenn der Consumer den Nachrichtenpuffer mehr als einmal ausliest, bevor der Producer diese Nachricht überschreibt, erhält der Empfänger jedes Mal dieselbe Nachricht.

Kompilieren des Codes

Kopieren Sie den Beispielcode, fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie ihn in eine Datei ein, die benannt producer-consumer.cpp ist, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.

cl.exe /EHsc producer-consumer.cpp

Siehe auch

Asynchrone Agents Library
Asynchrone Agents
Asynchrone Nachrichtenblöcke
Funktionen zum Übergeben von Nachrichten