逐步解說:建立資料流程代理程式
本文件說明如何建立以資料流程 (而不是控制流程) 為基礎的代理程式架構應用程式。
「控制流程」(Control Flow) 是指程式中作業執行的順序。 控制流程是透過條件陳述式、迴圈等控制結構來規範。 另一方面,「資料流程」(Dataflow) 是指只在所有必要資料皆可用的情況下執行計算的程式撰寫模型。 資料流程程式撰寫模型與訊息傳遞的概念有關,在此模型中,程式的獨立元件可藉由訊息傳送相互通訊。
非同步代理程式支援控制流程和資料流程程式撰寫模型。 雖然控制流程模型適合許多情況,但在某些情況下,資料流程模型更適用,例如當代理程式接收資料並根據該資料的裝載來執行動作時。
必要條件
在您開始閱讀此逐步解說前,請先參閱下列文件:
章節
此逐步解說包含下列章節:
建立基本的控制流程代理程式
建立基本的資料流程代理程式
建立訊息記錄代理程式
建立基本的控制流程代理程式
請考慮下列會定義 control_flow_agent 類別的範例。 control_flow_agent 類別作用於三個訊息緩衝區:一個輸入緩衝區和兩個輸出緩衝區。 run 方法會在迴圈中從來源訊息緩衝區讀取,並使用條件陳述式來導向程式執行流程。 如果是非零的負值,代理程式會遞增一個計數值,如果是非零的正值,則會遞增另一個計數器。 在代理程式收到零的 Sentinel 值之後,它會將計數器的值傳送至輸出訊息緩衝區。 negatives 和 positives 方法可讓應用程式從代理程式讀取負值和正值的計數。
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
雖然這個範例在代理程式中使用控制流程的基本用法,但是它示範控制流程架構程式設計的序列本質。 每則訊息都必須循序處理,即使在輸入訊息緩衝區中有多個訊息也一樣。 資料流程模型可讓條件陳述式的分支同時評估。 資料流程模型也可讓您建立更複雜的訊息網路,在資料可用時作用於資料。
[上方]
建立基本的資料流程代理程式
本節說明如何將 control_flow_agent 類別轉換為使用資料流程模型,執行相同的工作。
資料流程代理程式的運作方式是建立訊息緩衝區網路,每個訊息緩衝區都會做為特定用途。 某些訊息區塊使用篩選函式,根據訊息裝載來接受或拒絕訊息。 篩選函式可確保訊息區塊只接收特定值。
若要將控制流程代理程式轉換為資料流程代理程式
將 control_flow_agent 類別的主體複製到另一個類別,例如 dataflow_agent。 或者,您可以重新命名 control_flow_agent 類別。
從 run 方法中移除會呼叫 receive 的迴圈主體。
void run() { // Counts the number of negative and positive values that // the agent receives. size_t negative_count = 0; size_t positive_count = 0; // Write the counts to the message buffers. send(_negatives, negative_count); send(_positives, positive_count); // Set the agent to the completed state. done(); }
在 run 方法中,於變數 negative_count 和 positive_count 的初始化之後,加入 countdown_event 物件,追蹤使用中作業的計數。
// Tracks the count of active operations. countdown_event active; // An event that is set by the sentinel. event received_sentinel;
本主題稍後會說明 countdown_event 類別。
建立會參與資料流程網路的訊息緩衝區物件。
// // Create the members of the dataflow network. // // Increments the active counter. transformer<int, int> increment_active( [&active](int value) -> int { active.add_count(); return value; }); // Increments the count of negative values. call<int> negatives( [&](int value) { ++negative_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value < 0; }); // Increments the count of positive values. call<int> positives( [&](int value) { ++positive_count; // Decrement the active counter. active.signal(); }, [](int value) -> bool { return value > 0; }); // Receives only the sentinel value of 0. call<int> sentinel( [&](int value) { // Decrement the active counter. active.signal(); // Set the sentinel event. received_sentinel.set(); }, [](int value) { return value == 0; }); // Connects the _source message buffer to the rest of the network. unbounded_buffer<int> connector;
連接訊息緩衝區以形成網路。
// // Connect the network. // // Connect the internal nodes of the network. connector.link_target(&negatives); connector.link_target(&positives); connector.link_target(&sentinel); increment_active.link_target(&connector); // Connect the _source buffer to the internal network to // begin data flow. _source.link_target(&increment_active);
等候 event 和 countdown event 物件設定。 這些事件表示代理程式已收到 Sentinel 值,而且所有作業都已完成。
// Wait for the sentinel event and for all operations to finish. received_sentinel.wait(); active.wait();
下圖顯示 dataflow_agent 類別的完整資料流程網路:
下表說明網路成員。
成員 |
說明 |
---|---|
increment_active |
concurrency::transformer 物件,會遞增使用中事件計數器,並將輸入值傳遞給網路的其餘部分。 |
negatives, positives |
concurrency::call 物件,會遞增數字計數,並遞減使用中事件計數器。 每個這些物件都會使用篩選條件以接受負數或正數。 |
sentinel |
concurrency::call 物件,只會接受零的 Sentinel 值,並遞減使用中事件計數器。 |
connector |
concurrency::unbounded_buffer 物件,會將來源訊息緩衝區連接至內部網路。 |
因為 run 方法是在個別執行緒上呼叫的,所以其他執行緒可以在網路完全連接之前傳送訊息給網路。 _source 資料成員是 unbounded_buffer 物件,會緩衝從應用程式傳送至代理程式的所有輸入。 為了確定網路處理所有輸入訊息,代理程式會先連結網路的內部節點,然後將該網路前端 (connector) 連結至 _source 資料成員。 這會確保不會在網路形成過程中處理訊息。
因為在這個範例中網路是以資料流程 (而不是控制流程) 為基礎,網路必須向代理程式表示它已完成處理每個輸入值,以及 Sentinel 節點已收到其值。 這個範例使用 countdown_event 物件以表示所有輸入值都已處理,使用 concurrency::event 物件以表示 Sentinel 節點收到其值。 countdown_event 類別使用 event 物件,在計數器值達到零時發出訊號。 每次資料流程網路前端收到值時,它會遞增計數器。 網路的每個終端節點在處理輸入值之後,它會遞減計數器。 在代理程式形成資料流程網路之後,它會等候 Sentinel 節點設定 event 物件,以及等候 countdown_event 物件表示其計數器已達到零。
下列範例顯示 control_flow_agent、dataflow_agent 和 countdown_event 類別。 wmain 函式會建立 control_flow_agent 和 dataflow_agent 物件,並使用 send_values 函式將一系列的隨機值傳送給代理程式。
// dataflow-agent.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>
using namespace concurrency;
using namespace std;
// A basic agent that uses control-flow to regulate the order of program
// execution. This agent reads numbers from a message buffer and counts the
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
explicit control_flow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Read from the source buffer until we receive
// the sentinel value of 0.
int value = 0;
while ((value = receive(_source)) != 0)
{
// Send negative values to the first target and
// non-negative values to the second target.
if (value < 0)
++negative_count;
else
++positive_count;
}
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
_event.set();
}
// Decrements the event counter.
void signal() {
if(InterlockedDecrement(&_current) == 0L) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(InterlockedIncrement(&_current) == 1L) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// A basic agent that resembles control_flow_agent, but uses uses dataflow to
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
dataflow_agent(ISource<int>& source)
: _source(source)
{
}
// Retrieves the count of negative numbers that the agent received.
size_t negatives()
{
return receive(_negatives);
}
// Retrieves the count of positive numbers that the agent received.
size_t positives()
{
return receive(_positives);
}
protected:
void run()
{
// Counts the number of negative and positive values that
// the agent receives.
size_t negative_count = 0;
size_t positive_count = 0;
// Tracks the count of active operations.
countdown_event active;
// An event that is set by the sentinel.
event received_sentinel;
//
// Create the members of the dataflow network.
//
// Increments the active counter.
transformer<int, int> increment_active(
[&active](int value) -> int {
active.add_count();
return value;
});
// Increments the count of negative values.
call<int> negatives(
[&](int value) {
++negative_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value < 0;
});
// Increments the count of positive values.
call<int> positives(
[&](int value) {
++positive_count;
// Decrement the active counter.
active.signal();
},
[](int value) -> bool {
return value > 0;
});
// Receives only the sentinel value of 0.
call<int> sentinel(
[&](int value) {
// Decrement the active counter.
active.signal();
// Set the sentinel event.
received_sentinel.set();
},
[](int value) {
return value == 0;
});
// Connects the _source message buffer to the rest of the network.
unbounded_buffer<int> connector;
//
// Connect the network.
//
// Connect the internal nodes of the network.
connector.link_target(&negatives);
connector.link_target(&positives);
connector.link_target(&sentinel);
increment_active.link_target(&connector);
// Connect the _source buffer to the internal network to
// begin data flow.
_source.link_target(&increment_active);
// Wait for the sentinel event and for all operations to finish.
received_sentinel.wait();
active.wait();
// Write the counts to the message buffers.
send(_negatives, negative_count);
send(_positives, positive_count);
// Set the agent to the completed state.
done();
}
private:
// Source message buffer to read from.
ISource<int>& _source;
// Holds the number of negative and positive numbers that the agent receives.
single_assignment<size_t> _negatives;
single_assignment<size_t> _positives;
};
// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
// Send a series of random numbers to the source buffer.
mt19937 rnd(42);
for (size_t i = 0; i < count; ++i)
{
// Generate a random number that is not equal to the sentinel value.
int n;
while ((n = rnd()) == sentinel);
send(source, n);
}
// Send the sentinel value.
send(source, sentinel);
}
int wmain()
{
// Signals to the agent that there are no more values to process.
const int sentinel = 0;
// The number of samples to send to each agent.
const size_t count = 1000000;
// The source buffer that the application writes numbers to and
// the agents read numbers from.
unbounded_buffer<int> source;
//
// Use a control-flow agent to process a series of random numbers.
//
wcout << L"Control-flow agent:" << endl;
// Create and start the agent.
control_flow_agent cf_agent(source);
cf_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&cf_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << cf_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << cf_agent.positives()
<< L" positive numbers."<< endl;
//
// Perform the same task, but this time with a dataflow agent.
//
wcout << L"Dataflow agent:" << endl;
// Create and start the agent.
dataflow_agent df_agent(source);
df_agent.start();
// Send values to the agent.
send_values(source, sentinel, count);
// Wait for the agent to finish.
agent::wait(&df_agent);
// Print the count of negative and positive numbers.
wcout << L"There are " << df_agent.negatives()
<< L" negative numbers."<< endl;
wcout << L"There are " << df_agent.positives()
<< L" positive numbers."<< endl;
}
這個範例 (Example) 會產生下列範例 (Sample) 輸出:
編譯程式碼
請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 dataflow-agent.cpp 的檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。
cl.exe /EHsc dataflow-agent.cpp
[上方]
建立訊息記錄代理程式
下列範例顯示 log_agent 類別,這個類別與 dataflow_agent 類別類似。 log_agent 類別實作非同步記錄代理程式,會將記錄訊息寫入檔案和主控台。 log_agent 類別可讓應用程式將訊息分類為資訊、警告或錯誤。 它也會讓應用程式指定每個記錄分類要寫入檔案、主控台或兩者。 這個範例會將所有記錄訊息寫入檔案,而且只將錯誤訊息寫入主控台。
// log-filter.cpp
// compile with: /EHsc
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(unsigned int count = 0L)
: _current(static_cast<long>(count))
{
// Set the event if the initial count is zero.
if (_current == 0L)
{
_event.set();
}
}
// Decrements the event counter.
void signal()
{
if(InterlockedDecrement(&_current) == 0L)
{
_event.set();
}
}
// Increments the event counter.
void add_count()
{
if(InterlockedIncrement(&_current) == 1L)
{
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait()
{
_event.wait();
}
private:
// The current count.
volatile long _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
// Defines message types for the logger.
enum log_message_type
{
log_info = 0x1,
log_warning = 0x2,
log_error = 0x4,
};
// An asynchronous logging agent that writes log messages to
// file and to the console.
class log_agent : public agent
{
// Holds a message string and its logging type.
struct log_message
{
wstring message;
log_message_type type;
};
public:
log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
: _file(file_path)
, _file_messages(file_messages)
, _console_messages(console_messages)
, _active(0)
{
if (_file.bad())
{
throw invalid_argument("Unable to open log file.");
}
}
// Writes the provided message to the log.
void log(const wstring& message, log_message_type type)
{
// Increment the active message count.
_active.add_count();
// Send the message to the network.
log_message msg = { message, type };
send(_log_buffer, msg);
}
void close()
{
// Signal that the agent is now closed.
_closed.set();
}
protected:
void run()
{
//
// Create the dataflow network.
//
// Writes a log message to file.
call<log_message> writer([this](log_message msg)
{
if ((msg.type & _file_messages) != 0)
{
// Write the message to the file.
write_to_stream(msg, _file);
}
if ((msg.type & _console_messages) != 0)
{
// Write the message to the console.
write_to_stream(msg, wcout);
}
// Decrement the active counter.
_active.signal();
});
// Connect _log_buffer to the internal network to begin data flow.
_log_buffer.link_target(&writer);
// Wait for the closed event to be signaled.
_closed.wait();
// Wait for all messages to be processed.
_active.wait();
// Close the log file and flush the console.
_file.close();
wcout.flush();
// Set the agent to the completed state.
done();
}
private:
// Writes a logging message to the specified output stream.
void write_to_stream(const log_message& msg, wostream& stream)
{
// Write the message to the stream.
wstringstream ss;
switch (msg.type)
{
case log_info:
ss << L"info: ";
break;
case log_warning:
ss << L"warning: ";
break;
case log_error:
ss << L"error: ";
}
ss << msg.message << endl;
stream << ss.str();
}
private:
// The file stream to write messages to.
wofstream _file;
// The log message types that are written to file.
log_message_type _file_messages;
// The log message types that are written to the console.
log_message_type _console_messages;
// The head of the network. Propagates logging messages
// to the rest of the network.
unbounded_buffer<log_message> _log_buffer;
// Counts the number of active messages in the network.
countdown_event _active;
// Signals that the agent has been closed.
event _closed;
};
int wmain()
{
// Union of all log message types.
log_message_type log_all = log_message_type(log_info | log_warning | log_error);
// Create a logging agent that writes all log messages to file and error
// messages to the console.
log_agent logger(L"log.txt", log_all, log_error);
// Start the agent.
logger.start();
// Log a few messages.
logger.log(L"===Logging started.===", log_info);
logger.log(L"This is a sample warning message.", log_warning);
logger.log(L"This is a sample error message.", log_error);
logger.log(L"===Logging finished.===", log_info);
// Close the logger and wait for the agent to finish.
logger.close();
agent::wait(&logger);
}
這個範例會將下列輸出寫入主控台。
這個範例也會產生 log.txt 檔案,包含下列文字。
編譯程式碼
請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 log-filter.cpp 的檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。
cl.exe /EHsc log-filter.cpp
[上方]