Práticas recomendadas para a biblioteca de agentes assíncronos
Este documento descreve como fazer uso eficaz da biblioteca de agentes assíncronos. A biblioteca de agentes promove um modelo de programação baseada em ator e uma mensagem de no processo, passando para o fluxo de dados refinado e a canalização de tarefas.
Para obter mais informações sobre a biblioteca de agentes, consulte Biblioteca de agentes assíncronos.
Seções
Este documento contém as seções a seguir:
Usar os agentes para isolar estado
Use um mecanismo de otimização para limitar o número de mensagens em um Pipeline de dados
Não execute o trabalho minucioso em um Pipeline de dados
Não passe grandes cargas de mensagem por valor
Use shared_ptr um dados rede quando propriedade é indefinido
Usar os agentes para isolar estado
A biblioteca de agentes fornece alternativas para o estado compartilhado, permitindo que você conecte componentes isolados através de um mecanismo de passagem de mensagens assíncrono. Agentes assíncronos são mais eficazes quando eles isolar o seu estado interno de outros componentes. Isolando o estado, vários componentes normalmente atuam em dados compartilhados. Isolamento de estado pode habilitar seu aplicativo dimensionar, pois ele reduz a contenção de memória compartilhada. Isolamento de estado também reduz a chance de deadlock, condições de corrida porque não tem componentes sincronizar o acesso aos dados compartilhados.
Você normalmente isolar o estado de um agente mantendo os membros de dados na private ou protected seções da classe de agente e usando os buffers de mensagem para comunicar alterações de estado. A exemplo a seguir mostra a basic_agent classe, que é derivada de Concurrency::agent. O basic_agent classe usa dois buffers de mensagem para se comunicar com componentes externos. Um buffer de mensagem contém mensagens de entrada; buffer de mensagem contém mensagens de saída.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
basic_agent(Concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrives the message buffer that holds output messages.
Concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = Concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
Concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
Concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
Concurrency::unbounded_buffer<int> _output;
};
Para obter exemplos completos sobre como definir e usar os agentes, consulte Demonstra Passo a passo: Criando um aplicativo baseado em agente e Demonstra Passo a passo: Criando um agente de fluxo de dados.
go to top
Use um mecanismo de otimização para limitar o número de mensagens em um Pipeline de dados
Muitos tipos de buffer de mensagem, como Concurrency::unbounded_buffer, pode conter um número ilimitado de mensagens. Quando um produtor de mensagem envia mensagens a um pipeline de dados mais rápido do que o consumidor pode processar essas mensagens, o aplicativo pode entrar em um estado de pouca memória ou insuficiência de memória. Você pode usar um mecanismo de otimização, por exemplo, um semáforo para limitar o número de mensagens que estão ativas simultaneamente em um pipeline de dados.
O seguinte exemplo básico demonstra como usar um semáforo para limitar o número de mensagens em um pipeline de dados. Os dados do pipeline usa a Concurrency::wait a função para simular uma operação que leva menos de 100 milissegundos. Como o remetente produz mensagens mais rapidamente do que o consumidor pode processar essas mensagens, este exemplo define o semaphore classe para habilitar o aplicativo limitar o número de mensagens de ativo.
// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace Concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(LONG capacity);
// Acquires access to the semaphore.
void acquire();
// Releases access to the semaphore.
void release();
private:
// The semaphore count.
LONG _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L);
// Decrements the event counter.
void signal();
// Increments the event counter.
void add_count();
// Blocks the current context until the event is set.
void wait();
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const int MessageCount = 5;
// The number of messages that can be active at the same time.
const long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(int i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
//
// semaphore class implementation.
//
semaphore::semaphore(LONG capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void semaphore::acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (InterlockedDecrement(&_semaphore_count) < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void semaphore::release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (InterlockedIncrement(&_semaphore_count) <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
if (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
//
// countdown_event class implementation.
//
countdown_event::countdown_event(unsigned int count)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void countdown_event::signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void countdown_event::add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void countdown_event::wait() {
_event.wait();
}
Este exemplo produz a saída de exemplo a seguir:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
O semaphore objeto limita o pipeline de processamento de no máximo duas mensagens ao mesmo tempo.
O produtor neste exemplo envia mensagens relativamente poucas ao consumidor. Portanto, este exemplo não demonstre uma possível condição de pouca memória ou insuficiência de memória. No entanto, esse mecanismo é útil quando um pipeline de dados contém um número relativamente alto de mensagens.
Para obter mais informações sobre como criar a classe semaphore usada neste exemplo, consulte Como: Use a classe de contexto para implementar um semáforo cooperativo.
go to top
Não execute o trabalho minucioso em um Pipeline de dados
A biblioteca de agentes é mais útil quando o trabalho realizado por um pipeline de dados é bastante refinado. Por exemplo, um componente do aplicativo pode ler dados de um arquivo ou uma conexão de rede e ocasionalmente enviar esses dados para outro componente. O protocolo que a biblioteca de agentes usa para propagar as mensagens faz com que o mecanismo de passagem de mensagem ter mais sobrecarga do que as construções de paralela de tarefas são fornecidos pelo Biblioteca paralela de padrões (PPL). Portanto, certifique-se de que o trabalho realizado por um pipeline de dados é grande o suficiente para compensar essa sobrecarga.
Embora um pipeline de dados é mais eficaz quando suas tarefas são refinadas, cada estágio do pipeline de dados pode usar construções PPL como, por exemplo, grupos de tarefas e algoritmos paralelos para realizar mais refinado trabalho. Para obter um exemplo de uma rede de dados refinados que usa o paralelismo refinado em cada estágio de processamento, consulte Demonstra Passo a passo: A criação de uma rede de processamento de imagens.
go to top
Não passe grandes cargas de mensagem por valor
Em alguns casos, o runtime cria uma cópia de cada mensagem que ele passa de um buffer de mensagem para outro buffer de mensagem. Por exemplo, o Concurrency::overwrite_buffer classe oferece uma cópia de cada mensagem que ele recebe a cada um de seus destinos. O runtime também cria uma cópia dos dados da mensagem ao usar funções de transmissão de mensagens, como Concurrency::send e Concurrency::receive para escrever mensagens e ler mensagens de um buffer de mensagem. Embora este mecanismo ajuda a eliminar o risco de gravar simultaneamente dados compartilhados, poderia levar a desempenho ruim de memória quando a carga da mensagem é relativamente grande.
Você pode usar ponteiros ou referências para melhorar o desempenho de memória quando você passa mensagens que têm uma grande carga. O exemplo a seguir compara as mensagens grandes passando pelo valor para passar ponteiros para o mesmo tipo de mensagem. O exemplo define dois tipos de agente, producer e consumer, que atuam em message_data objetos. O exemplo compara o tempo necessário para o produtor enviar várias message_data objetos para o consumidor até o momento em que é necessário para o agente de produtor enviar vários ponteiros para message_data objetos para o consumidor.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace Concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Este exemplo produz a saída de exemplo a seguir:
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
A versão que usa ponteiros realiza, melhor, pois elimina a necessidade de tempo de execução criar uma cópia completa de todas as message_data o objeto que ele passa do produtor para o consumidor.
go to top
Use shared_ptr um dados rede quando propriedade é indefinido
Quando você enviar mensagens pelo ponteiro através de um pipeline de transmissão de mensagens ou rede, você normalmente alocar a memória para cada mensagem na frente da rede e liberar memória no final da rede. Embora esse mecanismo freqüentemente funciona bem, há casos em que é difícil ou não é possível usá-lo. Por exemplo, considere o caso em que a rede de dados contém vários nós de extremidade. Nesse caso, não há nenhum local de limpar para liberar a memória para as mensagens.
Para resolver esse problema, você pode usar um mecanismo, por exemplo, std::shared_ptr, que permite que um ponteiro para ser possuídos por vários componentes. Quando o final shared_ptr o objeto que possui um recurso é destruído, o recurso também é liberado.
O exemplo a seguir demonstra como usar shared_ptr para compartilhar valores de ponteiro entre vários buffers de mensagem. O exemplo conecta um Concurrency::overwrite_buffer o objeto para três Concurrency::call objetos. O overwrite_buffer classe oferece mensagens para cada um dos destinos. Como há vários proprietários dos dados no final da rede de dados, este exemplo usa shared_ptr para habilitar cada call o objeto para compartilhar a propriedade mensagens.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace Concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Este exemplo produz a saída de exemplo a seguir:
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
Consulte também
Tarefas
Demonstra Passo a passo: Criando um aplicativo baseado em agente
Conceitos
As práticas recomendadas de Runtime de simultaneidade
Biblioteca de agentes assíncronos
Outros recursos
Demonstra Passo a passo: Criando um agente de fluxo de dados
Demonstra Passo a passo: A criação de uma rede de processamento de imagens
Práticas recomendadas para a biblioteca de padrões paralelos
Práticas recomendadas de gerais no Runtime de simultaneidade