Doporučené postupy v knihovně asynchronní agenti
Tento dokument popisuje, jak využít asynchronní agenti knihovny.Knihovna agenti propaguje k programovací model založený na herce a v procesu předávání pro coarse-grained tok dat a použití kanálů úkoly.
Další informace o knihovně agenti, viz Asynchronní agenti knihovny.
Oddíly
Tento dokument obsahuje následující oddíly:
Použití agentů izolát státu
Omezit počet zpráv v potrubí dat pomocí omezení mechanismus
Neprovádějte uzamykání pracovních příležitostí dat
Neprojde velké přenášených zpráv podle hodnoty
Použít shared_ptr datové sítě při vlastnictví je nedefinovaný
Použití agentů izolát státu
Agenti knihovna poskytuje alternativy sdíleného stavu umožňuje připojení izolovaných součástí prostřednictvím asynchronní mechanismus předávání zpráv.Asynchronní agenti jsou nejefektivnější při jejich zjištění jejich vnitřní stav z jiných součástí.Izolováním stát více součástí nevztahovala obvykle na sdílená data.Izolace stavu můžete povolit aplikace měřítko, protože snižuje tvrzení sdílené paměti.Stavu izolace také snižuje pravděpodobnost zablokování a rasy podmínky, protože není nutné synchronizovat přístup ke sdíleným datům součásti.
Obvykle izolovat státu agent podržením datové členy v private nebo protected třídy agent a pomocí vyrovnávací paměti zpráva komunikovat změny stavu.Následující příklad ukazuje basic_agent třídy, která pochází z concurrency::agent.basic_agent Třída používá ke komunikaci s externí součásti vyrovnávací paměti dvě zprávy.Jednu vyrovnávací paměť uchovává příchozí zprávy; Další zpráva vyrovnávací uchovává odchozí zprávy.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrives the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
Kompletní příklady jak definovat a používat agenty naleznete v tématu Názorný postup: Vytváření aplikace založená na agenta a Názorný postup: Vytvoření agenta datový tok.
Top
Omezit počet zpráv v potrubí dat pomocí omezení mechanismus
Mnoho typů vyrovnávací paměť zpráv jako concurrency::unbounded_buffer, může obsahovat neomezený počet zpráv.Když výrobce zprávu odesílá zprávy potrubí data rychleji než spotřebitel může zpracovat tyto zprávy, aplikace zadat stavu nedostatku paměti nebo o nedostatku paměti.Můžete omezit počet zpráv, které jsou současně aktivní potrubí data omezení mechanismu, například semafor.
Následující základní příklad demonstruje použití semafor omezit počet zpráv v potrubí data.Využití příležitostí data concurrency::wait funkce pro simulaci operaci, která trvá nejméně 100 milisekund.Protože odesílatel vytváří zprávy rychleji než spotřebitel může zpracovávat zprávy, tento příklad definuje semaphore třídy povolí aplikaci omezit počet aktivních zpráv.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
semaphore Objektu omezuje příležitosti zpracovat současně maximálně dvě zprávy.
Výrobce v tomto příkladu odešle zprávy relativně málo spotřebiteli.V tomto příkladu tedy není prokázat potenciální stavu nedostatku paměti nebo o nedostatku paměti.Tento mechanismus je však užitečné při příležitosti dat obsahuje poměrně vysoký počet zpráv.
Další informace o tom, jak vytvořit semafor třídy, která je použita v tomto příkladu Jak: použití třídy kontext pro provádění spolupráce semafor.
Top
Neprovádějte uzamykání pracovních příležitostí dat
Knihovna agentů je nejužitečnější při práci, která provádí dat potrubí je poměrně coarse-grained.Jednu součást aplikace mohou číst data ze souboru nebo připojení k síti a příležitostně odesíláním dat na jiné součásti.Protokol, který používá knihovna agenty k šíření zpráv způsobuje mechanismus předávání zpráv má další nároky než úkol paralelní konstrukce, poskytovaných Paralelní knihovnu vzorků (PPL).Proto zkontrolujte činnost vykonávaná potrubím data dostatečně dlouhé, aby toto zatížení posun.
Ačkoli data potrubí je nejúčinnější, pokud jsou coarse-grained jeho úkoly, každé fáze příležitosti dat pomocí PPL konstrukce skupiny úkolů a paralelní algoritmy více detailní práci.Příklad používající uzamykání rovnoběžnosti v každé fázi zpracování coarse-grained datové sítě naleznete v tématu Názorný postup: Vytváření sítě zpracování obrazu.
Top
Neprojde velké přenášených zpráv podle hodnoty
V některých případech runtime vytvoří kopii každé zprávy předá jednu vyrovnávací paměť pro zprávy do vyrovnávací paměti jinou zprávu.Například concurrency::overwrite_buffer třída nabízí kopie všech zpráv, které obdrží každé jeho cíle.Runtime také vytvoří kopii zprávy dat při použití funkce předávání zpráv, jako concurrency::send a concurrency::receive psát zprávy a číst zprávy z vyrovnávací paměť pro zprávy.Přestože tento mechanismus pomáhá eliminovat riziko souběžně sdílená data zápisu, může vést k výkonu špatné paměti při relativně velké datové zprávy.
Můžete použít odkazy nebo odkazy ke zlepšení výkonu paměti při předání zprávy mají velké datové části.Následující příklad porovnává passing velkých zpráv hodnotou ukazatele předání zprávy stejného typu.Příklad definuje dva typy agent producer a consumer, která působí na message_data objektů.Příklad porovnává čas potřebný pro výrobce odeslat několika message_data objekty spotřebiteli na dobu vyžadovanou pro agenta producentů odeslat několika ukazatele na message_data objekty spotřebiteli.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Tento příklad vytvoří následující výstup:
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
Verzi, která používá ukazatele provádí lépe, protože odstraňuje požadavek runtime vytvořit kopii každé message_data objekt, který předá producent spotřebiteli.
Top
Použít shared_ptr datové sítě při vlastnictví je nedefinovaný
Při odesílání zpráv ukazatel prostřednictvím předávání zpráv potrubí nebo sítě obvykle přidělit paměť pro každou zprávu na přední straně sítě a uvolnit paměť na konci sítě.Ačkoli tento mechanismus často funguje dobře, existují případy, ve kterých je obtížné nebo není možné ji používat.Zvažte například případ, kdy datovou síť obsahuje více uzlů konec.V tomto případě není žádné jasné umístění uvolnit paměť pro zprávy.
Tento problém vyřešit, můžete pomocí mechanismu, například std::shared_ptr, umožňující ukazatel vlastněné více součástmi.Při závěrečné shared_ptr zdroje je rovněž uvolněna, je zničení objektu, který je vlastníkem prostředku.
Následující příklad demonstruje použití shared_ptr sdílet hodnoty ukazatele mezi vyrovnávací paměti více zpráv.Příklad připojení concurrency::overwrite_buffer objektu na tři concurrency::call objektů.overwrite_buffer Třída nabízí zprávy každé jeho cíle.Protože existuje více vlastníků dat na konci datové sítě, v tomto příkladu shared_ptr každé povolení call objektu sdílení zpráv.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Tento příklad vytvoří následující výstup:
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
Viz také
Úkoly
Názorný postup: Vytváření aplikace založená na agenta
Názorný postup: Vytvoření agenta datový tok
Názorný postup: Vytváření sítě zpracování obrazu
Koncepty
Doporučené postupy v paralelní knihovny vzorků
Obecné doporučené postupy v souběžném běhu