次の方法で共有


非同期メッセージ ブロック

エージェント ライブラリには、アプリケーション コンポーネント間でメッセージをスレッド セーフに伝達できるメッセージ ブロックの型がいくつか用意されています。 通常、これらのメッセージ ブロックの型は、concurrency::sendconcurrency::asendconcurrency::receiveconcurrency::try_receive などの各種メッセージ パッシング ルーチンと共に使用します。 エージェント ライブラリで定義されているメッセージ パッシング ルーチンの詳細については、「メッセージ パッシング関数」を参照してください。

セクション

このトピックは、次のセクションで構成されています。

ソースとターゲット

ソースとターゲットは、メッセージ パッシングの 2 つの重要な参加要素です。 "ソース" は、メッセージを送信する通信エンドポイントを指します。 "ターゲット" は、メッセージを受信する通信エンドポイントを指します。 ソースは読み取るエンドポイントであり、ターゲットは書き込むエンドポイントであると考えることができます。 アプリケーションは、ソースとターゲットを接続して "メッセージング ネットワーク" を形成します。

エージェント ライブラリでは、concurrency::ISourceconcurrency::ITarget の 2 つの抽象クラスを使用して、ソースとターゲットを表します。 ソースとして機能するメッセージ ブロックの型は ISource から派生します。ターゲットとして機能するメッセージ ブロックの型は ITarget から派生します。 ソースおよびターゲットとして機能するメッセージ ブロックの型は、それぞれ ISource および ITarget から派生します。

[トップ]

メッセージ伝達

"メッセージの伝達" とは、コンポーネント間でのメッセージの送信処理のことです。 メッセージ ブロックにメッセージが提供されると、そのメッセージ ブロックはメッセージを受け入れるか、拒否するか、または延期します。 各メッセージ ブロックの型は、さまざまな方法でメッセージを格納および送信します。 たとえば、unbounded_buffer クラスはメッセージを無制限に格納し、overwrite_buffer クラスは一度に 1 つのメッセージを格納し、transformer クラスは各メッセージの変更後のバージョンを格納します。 これらのメッセージ ブロックの型については、このドキュメントの後半で詳しく説明します。

メッセージ ブロックは、メッセージを受け入れるとき、必要に応じて処理を実行できます。メッセージ ブロックがソースである場合は、結果のメッセージをネットワークの別のメンバーに渡します。 メッセージ ブロックは、フィルター関数を使用して、不要なメッセージを拒否できます。 フィルターについては、このトピックの「メッセージのフィルター処理」で後ほど詳しく説明します。 メッセージを延期するメッセージ ブロックは、そのメッセージを予約しておいて後で処理できます。 メッセージの予約については、このトピックの「メッセージの予約」で後ほど詳しく説明します。

エージェント ライブラリを使用すると、メッセージ ブロックによってメッセージを非同期的に渡すことも、同期的に渡すこともできます。 send 関数などを使用してメッセージを同期的にメッセージ ブロックに渡すと、ターゲット ブロックがそのメッセージを受け入れるか拒否するまで、ランタイムは現在のコンテキストをブロックします。 asend 関数などを使用してメッセージを非同期的にメッセージ ブロックに渡すと、ランタイムはそのメッセージをターゲットに提供します。ターゲットがそのメッセージを受け入れると、ランタイムはメッセージを受信側に伝達する非同期タスクをスケジュールします。 ランタイムは、軽量タスクを使用して、メッセージを協調的に伝達します。 軽量タスクの詳細については、タスク スケジューラに関する記事を参照してください。

アプリケーションは、ソースとターゲットを接続して "メッセージング ネットワーク" を形成します。 通常、ネットワークをリンクし、send または asend を呼び出してデータをネットワークに渡します。 ソース メッセージ ブロックをターゲットに接続するには、concurrency::ISource::link_target メソッドを呼び出します。 ソース ブロックをターゲットから接続解除するには、concurrency::ISource::unlink_target メソッドを呼び出します。 ソース ブロックをすべてのターゲットから接続解除するには、concurrency::ISource::unlink_targets メソッドを呼び出します。 定義済みのメッセージ ブロックの型のいずれかがスコープから外れるか、破棄されると、自動的にすべてのターゲット ブロックから接続解除されます。 メッセージ ブロックの型の中には、書き込み先として使用できるターゲットの最大数が制限されているものもあります。 次のセクションでは、定義済みのメッセージ ブロックの型に適用される制限事項について説明します。

[トップ]

メッセージ ブロックの種類の概要

次の表では、重要なメッセージ ブロックの型の役割について簡単に説明しています。

unbounded_buffer
メッセージのキューを格納します。

overwrite_buffer
複数回の書き込みと読み取りを行うことができるメッセージを 1 つ格納します。

single_assignment
1 回の書き込みと複数回の読み取りを行うことができるメッセージを 1 つ格納します。

call
メッセージを受信するときに処理を実行します。

transformer
データを受け取り、その処理の結果を別のターゲット ブロックに送信するときに処理を実行します。 transformer クラスでは、異なる入力と出力の種類を操作できます。

choice
一連のソースから最初の使用可能なメッセージを選択します。

join と multitype join
一連のソースからすべてのメッセージを受信するまで待機し、別のメッセージ ブロックのために、複数のメッセージを結合して 1 つのメッセージにします。

タイマー
メッセージを定期的にターゲット ブロックに送信します。

これらのメッセージ ブロックの型には、それぞれ異なる状況で役立つ特性があります。 その特性のいくつかを次に示します。

  • "伝達の種類" : メッセージ ブロックがデータのソースとして機能するか、データの受信側として機能するか、またはその両方として機能するか。

  • "メッセージの順序付け" : メッセージ ブロックで、メッセージが送受信されたときの元の順序を保持するかどうか。 定義済みのメッセージ ブロックの型では、メッセージが送受信されたときの元の順序が保持されます。

  • "ソース カウント" : メッセージ ブロックで読み取ることができるソースの最大数。

  • "ターゲット カウント" : メッセージ ブロックで書き込むことができるターゲットの最大数。

これらの特性とさまざまなメッセージ ブロックの型の関係を次の表に示します。

メッセージ ブロックの型 伝達の種類 (ソース、ターゲット、または両方) メッセージの順序付け (順序ありまたは順序なし) ソース カウント ターゲット カウント
unbounded_buffer 両方 発行済 制限なし 制限なし
overwrite_buffer 両方 発行済 制限なし 制限なし
single_assignment 両方 発行済 制限なし 制限なし
call ターゲット 発行済 制限なし 適用外
transformer 両方 発行済 制限なし 1
choice 両方 発行済 10 1
join 両方 発行済 制限なし 1
multitype_join 両方 発行済 10 1
timer ソース 適用外 適用外 1

以下のセクションでは、メッセージ ブロックの型について詳しく説明します。

[トップ]

unbounded_buffer クラス

concurrency::unbounded_buffer クラスは、汎用の非同期メッセージング構造体を表します。 このクラスでは、複数のソースが書き込むことができるメッセージ、または複数のターゲットが読み取ることができるメッセージの先入れ先出し (FIFO) のキューを格納します。 ターゲットが unbounded_buffer オブジェクトからメッセージを受信すると、そのメッセージはメッセージ キューから削除されます。 そのため、unbounded_buffer オブジェクトには複数のターゲットを設定できますが、各メッセージを受信するターゲットは 1 つだけです。 unbounded_buffer クラスは、複数のメッセージを別のコンポーネントに渡し、そのコンポーネントで各メッセージを受信する必要がある場合に便利です。

次の例では、unbounded_buffer クラスの使用方法の基本的な構造を示します。 この例では、3 つの値を unbounded_buffer オブジェクトに送信し、同じオブジェクトからそれらの値を再び読み取ります。

// unbounded_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that works with
   // int data.
   unbounded_buffer<int> items;

   // Send a few items to the unbounded_buffer object.
   send(items, 33);
   send(items, 44);
   send(items, 55);

   // Read the items from the unbounded_buffer object and print
   // them to the console.
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
}

この例を実行すると、次の出力が生成されます。

334455

unbounded_buffer クラスの使用方法を示す完全な例については、「方法: さまざまなプロデューサー/コンシューマー パターンを実装する」を参照してください。

[トップ]

overwrite_buffer クラス

concurrency::overwrite_buffer クラスは unbounded_buffer クラスに似ていますが、overwrite_buffer オブジェクトに格納されるメッセージが 1 つだけである点が異なります。 また、ターゲットが overwrite_buffer オブジェクトからメッセージを受信しても、そのメッセージはバッファーから削除されません。 そのため、複数のターゲットがメッセージのコピーを受信します。

overwrite_buffer クラスは、複数のメッセージを別のコンポーネントに渡すときに、そのコンポーネントで必要になるのが最新の値のみである場合に便利です。 また、このクラスは、メッセージを複数のコンポーネントにブロードキャストする場合にも便利です。

次の例では、overwrite_buffer クラスの使用方法の基本的な構造を示します。 この例では、3 つの値を overwrite _buffer オブジェクトに送信し、同じオブジェクトから現在の値を 3 回読み取ります。 この例は unbounded_buffer クラスの例に似ています。 ただし、overwrite_buffer クラスに格納されるメッセージは 1 つだけです。 また、メッセージが読み取られた後も、overwrite_buffer オブジェクトからそのメッセージは削除されません。

// overwrite_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an overwrite_buffer object that works with
   // int data.
   overwrite_buffer<int> item;

   // Send a few items to the overwrite_buffer object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the overwrite_buffer object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

この例を実行すると、次の出力が生成されます。

555555

overwrite_buffer クラスの使用方法を示す完全な例については、「方法: さまざまなプロデューサー/コンシューマー パターンを実装する」を参照してください。

[トップ]

single_assignment クラス

concurrency::single_assignment クラスは overwrite_buffer クラスに似ていますが、single_assignment オブジェクトに 1 回しか書き込むことができない点が異なります。 overwrite_buffer クラスと同様に、ターゲットが single_assignment オブジェクトからメッセージを受信しても、そのメッセージはそのオブジェクトから削除されません。 そのため、複数のターゲットがメッセージのコピーを受信します。 また、single_assignment クラスは、1 つのメッセージを複数のコンポーネントにブロードキャストする場合にも便利です。

次の例では、single_assignment クラスの使用方法の基本的な構造を示します。 この例では、3 つの値を single_assignment オブジェクトに送信し、同じオブジェクトから現在の値を 3 回読み取ります。 この例は overwrite_buffer クラスの例に似ています。 overwrite_buffer クラスおよび single_assignment クラスに格納されるメッセージはどちらの場合も 1 つだけですが、single_assignment クラスには 1 回しか書き込むことができません。

// single_assignment-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an single_assignment object that works with
   // int data.
   single_assignment<int> item;

   // Send a few items to the single_assignment object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the single_assignment object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

この例を実行すると、次の出力が生成されます。

333333

single_assignment クラスの使用方法を示す完全な例については、「チュートリアル: フューチャの実装」を参照してください。

[トップ]

call クラス

concurrency::call クラスは、データを受信するときに処理関数を実行するメッセージの受信側として機能します。 この処理関数には、ラムダ式、関数オブジェクト、または関数ポインターを使用できます。 call オブジェクトは、メッセージを送信する他のコンポーネントに対して並列に動作するため、通常の関数呼び出しとは動作が異なります。 call オブジェクトがメッセージを受信したときに処理を実行していると、そのメッセージはキューに追加されます。 各 call オブジェクトでは、キューに配置されたメッセージを受信した順序で処理します。

次の例では、call クラスの使用方法の基本的な構造を示します。 この例では、受信した各値をコンソールに出力する call オブジェクトを作成します。 その後、3 つの値を call オブジェクトに送信します。 また、call オブジェクトはメッセージを個別のスレッドで処理するため、この例ではカウンター変数と event オブジェクトを使用して、wmain 関数から制御が戻る前に call オブジェクトがすべてのメッセージを処理できるようにしています。

// call-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // An event that is set when the call object receives all values.
   event received_all;

   // Counts the 
   long receive_count = 0L;
   long max_receive_count = 3L;

   // Create an call object that works with int data.
   call<int> target([&received_all,&receive_count,max_receive_count](int n) {
      // Print the value that the call object receives to the console.
      wcout << n << endl;
      
      // Set the event when all messages have been processed.
      if (++receive_count == max_receive_count)
         received_all.set();
   });

   // Send a few items to the call object.
   send(target, 33);
   send(target, 44);
   send(target, 55);

   // Wait for the call object to process all items.
   received_all.wait();
}

この例を実行すると、次の出力が生成されます。

334455

call クラスの使用方法を示す完全な例については、「方法: call クラスおよび transformer クラスに処理関数を提供する」を参照してください。

[トップ]

transformer クラス

concurrency::transformer クラスは、メッセージの受信側と送信側の両方として機能します。 transformer クラスは、データを受信するとユーザー定義の処理関数を実行するため、call クラスに似ています。 ただし、transformer クラスも処理関数の結果を受信側のオブジェクトに送信します。 call オブジェクトと同様に、transformer オブジェクトはメッセージを送信する他のコンポーネントに対して並列に動作します。 transformer オブジェクトがメッセージを受信したときに処理を実行していると、そのメッセージはキューに追加されます。 各 transformer オブジェクトでは、キューに配置されたメッセージを受信した順序で処理します。

transformer クラスでは、メッセージを 1 つのターゲットに送信します。 コンストラクターの _PTarget パラメーターを NULL に設定した場合は、concurrency::link_target メソッドを呼び出すことで、後からターゲットを指定できます。

エージェント ライブラリに用意されている他のすべての非同期メッセージ ブロックの型とは異なり、transformer クラスでは異なる入力と出力の種類を操作できます。 データをある型から別の型に変換するこの機能により、transformer クラスは多くの同時実行ネットワークで重要なコンポーネントとなっています。 また、transformer オブジェクトの処理関数により詳細な並列機能を追加できます。

次の例では、transformer クラスの使用方法の基本的な構造を示します。 この例では、transformer 値の出力を生成するために、int の各入力値に 0.33 を乗算する double オブジェクトを作成します。 その後、変換後の値を同じ transformer オブジェクトから受け取り、それらをコンソールに出力します。

// transformer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an transformer object that receives int data and 
   // sends double data.
   transformer<int, double> third([](int n) {
      // Return one-third of the input value.
      return n * 0.33;
   });

   // Send a few items to the transformer object.
   send(third, 33);
   send(third, 44);
   send(third, 55);

   // Read the processed items from the transformer object and print
   // them to the console.
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
}

この例を実行すると、次の出力が生成されます。

10.8914.5218.15

transformer クラスの使用方法を示す完全な例については、「方法: データ パイプラインでトランスフォーマーを使用する」を参照してください。

[トップ]

choice クラス

concurrency::choice クラスでは、一連のソースから最初の使用可能なメッセージを選択します。 choice クラスは、データフロー機構ではなく、制御フロー機構を表します (データフローと制御フローの違いについては、「非同期エージェント ライブラリ」を参照してください)。

choice オブジェクトからの読み取りは、WaitForMultipleObjects パラメーターが bWaitAll に設定されている場合の Windows API 関数 FALSE の呼び出しに似ています。 ただし、choice クラスでは、外部同期オブジェクトではなく、イベント自体にデータをバインドします。

通常、アプリケーションで制御フローを操作するには、concurrency::receive 関数と共に choice クラスを使用します。 さまざまな型のメッセージ バッファーの中から選択する必要がある場合は、choice クラスを使用します。 同じ型のメッセージ バッファーの中から選択する必要がある場合は、single_assignment クラスを使用します。

ソースを choice オブジェクトにリンクさせる順序によって選択するメッセージを決定できるため、その順序は重要です。 たとえば、既にメッセージが格納されている複数のメッセージ バッファーを choice オブジェクトにリンクさせるとします。 choice オブジェクトによって、リンク先の最初のソースからメッセージが選択されます。 すべてのソースをリンクさせると、choice オブジェクトでは各ソースがメッセージを受信した順序を保持します。

次の例では、choice クラスの使用方法の基本的な構造を示します。 この例では、concurrency::make_choice 関数を使用して、3 つのメッセージ ブロックのいずれかを選択する choice オブジェクトを作成します。 次に、さまざまなフィボナッチ数列を計算し、それぞれの結果を別個のメッセージ ブロックに格納します。 その後、最初に終了した操作に基づくメッセージをコンソールに出力します。

// choice-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Although the following thee message blocks are written to one time only, 
   // this example illustrates the fact that the choice class works with 
   // different message block types.

   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   overwrite_buffer<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   unbounded_buffer<double> half_of_fib42;   

   // Create a choice object that selects the first single_assignment 
   // object that receives a value.
   auto select_one = make_choice(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   // Print a message that is based on the operation that finished first.
   switch (receive(select_one))
   {
   case 0:
      wcout << L"fib35 received its value first. Result = " 
            << receive(fib35) << endl;
      break;
   case 1:
      wcout << L"fib37 received its value first. Result = " 
            << receive(fib37) << endl;
      break;
   case 2:
      wcout << L"half_of_fib42 received its value first. Result = " 
            << receive(half_of_fib42) << endl;
      break;
   default:
      wcout << L"Unexpected." << endl;
      break;
   }
}

この例では、次のサンプル出力が生成されます。

fib35 received its value first. Result = 9227465

35 番目のフィボナッチ数を計算するタスクが最初に終了するとは限らないため、この例の出力は変わることがあります。

この例では、concurrency::parallel_invoke アルゴリズムを使用して、フィボナッチ数列を並列に計算します。 parallel_invoke の詳細については、「並列アルゴリズム」を参照してください。

choice クラスの使用方法を示す完全な例については、「方法: 完了したタスクから選択する」を参照してください。

[トップ]

join クラスと multitype_join クラス

concurrency::join クラスおよび concurrency::multitype_join クラスを使用すると、一連のソースの各メンバーがメッセージを受信するまで待機できます。 join クラスでは、共通のメッセージ型を持つソース オブジェクトを処理します。 multitype_join クラスでは、異なるメッセージ型を持つことができるソース オブジェクトを処理します。

join オブジェクトまたは multitype_join オブジェクトからの読み取りは、WaitForMultipleObjects パラメーターが bWaitAll に設定されている場合の Windows API 関数 TRUE の呼び出しに似ています。 ただし、choice オブジェクトと同様に、join オブジェクトおよび multitype_join オブジェクトでは、外部同期オブジェクトではなく、イベント自体にデータをバインドするイベント機構を使用します。

join オブジェクトから読み取ると、std::vector オブジェクトが生成されます。 multitype_join オブジェクトから読み取ると、std::tuple オブジェクトが生成されます。 対応するソース バッファーと同じ順序でこれらのオブジェクトに出現する要素は、join オブジェクトまたは multitype_join オブジェクトにリンクされます。 ソース バッファーを join オブジェクトまたは multitype_join オブジェクトにリンクする順序は、結果の vector オブジェクトまたは tuple オブジェクトの要素の順序に関連するため、既存のソース バッファーを結合からリンク解除しないことをお勧めします。 そのようにすると、未定義の動作が発生する可能性があります。

最長一致の結合と最短一致の結合

join クラスおよび multitype_join クラスでは、最長一致の結合と最短一致の結合の概念をサポートしています。 "最長一致の結合" では、各ソースから使用可能になったメッセージを受け入れ、すべてのメッセージが使用可能になるまでこれを続けます。 "最短一致の結合" では、2 つの段階でメッセージを受信します。 まず、各ソースからメッセージが提供されるまで待機します。 次に、すべてのソースのメッセージが使用可能になったら、各メッセージの予約を試みます。 各メッセージを予約できる場合、すべてのメッセージを処理し、ターゲットに伝達します。 それ以外の場合、メッセージの予約を解除し (取り消し)、再び各ソースがメッセージを受信するまで待機します。

最長一致の結合はメッセージを即座に受け入れるため、最短一致の結合よりもパフォーマンスが優れています。 ただし、最長一致の結合では、デッドロックが発生する可能性がまれにあります。 1 つ以上の共有されたソース オブジェクトを含む複数の結合がある場合は、最短一致の結合を使用します。

次の例では、join クラスの使用方法の基本的な構造を示します。 この例では、concurrency::make_join 関数を使用して、3 つの single_assignment オブジェクトから受信する join オブジェクトを作成します。 この例では、さまざまなフィボナッチ数列を計算し、それぞれの結果を別個の single_assignment オブジェクトに格納します。その後、join オブジェクトが保持している各結果をコンソールに出力します。 この例は choice クラスの例に似ています。ただし、join クラスでは、すべてのソース メッセージ ブロックがメッセージを受信するまで待機します。

// join-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   single_assignment<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   single_assignment<double> half_of_fib42;   

   // Create a join object that selects the values from each of the
   // single_assignment objects.
   auto join_all = make_join(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   auto result = receive(join_all);
   wcout << L"fib35 = " << get<0>(result) << endl;
   wcout << L"fib37 = " << get<1>(result) << endl;
   wcout << L"half_of_fib42 = " << get<2>(result) << endl;
}

この例を実行すると、次の出力が生成されます。

fib35 = 9227465fib37 = 24157817half_of_fib42 = 1.33957e+008

この例では、concurrency::parallel_invoke アルゴリズムを使用して、フィボナッチ数列を並列に計算します。 parallel_invoke の詳細については、「並列アルゴリズム」を参照してください。

join クラスの使用方法を示す完全な例については、「方法: 完了したタスクから選択する」および「チュートリアル: join を使用したデッドロックの防止」を参照してください。

[トップ]

timer クラス

concurrency::timer クラスは、メッセージ ソースとして機能します。 timer オブジェクトでは、指定された時間が経過するとメッセージをターゲットに送信します。 timer クラスは、メッセージの送信を遅延させる必要がある場合や、定期的にメッセージを送信する場合に便利です。

timer クラスでは、メッセージを 1 つのターゲットにのみ送信します。 コンストラクターの _PTarget パラメーターを NULL に設定した場合は、concurrency::ISource::link_target メソッドを呼び出すことで、後からターゲットを指定できます。

timer オブジェクトには、繰り返しと非繰り返しがあります。 繰り返しタイマーを作成するには、コンストラクターを呼び出すときに _Repeating パラメーターに true を渡します。 また、非繰り返しタイマーを作成するには、_Repeating パラメーターに false を渡します。 タイマーが繰り返しの場合、一定の間隔で同じメッセージをターゲットに送信します。

エージェント ライブラリによって、開始されていない状態の timer オブジェクトが作成されます。 タイマー オブジェクトを開始するには、concurrency::timer::start メソッドを呼び出します。 timer オブジェクトを停止するには、そのオブジェクトを破棄するか、concurrency::timer::stop メソッドを呼び出します。 繰り返しタイマーを一時停止するには、concurrency::timer::pause メソッドを呼び出します。

次の例では、timer クラスの使用方法の基本的な構造を示します。 この例では、timer オブジェクトと call オブジェクトを使用して、時間のかかる操作の進行状況を報告します。

// timer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Create a call object that prints characters that it receives 
   // to the console.
   call<wchar_t> print_character([](wchar_t c) {
      wcout << c;
   });

   // Create a timer object that sends the period (.) character to 
   // the call object every 100 milliseconds.
   timer<wchar_t> progress_timer(100u, L'.', &print_character, true);

   // Start the timer.
   wcout << L"Computing fib(42)";
   progress_timer.start();

   // Compute the 42nd Fibonacci number.
   int fib42 = fibonacci(42);

   // Stop the timer and print the result.
   progress_timer.stop();
   wcout << endl << L"result is " << fib42 << endl;
}

この例では、次のサンプル出力が生成されます。

Computing fib(42)..................................................result is 267914296

timer クラスの使用方法を示す完全な例については、「方法: メッセージを定期的に送信する」を参照してください。

[トップ]

メッセージのフィルター処理

メッセージ ブロック オブジェクトを作成する場合、メッセージ ブロックがメッセージを受け入れるか拒否するかを判断する "フィルター関数" を指定できます。 フィルター関数は、メッセージ ブロックが特定の値のみを受信するようにする便利な方法です。

次の例では、フィルター関数を使用して偶数のみを受け入れる unbounded_buffer オブジェクトを作成する方法を示します。 unbounded_buffer オブジェクトは奇数を拒否するため、奇数はターゲット ブロックに伝達されません。

// filter-function.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that uses a filter
   // function to accept only even numbers.
   unbounded_buffer<int> accept_evens(
      [](int n) {
         return (n%2) == 0;
      });

   // Send a few values to the unbounded_buffer object.
   unsigned int accept_count = 0;
   for (int i = 0; i < 10; ++i)
   {
      // The asend function returns true only if the target
      // accepts the message. This enables us to determine
      // how many elements are stored in the unbounded_buffer
      // object.
      if (asend(accept_evens, i))
      {
         ++accept_count;
      }
   }

   // Print to the console each value that is stored in the 
   // unbounded_buffer object. The unbounded_buffer object should
   // contain only even numbers.
   while (accept_count > 0)
   {
      wcout << receive(accept_evens) << L' ';
      --accept_count;
   }
}

この例を実行すると、次の出力が生成されます。

0 2 4 6 8

ラムダ関数、関数ポインター、または関数オブジェクトをフィルター関数として使用できます。 各フィルター関数の形式は次のいずれかになります。

bool (T)
bool (T const &)

データの不必要なコピーをなくすには、値で伝達される集約型を扱うときに 2 番目の形式を使用します。

メッセージのフィルター処理では、"データフロー" プログラミング モデルがサポートされています。このモデルでは、コンポーネントがデータの受信時に計算を実行します。 フィルター関数を使用してメッセージ パッシング ネットワークでデータのフローを制御する例については、「方法 : メッセージ ブロック フィルターを使用する」、「チュートリアル: データフロー エージェントの作成」、および「チュートリアル: 画像処理ネットワークの作成」を参照してください。

[トップ]

メッセージ予約

"メッセージの予約" により、メッセージ ブロックはメッセージを予約して後で使用できるようにすることができます。 通常、メッセージの予約を直接使用することはありません。 ただし、メッセージの予約について理解しておくと、定義済みのいくつかのメッセージ ブロックの型の動作を理解しやすくなります。

最短一致の結合と最長一致の結合について考えてみてください。 どちらの場合も、メッセージの予約によって、後で使用できるようにメッセージを予約します。 前に述べたとおり、最短一致の結合では、2 つの段階でメッセージを受信します。 最初の段階で、最短一致の join オブジェクトは各ソースがメッセージを受信するまで待機します。 次に、各メッセージの予約を試みます。 各メッセージを予約できる場合、すべてのメッセージを処理し、ターゲットに伝達します。 それ以外の場合、メッセージの予約を解除し (取り消し)、再び各ソースがメッセージを受信するまで待機します。

最長一致の結合でも、複数のソースから入力メッセージを読み取ります。この場合、各ソースからメッセージを受信するのを待っている間、メッセージの予約を使用して追加のメッセージを読み取ります。 たとえば、メッセージ ブロック A および B からメッセージを受信する最長一致の結合を考えてみます。 B から 2 つのメッセージを受信し、A からはまだメッセージを受信していない場合、最長一致の結合は、B からの 2 番目のメッセージに対応する一意のメッセージ識別子を保存します。 A からメッセージを受信し、これらのメッセージを伝達した後、保存済みのメッセージ識別子を使用して、B からの 2 番目のメッセージがまだ有効かどうかを確認します。

独自のカスタム メッセージ ブロックの型を実装する場合は、メッセージの予約を使用できます。 カスタム メッセージ ブロックの型を作成する方法の例については、「チュートリアル: カスタム メッセージ ブロックの作成」を参照してください。

[トップ]

関連項目

非同期エージェント ライブラリ