Sdílet prostřednictvím


Jak: implementovat různé vzorky producentem spotřebiteli

Toto téma popisuje, jak implementovat vzorek producentem spotřebiteli v aplikaci.V tomto vzorku producentů bloku zprávy odesílá zprávy a spotřebitele čtení zpráv z tohoto bloku.

Téma ukazuje dva scénáře.V prvním scénáři spotřebitel musí mít každé zprávy, odešle producenta.Ve druhém scénáři spotřebitele pravidelně zjišťuje data a proto nemá každou zprávu.

Oba příklady v tomto tématu pomocí funkce předávání zpráv, bloků zprávy a agenti přenosu zpráv od výrobce spotřebiteli.Používá agent producentů concurrency::send funkce pro zápis zprávy concurrency::ITarget objektu.Používá agent spotřebitele concurrency::receive funkci čtení zpráv z concurrency::ISource objektu.Oba agenti obsahovat hodnotu sentinel koordinovat konec zpracování.

Další informace o asynchronní agenti, viz Asynchronní agenti.Další informace o bloků zprávy a funkce předávání zpráv naleznete v Asynchronní bloků zprávy a Funkce předávání zpráv.

Příklad

V tomto příkladu odešle agent producentů agent spotřebitele řadu čísel.Spotřebitel obdrží každý z těchto čísel a vypočítá průměrnou.Aplikace zapisuje do konzoly průměr.

V tomto příkladu concurrency::unbounded_buffer objektu povolit producentů do fronty zpráv.unbounded_buffer Třída implementuje ITarget a ISource tak, aby bylo možné odesílat a přijímat zprávy z sdílené vyrovnávací paměti a výrobce a spotřebitele.send a receive funkce koordinovat úkol rozmnožovacího data z producentem spotřebiteli.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Tento příklad vytvoří následující výstup.

The average is 50.

V tomto příkladu odešle agent producentů agent spotřebitele řadu akcií.Agent spotřebitele pravidelně čte aktuální nabídky a vytiskne do konzoly.

V tomto příkladu podobá předchozí, až na to, že používá concurrency::overwrite_buffer objektu povolit producentů sdílet jednu zprávu s spotřebitele.Jako v předchozím příkladu overwrite_buffer třída implementuje ITarget a ISource tak, aby výrobce a spotřebitele může působit na vyrovnávací paměti sdílené zprávy.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Tento příklad vytvoří následující ukázkový výstup.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Na rozdíl od s unbounded_buffer objektu, receive funkce neodeberete zprávu z overwrite_buffer objektu.Pokud spotřebitele vyrovnávací paměť pro zprávy z více než jednou před producent přepíše tuto zprávu, příjemce získá vždy stejnou zprávu.

Probíhá kompilace kódu

Příklad kódu zkopírujte a vložte do projektu Visual Studio nebo vložit do souboru s názvem výrobce consumer.cpp a spusťte následující příkaz v okně příkazového řádku Visual Studio.

cl.exe /EHsc producer-consumer.cpp

Viz také

Koncepty

Asynchronní agenti knihovny

Asynchronní agenti

Asynchronní bloků zprávy

Funkce předávání zpráv