Udostępnij za pośrednictwem


Biblioteka agentów asynchronicznych — Najlepsze praktyki

Ten dokument zawiera informacje dotyczące efektywnego wykorzystania asynchronicznego biblioteki agentów.Biblioteki agentów promuje model programowania opartego na aktor i wiadomości w trakcie przekazywania dla gruboziarnistych przepływ danych oraz przetwarzanie potokowe zadań.

Aby uzyskać więcej informacji dotyczących biblioteki agentów, zobacz Biblioteka agentów asynchronicznych.

Sekcje

Ten dokument zawiera następujące sekcje:

  • Skorzystaj z agentów, aby odizolować stan

  • Użyj mechanizmu dławiącego, aby ograniczyć liczbę komunikatów w potoku danych

  • Nie wykonuj szczegółowych prac w potoku danych

  • Nie przepuszczaj dużych bloków komunikatów według wartości

  • Użyj shared_ptr w danych sieciowych, gdy nie określono własności

Skorzystaj z agentów, aby odizolować stan

Biblioteka agentów zapewnia rozwiązania alternatywne w stosunku do stanu udostępnionego umożliwiając łączą składniki na białym tle poprzez mechanizm przekazywania wiadomości asynchronicznych.Asynchroniczne agentów są najbardziej efektywne, gdy izolować ich stan wewnętrzny z innymi składnikami.Izolując Państwo wiele składników nie zwykle działają na udostępnionych danych.Izolacja stanu można włączyć aplikacji do skalowania, ponieważ zmniejsza Rywalizacja o pamięci współużytkowanej.Stan izolacji zmniejsza szanse na powstanie warunków zakleszczenie i wyścigu ponieważ składniki nie mają dostępu do udostępnionych danych synchronizacji.

Zazwyczaj wyizolować Państwa w agenta poprzez posiadanie danych członków w private lub protected sekcji klasy agenta i za pomocą informacji o zmianach stanu wiadomości buforów.W poniższym przykładzie pokazano basic_agent klasa, która wywodzi się z concurrency::agent.basic_agent Klasa wykorzystuje dwa bufory wiadomość do komunikacji ze składnikami zewnętrznych.Posiada jeden bufor komunikatów przychodzących wiadomości; Bufor komunikatów przechowuje wiadomości wychodzących.

// basic-agent.cpp 
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate 
// with other components. 
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer. 
         int value = concurrency::receive(_input);

         // TODO: Do something with the value. 
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Kompletne przykłady dotyczące sposobu definiowania i korzystania z agentów, zobacz Wskazówki: tworzenie aplikacji opartej o agentów i Wskazówki: tworzenie agenta przepływu danych.

[U góry]

Użyj mechanizmu dławiącego, aby ograniczyć liczbę komunikatów w potoku danych

Wiele typów bufor komunikatów, takich jak concurrency::unbounded_buffer, może zawierać nieograniczoną liczbę wiadomości.Gdy producent wiadomości szybciej niż konsument może przetwarzać te wiadomości wysyła wiadomości do rurociągu danych, aplikacji można wprowadzić stan niskiego pamięci lub braku pamięci.Mechanizm ograniczania przepustowości, na przykład semafora umożliwia Ogranicz liczbę wiadomości, które są jednocześnie aktywnych w rurociągu danych.

Poniższy przykład podstawowe ilustruje umożliwia ograniczenie liczby wiadomości w potoku danych semafora.Rurociągi do przesyłania danych używa concurrency::wait funkcji do symulacji operacji, która co najmniej 100 milisekund.Ponieważ nadawca daje szybciej niż konsument może przetwarzać wiadomości te wiadomości, w tym przykładzie definiuje semaphore klasy, aby umożliwić aplikacji Ogranicz liczbę aktywnych wiadomości.

// message-throttling.cpp 
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics. 
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore. 
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count  
      // falls below zero. When this happens, add the current context to the  
      // back of the wait queue and block the current context. 
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore. 
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context. 
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not 
         // yet finished adding the context to the queue.  
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to  
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its  
// count reaches zero. 
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero. 
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter. 
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter. 
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set. 
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero. 
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer. 
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time. 
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest 
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion 
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages. 

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The second stage of the pipeline simulates a  
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The third stage of the pipeline releases the semaphore 
   // and signals to the main appliation that the message has 
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

semaphore Obiektu ogranicza potoku do przetwarzania w tym samym czasie co najwyżej dwa komunikaty.

Producent w tym przykładzie wysyła komunikaty stosunkowo niewiele dla konsumenta.W związku z tym w tym przykładzie nie wykazują potencjalne warunek ilości pamięci lub braku pamięci.Jednak mechanizm ten jest przydatny, gdy rurociąg danych zawiera stosunkowo dużej liczby wiadomości.

Aby uzyskać więcej informacji dotyczących sposobu tworzenia klasy semafora, który jest używany w tym przykładzie, zobacz Porady: korzystanie z klasy kontekstu do wdrażania a kooperatywnego semafora.

[U góry]

Nie wykonuj szczegółowych prac w potoku danych

Biblioteki agentów jest najbardziej przydatna podczas pracy, która jest wykonywana przez rurociąg danych jest dość gruboziarnistych.Na przykład jeden składnik aplikacji może odczytać danych z pliku lub połączenie sieciowe i od czasu do czasu wysłania danych do innego składnika.Protokół, który używa biblioteki agentów do propagowania wiadomości powoduje, że mechanizm przekazywania wiadomości mają większe obciążenie niż konstrukcje równoległych zadań, dostarczanych przez Równoległe Biblioteka wzorców (PPL).W związku z tym upewnij się, że praca, wykonywaną przez rurociąg danych jest wystarczająco długi zrównoważyć obciążenie tego.

Mimo że rurociąg danych jest najbardziej efektywne, gdy jego zadania są gruboziarnistych, każdy etap potoku danych służą do wykonywania bardziej szczegółowymi zasadami pracy konstrukcje PPL, takich jak grupy zadań i algorytmy równoległe.Na przykład sieci gruboziarnistych danych, które używa drobnoziarnistych równoległości na każdym etapie przetwarzania, zobacz Wskazówki: tworzenie sieci przetwarzania obrazów.

[U góry]

Nie przepuszczaj dużych bloków komunikatów według wartości

W niektórych przypadkach środowiska wykonawczego tworzona jest kopia każdej wiadomości, która przechodzi z jednego buforu wiadomość do innego bufor komunikatów.Na przykład concurrency::overwrite_buffer klasa oferuje kopię każdej wiadomości odbierającym każdemu z jego elementów docelowych.Środowisko wykonawcze tworzy również kopię danych wiadomości podczas korzystania z funkcji przekazywania wiadomości w takich jak concurrency::send i concurrency::receive do zapisywania wiadomości do i odczytywać wiadomości z bufor komunikatów.Chociaż ten mechanizm pomaga wyeliminować ryzyko jednocześnie pisania do udostępnionych danych, gdy zawartość wiadomości jest stosunkowo duże może prowadzić do pamięci słabej wydajności.

Można użyć wskaźników lub odwołania, aby zwiększyć wydajność pamięci podczas przekazywania wiadomości mają o dużej ładowności.Poniższy przykład porównuje umierający dużych wiadomości przez wartość do przekazania wskaźniki komunikaty tego samego typu.W przykładzie zdefiniowano dwa typy agenta, producer i consumer, które działają na message_data obiektów.W przykładzie porównuje czas wymagany dla producenta do wysyłania kilku message_data obiektów dla konsumenta do czasu, który jest wymagany dla producentów agenta do wysyłania kilku wskaźników do message_data obiektów do konsumenta.

// message-payloads.cpp 
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds  
// that it takes to call that function. 
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data. 
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values. 
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*. 
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values. 
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.  
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ... 

      // Release the memory for the message. 
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send. 
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents. 
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time. 
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Ten przykład generuje następujące przykładowe dane wyjściowe:

  

Wersja, która za pomocą wskaźników działa lepiej, ponieważ eliminuje wymogów dotyczących czasu wykonywania utworzyć pełną kopię każdego message_data obiekt, który przechodzi od producenta do konsumenta.

[U góry]

Użyj shared_ptr w danych sieciowych, gdy nie określono własności

Podczas wysyłania wiadomości przez wskaźnik za pośrednictwem procesu przekazywania wiadomości lub sieci, zazwyczaj przydzielić pamięci dla każdej wiadomości w przedniej części sieci i zwolnić pamięć, że pod koniec sieci.Chociaż ten mechanizm często działa dobrze, istnieją sytuacje, w których jest trudne lub nie można go używać.Na przykład należy rozważyć przypadek, w którym sieci danych zawiera wiele węzłów koniec.W tym przypadku jest lokalizacja nie Wyczyść, aby zwolnić pamięć dla wiadomości.

Aby rozwiązać ten problem, można użyć mechanizmu, na przykład, std::shared_ptr, który umożliwia wskaźnik do posiadanych przez wiele składników.Po końcowym shared_ptr niszczony jest obiekt, który jest właścicielem zasobu, zasób również zostanie zwolniona.

Poniższy przykład pokazuje sposób użycia shared_ptr do wartości wskaźnika wśród wielu buforów komunikatu akcji.Przykład łączy concurrency::overwrite_buffer obiektu do trzech concurrency::call obiektów.overwrite_buffer Klasa oferuje wiadomości do każdego z jego elementów docelowych.Ponieważ istnieje wiele właścicieli danych na końcu sieci danych, w tym przykładzie użyto shared_ptr umożliwienie każdej call obiekt, aby udostępnić prawa własnooci wiadomości.

// message-sharing.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource. 
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource. 
   int id() const { return _id; }

   // TODO: Add additional members here. 
private:
   // An identifier for the resource. 
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects 
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to  
   // receive its message.
   e.wait();
}

Ten przykład generuje następujące przykładowe dane wyjściowe:

  

Zobacz też

Zadania

Wskazówki: tworzenie aplikacji opartej o agentów

Wskazówki: tworzenie agenta przepływu danych

Wskazówki: tworzenie sieci przetwarzania obrazów

Koncepcje

Biblioteka agentów asynchronicznych

Biblioteka wzorów równoległych — Najlepsze praktyki

Współbieżność środowiska wykonawczego — Najlepsze praktyki ogólne

Inne zasoby

Współbieżność środowiska wykonawczego — Najlepsze praktyki