Sdílet prostřednictvím


Názorný postup: Vytvoření agenta datový tok

V tomto dokumentu ukazuje, jak vytvořit agenta aplikacích založených na datový tok namísto řízení toku.

Řízení toku odkazuje na pořadí provádění operací v programu.Řízení toku je regulována pomocí struktury řízení podmíněné příkazy smyček a tak dále.Případně datový tok odkazuje na programovací model, ve kterém jsou provedeny výpočty, pouze pokud všechna požadovaná data je k dispozici.Programovací model datový tok souvisí pojem předávání, zpráv, ve kterém nezávislé součásti programu komunikují pomocí odesílání zpráv.

Asynchronní agenti podporu toku řízení i programování modely datový tok.Přestože model řízení toku je vhodné v mnoha případech je tok dat modelu v jiné například agent přijímá data a provede akci, která je na základě tohoto data.

Požadavky

Před zahájením tohoto postupu, přečtěte si následující dokumenty:

Oddíly

Tento návod obsahuje následující oddíly:

  • Vytvoření základní Agent tok řízení

  • Vytvoření základní Agent datový tok

  • Vytvoření agenta protokolování zpráv

Vytvoření základní Agent tok řízení

Zvažte následující příklad, který definuje control_flow_agent třídy.control_flow_agent Třída slouží pro tři zprávy vyrovnávací paměti: jeden vstupní vyrovnávací paměti a dvě výstupní vyrovnávací paměti.run Metoda načte z vyrovnávací paměti zdroj zprávy ve smyčce a pomocí podmíněného příkazu tok provádění programu.Agent zvýší jeden čítač nulová, záporné hodnoty a zvyšuje hodnotu čítače jiné nulová, kladné hodnoty.Jakmile agent obdrží sentinel hodnota nula, odešle výstupní vyrovnávací paměti zprávy hodnoty čítačů.negatives a positives metod povolit aplikaci čtení počty záporné a kladné hodnoty od agenta.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Přestože tento příklad vytvoří základní použití řízení toku agent, ukazuje sériové povahy programování založené řízení toku.Každá zpráva musí být zpracovány sekvenčně, přestože více zpráv, může být k dispozici zprávu vstupní vyrovnávací paměti.Tok dat modelu umožňuje obě větve podmíněný příkaz vyhodnotit souběžně.Tok dat modelu můžete také vytvořit složitější messaging sítě, které fungují na data, jakmile je k dispozici.

Top

Vytvoření základní Agent datový tok

Tato část ukazuje, jak převést control_flow_agent třídy použití modelu tok dat provést stejný úkol.

Agent tok dat funguje na vytvoření sítě vyrovnávací paměti pro zprávy, které slouží konkrétní účel.Blokuje určité zprávy pomocí funkce filtru přijmout nebo odmítnout zprávu na základě své datové části.Funkce filtru zajišťuje, že blok zprávy přijímá pouze určité hodnoty.

Převést na datový tok agenta agent tok řízení

  1. Kopírování textu control_flow_agent třídy do jiné třídy, například dataflow_agent.Alternativně můžete přejmenovat control_flow_agent třídy.

  2. Odebrat těla smyčky, která volá receive z run metoda.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. V run metodu inicializace proměnných po negative_count a positive_count, přidat countdown_event objekt, který sleduje počet aktivních operací.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    countdown_event Třídy je uveden dále v tomto tématu.

  4. Vytvořte vyrovnávací paměti objektů, které se budou podílet zprávy v síti datový tok.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Vyrovnávací paměti zprávy k síti připojte.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Počkejte event a countdown event objektů nastavení.Tyto události signál, že agent obdržel sentinel hodnotu a že po dokončení všech operací.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

Následující diagram znázorňuje tok dat dokončení sítě dataflow_agent třídy:

Síťový datový tok

Následující tabulka popisuje členům sítě.

Člen

Description

increment_active

A concurrency::transformer objekt, který zvyšuje hodnotu čítače událostí aktivní a předává zbytek sítě vstupní hodnotu.

negatives, positives

CONCURRENCY::Call objekty, které zvýšit počet čísel a sníží Čítač Aktivní události.Jednotlivé objekty pomocí filtru přijmout záporná čísla nebo kladná čísla.

sentinel

A concurrency::call objektu, která přijímá sentinel hodnotu nula a sníží Čítač Aktivní události.

connector

A concurrency::unbounded_buffer objekt, který se připojuje k interní síti vyrovnávací paměť pro zprávy zdroje.

Protože run na samostatný podproces je volána metoda, další podprocesy mohou odesílat zprávy v síti před síti připojen._source Je datový člen unbounded_buffer objekt, který všechny vstupní odeslané z aplikace agenta vyrovnávacích pamětí.K síti zpracovává všechny vstupní zprávy, agent nejprve odkazy vnitřní uzly v síti a potom propojí start této sítě, connector, až _source datový člen.Zaručuje, že zprávy není získat zpracována jako vytvořeného v síti.

Protože v tomto příkladu sítě je založena na datový tok, spíše než na toku řízení sítě sdělí agent že ji dokončil zpracování každé vstupní hodnoty a že sentinel uzlu obdržel jeho hodnotu.V tomto příkladu countdown_event objektu signál všechny vstupní hodnoty byly zpracovány a concurrency::event objektu označuje, že uzel sentinel získala její hodnotu.countdown_event Třída používá event signál dosáhne nulové hodnoty čítače objektu.Vedoucí sítě datový tok zvýší Čítač pokaždé, kdy je obdrží hodnotu.Každý terminálový uzel sítě sníží Čítač po zpracovává vstupní hodnotu.Po formulářů datový tok síťového agenta čeká sentinel uzlu nastavení event objektu a countdown_event objektu signál, že jeho Čítač dosáhl nuly.

Následující příklad ukazuje control_flow_agent, dataflow_agent, a countdown_event tříd.wmain Funkce vytvoří control_flow_agent a dataflow_agent objekt a použije send_values funkce Odeslat agentů posloupnost náhodných hodnot.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Tento příklad vytvoří následující výstup:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Ff398051.collapse_all(cs-cz,VS.110).gifProbíhá kompilace kódu

Příklad kódu zkopírujte a vložte do projektu Visual Studio nebo vložit do souboru s názvem datový tok agent.cpp a spusťte následující příkaz v okně příkazového řádku Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

Top

Vytvoření agenta protokolování zpráv

Následující příklad ukazuje log_agent třídy, která se podobá dataflow_agent třídy.log_agent Třída implementuje protokolování asynchronní agent, zapíše do souboru a konzoly protokolovat zprávy.log_agent Třídy povolí aplikaci do kategorií jako informační zprávy, upozornění nebo chyba.Také umožňuje aplikaci určit, zda každé kategorie protokolu zapisovány do souboru nebo konzoly.Tento příklad zapíše všechny zprávy protokolu do souboru a pouze chybové zprávy konzoly.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

Tento příklad zapíše následující výstup konzoly.

error: This is a sample error message.

Tento příklad vytvoří také soubor log.txt, který obsahuje následující text.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Ff398051.collapse_all(cs-cz,VS.110).gifProbíhá kompilace kódu

Příklad kódu zkopírujte a vložte do projektu Visual Studio nebo vložit do souboru s názvem protokolu filter.cpp a spusťte následující příkaz v okně příkazového řádku Visual Studio.

cl.exe /EHsc log-filter.cpp

Top

Viz také

Další zdroje

Souběžnost Runtime návody