Condividi tramite


Procedure consigliate nella libreria di agenti asincroni

In questo documento viene descritto come ottimizzare l'utilizzo della libreria di agenti asincroni. La libreria di agenti promuove un modello di programmazione basato su attori e un passaggio dei messaggi in-process per le attività di pipelining o per un flusso di dati con granularità grossolana.

Per ulteriori informazioni sulla libreria di agenti, vedere Libreria di agenti asincroni.

Sezioni

Il documento include le sezioni seguenti:

  • Utilizzare gli agenti per isolare lo stato

  • Utilizzare un meccanismo di limitazione per limitare il numero di messaggi in una pipeline di dati

  • Non eseguire il lavoro con granularità fine in una pipeline di dati

  • Non passare payload di messaggi di grandi dimensioni per valore

  • Utilizzare shared_ptr in una rete di dati quando la proprietà non è definita

Utilizzare gli agenti per isolare lo stato

La libreria di agenti fornisce le alternative allo stato condiviso consentendo di connettere componenti isolati tramite un meccanismo di passaggio dei messaggi asincrono. Gli agenti asincroni sono più efficaci quando isolano il relativo stato interno da altri componenti. Isolando lo stato, più componenti non agiscono in genere su dati condivisi. L'isolamento dello stato consente all'applicazione di essere scalata poiché riduce la contesa sulla memoria condivisa. L'isolamento dello stato riduce inoltre la possibilità di deadlock e race condition poiché non è necessario per i componenti sincronizzare l'accesso ai dati condivisi.

In genere, lo stato in un agente viene isolato mantenendo i membri dati nelle sezioni protected o private della classe dell'agente e utilizzando i buffer dei messaggi per comunicare le modifiche dello stato. Nell'esempio seguente viene definita la classe basic_agent, che deriva da Concurrency::agent. La classe basic_agent utilizza due buffer dei messaggi per comunicare con i componenti esterni. Un buffer dei messaggi contiene i messaggi in arrivo, l'altro contiene i messaggi in uscita.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

Per gli esempi completi su come definire e utilizzare gli agenti, vedere Procedura dettagliata: creazione di un'applicazione basata sugli agenti e Procedura dettagliata: creazione di un agente del flusso di dati.

[vai all'inizio]

Utilizzare un meccanismo di limitazione per limitare il numero di messaggi in una pipeline di dati

Molti tipi di buffer dei messaggi, ad esempio Concurrency::unbounded_buffer, possono contenere un numero illimitato di messaggi. Quando un producer di messaggi invia i messaggi in una pipeline di dati più velocemente di quanto il consumer sia in grado di elaborare, l'applicazione può passare a uno stato di memoria insufficiente. È possibile utilizzare un meccanismo di limitazione, ad esempio un semaforo, per limitare il numero di messaggi contemporaneamente attivi in una pipeline di dati.

Nell'esempio di base seguente viene illustrato come utilizzare un semaforo per limitare il numero di messaggi in una pipeline di dati. La pipeline di dati utilizza la funzione Concurrency::wait per simulare un'operazione che richiede almeno 100 millisecondi. Poiché il mittente produce messaggi più velocemente di quanto il consumer sia in grado di elaborare, in questo esempio viene definita la classe semaphore per consentire all'applicazione di limitare il numero di messaggi attivi.

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity);

   // Acquires access to the semaphore.
   void acquire();

   // Releases access to the semaphore.
   void release();

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L);

   // Decrements the event counter.
   void signal();

   // Increments the event counter.
   void add_count();

   // Blocks the current context until the event is set.
   void wait();

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};


int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();   
}

//
// semaphore class implementation.
//

semaphore::semaphore(LONG capacity)
   : _semaphore_count(capacity)
{
}

// Acquires access to the semaphore.
void semaphore::acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (InterlockedDecrement(&_semaphore_count) < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}

// Releases access to the semaphore.
void semaphore::release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (InterlockedIncrement(&_semaphore_count) <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      if (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

//
// countdown_event class implementation.
//

countdown_event::countdown_event(unsigned int count)
   : _current(static_cast<long>(count)) 
{
   // Set the event if the initial count is zero.
   if (_current == 0L)
      _event.set();
}

// Decrements the event counter.
void countdown_event::signal() {
   if(InterlockedDecrement(&_current) == 0L) {
      _event.set();
   }
}

// Increments the event counter.
void countdown_event::add_count() {
   if(InterlockedIncrement(&_current) == 1L) {
      _event.reset();
   }
}

// Blocks the current context until the event is set.
void countdown_event::wait() {
   _event.wait();
}

Questo esempio produce l'output seguente:

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

L'oggetto semaphore limita la pipeline in modo da elaborare al massimo due messaggi contemporaneamente.

In questo esempio il producer invia al consumer un numero relativamente basso di messaggi. Nell'esempio non viene pertanto illustrata una condizione potenziale di memoria insufficiente. Tuttavia, questo meccanismo risulta utile quando una pipeline di dati contiene un numero relativamente elevato di messaggi.

Per ulteriori informazioni su come creare la classe semaforo utilizzata in questo esempio, vedere Procedura: utilizzare la classe Context per implementare una classe semaforo di cooperazione.

[vai all'inizio]

Non eseguire il lavoro con granularità fine in una pipeline di dati

La libreria di agenti è molto utile quando il lavoro eseguito da una pipeline di dati è con granularità grossolana. Ad esempio, un componente dell'applicazione potrebbe leggere i dati da un file o da una connessione di rete e inviare saltuariamente i dati a un altro componente. Il protocollo utilizzato dalla libreria di agenti per propagare i messaggi comporta un sovraccarico maggiore del meccanismo di passaggio dei messaggi rispetto ai costrutti paralleli delle attività forniti dalla libreria PPL (Parallel Patterns Library). Assicurarsi pertanto che il lavoro eseguito da una pipeline di dati sia sufficientemente lungo da compensare tale sovraccarico.

Sebbene una pipeline di dati sia più efficiente quando le relative attività sono con granularità grossolana, ogni fase della pipeline di dati può utilizzare i costrutti della libreria PPL come i gruppi di attività e gli algoritmi paralleli per eseguire un lavoro con granularità più fine. Per un esempio di una rete di dati con granularità grossolana che utilizza un parallelismo con granularità fine in ogni fase di elaborazione, vedere Procedura dettagliata: creazione di una rete per l'elaborazione di immagini.

[vai all'inizio]

Non passare payload di messaggi di grandi dimensioni per valore

In alcuni casi, il runtime crea una copia di ogni messaggio che passa da un buffer dei messaggi all'altro. La classe Concurrency::overwrite_buffer offre, ad esempio, una copia di ogni messaggio che riceve a ciascuna delle relative destinazioni. Il runtime crea inoltre una copia dei dati del messaggio quando si utilizzano le funzioni di passaggio dei messaggi come Concurrency::send e Concurrency::receive per scrivere e leggere i messaggi da un buffer dei messaggi. Sebbene questo meccanismo consenta di eliminare il rischio di scrivere contemporaneamente nei dati condivisi, può comportare una riduzione delle prestazioni di memoria quando le dimensioni del payload del messaggio risultano relativamente grandi.

È possibile utilizzare puntatori o riferimenti per migliorare le prestazioni di memoria quando si passano messaggi con un payload di grandi dimensioni. Nell'esempio seguente viene confrontato il passaggio di messaggi di grandi dimensioni per valore con il passaggio di puntatori allo stesso tipo di messaggio. Nell'esempio vengono definiti due tipi di agente, producer e consumer, che agiscono sugli oggetti message_data. Nell'esempio viene confrontato il tempo impiegato dal producer per inviare al consumer diversi oggetti message_data con il tempo impiegato dall'agente del producer per inviare al consumer diversi puntatori agli oggetti message_data.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Questo esempio produce l'output seguente:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Le prestazioni della versione utilizzata dai puntatori sono migliori poiché non è necessario per il runtime creare una copia completa di ogni oggetto message_data che passa dal producer al consumer.

[vai all'inizio]

Utilizzare shared_ptr in una rete di dati quando la proprietà non è definita

Quando si inviano messaggi dal puntatore tramite una pipeline o una rete di passaggio dei messaggi, in genere viene allocata memoria per ogni messaggio all'inizio della rete e liberata memoria alla fine della rete. Sebbene questo meccanismo funzioni quasi sempre bene, vi sono casi in cui è più complesso o non è possibile utilizzarlo. Ad esempio, si consideri il caso in cui la rete di dati contiene più nodi finali. In questo caso, la posizione in cui liberare la memoria per i messaggi non è chiara.

Per risolvere questo problema, è possibile utilizzare un meccanismo, ad esempio std::shared_ptr, che consente a più componenti di essere proprietari di un puntatore. Quando l'oggetto shared_ptr finale proprietario di una risorsa viene eliminato, viene liberata anche la risorsa.

Nell'esempio seguente seguito viene illustrato come utilizzare shared_ptr per condividere i valori del puntatore tra più buffer dei messaggi. Nell'esempio un oggetto Concurrency::overwrite_buffer viene connesso a tre oggetti Concurrency::call. La classe overwrite_buffer offre i messaggi a ciascuna delle relative destinazioni. Poiché esistono più proprietari dei dati alla fine della rete di dati, nell'esempio viene utilizzato shared_ptr per consentire a ogni oggetto call di condividere la proprietà dei messaggi.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Questo esempio produce l'output seguente:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Vedere anche

Attività

Procedura dettagliata: creazione di un'applicazione basata sugli agenti

Concetti

Procedure consigliate del runtime di concorrenza

Libreria di agenti asincroni

Altre risorse

Procedura dettagliata: creazione di un agente del flusso di dati

Procedura dettagliata: creazione di una rete per l'elaborazione di immagini

Procedure consigliate nella libreria PPL (Parallel Patterns Library)

Procedure consigliate generali nel runtime di concorrenza