タスク ベースの非同期パターンの利用
タスク ベースの非同期パターン (TAP) を使用して非同期操作を行うと、コールバックを使用して、ブロックすることなく待機できます。 タスクの場合、これは Task.ContinueWith などのメソッドによって行われます。 言語ベースの非同期サポートが、通常の制御フロー内での非同期操作の待機を許可することで、コールバックを隠し、コンパイラにより生成されたコードはこの同じ API レベルのサポートを提供します。
Await による実行の中断
Task オブジェクトおよび Task<TResult> オブジェクトの非同期での待機には、C# では await キーワード、Visual Basic では Await 演算子を使用できます。 Task を待っているとき、await
式は型 void
になります。 Task<TResult> を待っているとき、await
式は型 TResult
になります。 await
式は、非同期メソッドの本体内に含める必要があります。 (これらの言語機能は .NET Framework 4.5 で導入されました)。
待機機能は、隠れた状態で継続を使用してタスクにコールバックをインストールします。 このコールバックは中断ポイントから非同期メソッドを再開します。 非同期メソッドが再開され、待機していた操作が正常に完了し、Task<TResult> であった場合に、その TResult
が返されます。 待っていた Task または Task<TResult> が Canceled 状態で終わった場合、OperationCanceledException 例外がスローされます。 待っていた Task または Task<TResult> が Faulted 状態で終わった場合、エラーの原因となった例外がスローされます。 Task
は複数の例外の結果としてエラーになることがありますが、反映されるのはこれらの例外の中の 1 つのみです。 ただし、Task.Exception プロパティは、すべてのエラーが含まれる AggregateException 例外を返します。
中断時に非同期メソッドを実行していたスレッドに同期コンテキスト (SynchronizationContext オブジェクト) が関連付けられている場合 (たとえば、SynchronizationContext.Current プロパティが null
ではない場合)、非同期メソッドは、コンテキストの Post メソッドを使用して、その同じ同期コンテキストで再開します。 そのように関連付けられていない場合、中断時に実行されていたタスク スケジューラ (TaskScheduler オブジェクト) に基づきます。 通常、これは既定のタスク スケジューラ (TaskScheduler.Default) であり、スレッド プールをターゲットにします。 このタスク スケジューラによって、待機していた非同期操作が完了した場合に再開するかどうか、または再開のスケジュールを設定するかどうかが決定されます。 既定のスケジューラは、通常、待機していた操作が完了したスレッド上での実行の継続を許可します。
非同期メソッドが呼び出された場合、まだ完了していない待機可能なインスタンスの最初の await 式まで、関数の本体を同期をとって実行し、この時点で呼び出しを呼び出し元に返します。 非同期メソッドが void
を返さない場合、進行中の計算を表す Task または Task<TResult> オブジェクトが返されます。 void 以外の非同期メソッドでは、return ステートメントが検出された場合、またはメソッド本体の末尾に到達した場合、タスクが RanToCompletion の終了状態で完了します。 ハンドルされない例外により、非同期メソッドの本体から制御が離れた場合、タスクは Faulted 状態になります。 その例外が OperationCanceledException の場合、タスクは代わりに Canceled 状態で終わります。 この方法では、結果または例外が最終的に発行されます。
この動作には、いくつかの重要なバリエーションがあります。 タスクが待機されるまでに既に完了していた場合は、パフォーマンス上の理由から、制御が明け渡されず、関数が実行を継続します。 また、元のコンテキストに戻ることが必ずしも望ましい動作ではない場合は変更されることがあります。これは、次のセクションで詳しく説明します。
Yield と ConfigureAwait を使用して中断と再開を構成する
いくつかのメソッドでは、非同期メソッドの実行をより詳細に制御できます。 たとえば、Task.Yield メソッドを使用し、yield ポイントを非同期メソッドに伝えることができます。
public class Task : …
{
public static YieldAwaitable Yield();
…
}
これは、非同期にポストするか、現在のコンテキストにスケジュールを戻すことと同じです。
Task.Run(async delegate
{
for(int i=0; i<1000000; i++)
{
await Task.Yield(); // fork the continuation into a separate work item
...
}
});
Task.ConfigureAwait メソッドを使用することもできます。非同期メソッドでの中断と再開でより優れた制御機能が与えられます。 前述のとおり、既定では、現在のコンテキストは、非同期メソッドが中断されたときにキャプチャされ、そのキャプチャされたコンテキストは、再開時に非同期メソッドの継続を呼び出すために使用されます。 これは、多くの場合に求められる動作です。 その他の場合では、継続のコンテキストが重要でないこともあり、元のコンテキストへのポスト バックを回避することでパフォーマンスを向上できます。 そのためには、Task.ConfigureAwait メソッドを使用して、待機操作にコンテキストをキャプチャして再開するのではなく、待機していた非同期操作がどこで完了したかを問わず、実行を継続するように指示できます。
await someTask.ConfigureAwait(continueOnCapturedContext:false);
非同期操作の取り消し
.NET Framework 4 以降、取り消しをサポートする TAP メソッドには、取り消しトークン (CancellationToken オブジェクト) を受け取るオーバーロードが少なくとも 1 つあります。
キャンセル トークンは、キャンセル トークンのソース (CancellationTokenSource オブジェクト) によって作成されます。 ソースの Token プロパティからキャンセル トークンが返され、ソースの Cancel メソッドが呼び出されたときにこのトークンに信号が送られます。 たとえば、単一の Web ページをダウンロードする際に操作を取り消せるようにする場合は、CancellationTokenSource オブジェクトを作成し、TAP メソッドにそのオブジェクトのトークンを渡し、操作を取り消す準備ができたらソースの Cancel メソッドを呼び出します。
var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();
複数の非同期呼び出しを取り消す場合は、すべての呼び出しに同じトークンを渡すことができます。
var cts = new CancellationTokenSource();
IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));
// at some point later, potentially on another thread
…
cts.Cancel();
または、操作の一部を選択して同じトークンを渡すこともできます。
var cts = new CancellationTokenSource();
byte [] data = await DownloadDataAsync(url, cts.Token);
await SaveToDiskAsync(outputPath, data, CancellationToken.None);
… // at some point later, potentially on another thread
cts.Cancel();
重要
取り消し要求は任意のスレッドから開始することができます。
取り消しが要求されないことを示すために、キャンセル トークンを受け取るすべてのメソッドに CancellationToken.None 値を渡すことができます。 これで CancellationToken.CanBeCanceled プロパティが false
を返します。呼び出されたメソッドは適宜最適化できます。 テストのためには、トークンが既に取り消された状態か取り消し不可能な状態で開始するかどうかを示すブール値を受け取るコンストラクターを使用してインスタンス化した、事前に取り消されたキャンセル トークンを渡すこともできます。
この取り消し方法には、いくつか利点があります。
同じキャンセル トークンを、任意の数の同期操作および非同期操作に渡すことができます。
同じ取り消し要求を、任意の数のリスナーに渡すことができます。
取り消しが要求されるかどうか、およびそれがいつ実行されるかは、非同期 API の開発者が完全に制御します。
API を使用するコードにより、取り消し要求が反映される非同期呼び出しが選択され、決定されます。
進行状況の監視
一部の非同期メソッドは、非同期メソッドに渡される進行状況インターフェイスを通じて進行状況を公開します。 たとえば、テキスト文字列を非同期的にダウンロードし、その過程で、これまでに完了したダウンロードの割合を含む進行状況の更新を発生させる関数があるとします。 このようなメソッドは、Windows Presentation Foundation (WPF) アプリケーションでは次のように使用できます。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,
new Progress<int>(p => pbDownloadProgress.Value = p));
}
finally { btnDownload.IsEnabled = true; }
}
タスク ベースの組み込み連結子の使用
System.Threading.Tasks 名前空間には、タスクを構成および使用するための複数のメソッドがあります。
Task.Run
Task クラスには、Run メソッドがいくつか含まれています。このメソッドでは、次のように、Task や Task<TResult> のような作業の負荷をスレッド プールに簡単に移すことができます。
public async void button1_Click(object sender, EventArgs e)
{
textBox1.Text = await Task.Run(() =>
{
// … do compute-bound work here
return answer;
});
}
Task.Run(Func<Task>) オーバーロードなど、これらの Run メソッドの一部は TaskFactory.StartNew メソッドの短縮形として存在します。 このオーバーロードでは、次のように、負荷分散される作業内で await を使用できます。
public async void button1_Click(object sender, EventArgs e)
{
pictureBox1.Image = await Task.Run(async() =>
{
using(Bitmap bmp1 = await DownloadFirstImageAsync())
using(Bitmap bmp2 = await DownloadSecondImageAsync())
return Mashup(bmp1, bmp2);
});
}
このようなオーバーロードは論理的に、タスク並列ライブラリの Unwrap 拡張メソッドとの連動で TaskFactory.StartNew メソッドを使用することと等しくなります。
Task.FromResult
データが既に利用でき、単に Task<TResult> にリフトされたタスクを返すメソッドから返す必要があるシナリオでは、FromResult メソッドを使用します。
public Task<int> GetValueAsync(string key)
{
int cachedValue;
return TryGetCachedValue(out cachedValue) ?
Task.FromResult(cachedValue) :
GetValueAsyncInternal();
}
private async Task<int> GetValueAsyncInternal(string key)
{
…
}
Task.WhenAll
WhenAll メソッドは、タスクとして表される複数の非同期操作で非同期に待機するために使用されます。 メソッドには、一連の非ジェネリック タスクまたは不均一な一連のジェネリック タスク (複数の void を返す操作を非同期に待機したり、各値の型が異なる複数の値を返すメソッドを非同期に待機するなど) に加えて、均一な一連のジェネリック タスク (複数の TResult
を返すメソッドを非同期に待機するなど) をサポートする複数のオーバーロードが含まれます。
複数の顧客に電子メール メッセージを送信するとします。 このような場合、1 つのメッセージの送信完了を待たずに次のメッセージを送信するような、メッセージのオーバーラップ送信が可能です。 送信操作がいつ完了したか、またエラーが発生したかを確認することもできます。
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);
このコードは発生する可能性がある例外を明示的に処理せず、WhenAll の結果のタスクの await
から例外を反映します。 例外を処理するには、次のようなコードを使用できます。
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
...
}
この例では、非同期の操作が失敗した場合、すべての例外が AggregateException 例外で連結され、WhenAll メソッドから返される Task に格納されます。 ただし、これらの例外の 1 つだけが await
キーワードによって反映されます。 すべての例外を調べる場合は、上記のコードを次のように書き直します。
Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
foreach(Task faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
Web から複数のファイルを非同期にダウンロードする例を考えてみます。 この場合、すべての非同期操作の結果の型が同種で、結果に簡単にアクセスできます。
string [] pages = await Task.WhenAll(
from url in urls select DownloadStringTaskAsync(url));
前の void を返す場合で説明したのと同じ例外処理手法を使用できます。
Task<string> [] asyncOps =
(from url in urls select DownloadStringTaskAsync(url)).ToArray();
try
{
string [] pages = await Task.WhenAll(asyncOps);
...
}
catch(Exception exc)
{
foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
Task.WhenAny
WhenAny メソッドを使用して、完了するタスクとして表される複数の非同期操作の 1 つのみを非同期に待機できます。 このメソッドは、次の 4 つの主なユース ケースで役立ちます。
冗長性: 1 つの操作を複数回実行し、最初に完了したものを選択する (たとえば、1 つの結果を生成する複数の株価情報 Web サービスに問い合わせて、最も速く完了したものを選択する)。
インターリーブ: 複数の操作を起動してそのすべてが完了するのを待機するが、それらの操作を完了時に処理する。
調整: 他の操作が完了したら追加の操作を開始できるようにする。 これは、インターリーブのシナリオを拡張したものです。
初期のエスケープ: たとえば、タスク t1 が表す操作を別のタスク t2 と共に WhenAny タスクでグループにまとめ、WhenAny タスクで待機できます。 タスク t2 は、タイムアウトか取り消し、または WhenAny タスクを t1 の前に完了するようなその他のシグナルを表す場合があります。
冗長性
株式を購入するかどうかを決定するとします。 信頼できるいくつかの株式推薦 Web サービスがありますが、日常的な負荷によっては、時に応じて各サービスがかなり遅くなることがあります。 WhenAny メソッドを使用すると、いずれかの操作が完了したときに通知を受け取ることができます。
var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol),
GetBuyRecommendation2Async(symbol),
GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);
正常に完了したすべてのタスクの結果をラップせずに返す WhenAll とは異なり、WhenAny は完了したタスクを返します。 タスクが失敗した場合は、それを把握することが重要です。タスクが成功した場合は、戻り値が関連付けられているタスクを把握することが重要です。 そのため、この例で示すように、返されるタスクの結果にアクセスするか、引き続き待機する必要があります。
WhenAll と同様に、例外に対応できる必要があります。 完了したタスクを受け取るので、適切に try/catch
で囲んで、エラーが反映されて返されるタスクを待機できます。次に例を示します。
Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{
Task<bool> recommendation = await Task.WhenAny(recommendations);
try
{
if (await recommendation) BuyStock(symbol);
break;
}
catch(WebException exc)
{
recommendations.Remove(recommendation);
}
}
さらに、最初のタスクが正常に完了しても、以降のタスクが失敗する可能性があります。 この時点で、例外を処理する方法はいくつかあります。起動したすべてのタスクが完了するまで待機できます。その場合は、WhenAll メソッドを使用することも、すべての例外が重要であり、ログに記録する必要があると指定することもできます。 そのためには、継続を使用してタスクが非同期に完了した時点で通知を受け取ることができます。
foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => { if (t.IsFaulted) Log(t.Exception); });
}
または
foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}
または、以下のように指定できます。
private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
foreach(var task in tasks)
{
try { await task; }
catch(Exception exc) { Log(exc); }
}
}
…
LogCompletionIfFailed(recommendations);
最後に、残りのすべての操作を取り消すこともできます。
var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol, cts.Token),
GetBuyRecommendation2Async(symbol, cts.Token),
GetBuyRecommendation3Async(symbol, cts.Token)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);
インターリーブ
Web からイメージをダウンロードして、各イメージを処理するとします (イメージを UI コントロールに追加するなど)。 UI スレッドで順次イメージを処理しても、イメージはできるだけ同時にダウンロードしたいと考えています。 また、すべてダウンロードされるまで、UI へのイメージの追加を保留したくないと考えています。 代わりに、それらを完了時に追加したいと考えています。
List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
次のように、ダウンロードしたイメージの ThreadPool でコンピューター処理を集中して行うことが必要になるシナリオでも、インターリーブを適用できます。
List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)
.ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
調整
インターリーブの例ではユーザーが多くのイメージをダウンロードするため、このダウンロード数を絞り込み、たとえば、特定の数のダウンロードだけを同時実行する必要がある場合を考えます。 これを行うには、非同期操作のサブセットを開始します。 操作が完了したら、追加操作を開始して、完了した操作に代えて実行します。
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
初期のエスケープ
ユーザーの取り消し要求 (キャンセル ボタンがクリックされたなど) に同時に対応できるようにしながら、1 つの操作の完了を非同期に待機するとします。 このシナリオのコードを次に示します。
private CancellationTokenSource m_cts;
public void btnCancel_Click(object sender, EventArgs e)
{
if (m_cts != null) m_cts.Cancel();
}
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
if (imageDownload.IsCompleted)
{
Bitmap image = await imageDownload;
panel.AddImage(image);
}
else imageDownload.ContinueWith(t => Log(t));
}
finally { btnRun.Enabled = true; }
}
private static async Task UntilCompletionOrCancellation(
Task asyncOp, CancellationToken ct)
{
var tcs = new TaskCompletionSource<bool>();
using(ct.Register(() => tcs.TrySetResult(true)))
await Task.WhenAny(asyncOp, tcs.Task);
return asyncOp;
}
この実装は、エスケープを決めた直後にユーザー インターフェイスを再度使用できるようにしますが、基になる非同期操作は取り消しません。 別の方法として、エスケープを決めたときに保留中の操作を取り消すことがありますが、取り消し要求のために途中で終了する場合など、操作が完了するまでユーザー インターフェイスを再確立しません。
private CancellationTokenSource m_cts;
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
Bitmap image = await imageDownload;
panel.AddImage(image);
}
catch(OperationCanceledException) {}
finally { btnRun.Enabled = true; }
}
初期のエスケープのもう 1 つの例では、次のセクションで説明する Delay メソッドと連動で WhenAny メソッドを使用します。
Task.Delay
Task.Delay メソッドを使用して、非同期メソッドの実行を一時停止できます。 これは、ポーリング ループのビルド、あらかじめ指定された期間にわたるユーザー入力処理の遅延など、さまざまな機能に役立ちます。 Task.Delay メソッドは、Task.WhenAny との連動で使用し、待機にタイムアウトを実装する場合にも便利です。
大規模な非同期操作 (ASP.NET Web サービスなど) の一部であるタスクが完了するまで時間がかかる場合 (特に操作が完了しない場合) には、操作全般に影響する可能性があります。 そのため、非同期操作を待機するときにタイムアウトできるようにすることが重要です。 同期の Task.Wait、Task.WaitAll、Task.WaitAny メソッドはタイムアウト値を受け取りますが、対応する TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny と前述の Task.WhenAll/Task.WhenAny メソッドは受け取りません。 代わりに、Task.Delay と Task.WhenAny を併用し、タイムアウトを実装できます。
たとえば、UI アプリケーションで、イメージをダウンロードし、そのダウンロード中は UI を無効にするとします。 ただし、ダウンロードに時間がかかりすぎる場合には、UI を再び有効にしてダウンロードを破棄します。
public async void btnDownload_Click(object sender, EventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap> download = GetBitmapAsync(url);
if (download == await Task.WhenAny(download, Task.Delay(3000)))
{
Bitmap bmp = await download;
pictureBox.Image = bmp;
status.Text = "Downloaded";
}
else
{
pictureBox.Image = null;
status.Text = "Timed out";
var ignored = download.ContinueWith(
t => Trace("Task finally completed"));
}
}
finally { btnDownload.Enabled = true; }
}
複数のダウンロードにも同じことが当てはまります。次のように、WhenAll がタスクを返すためです。
public async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap[]> downloads =
Task.WhenAll(from url in urls select GetBitmapAsync(url));
if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
{
foreach(var bmp in downloads.Result) panel.AddImage(bmp);
status.Text = "Downloaded";
}
else
{
status.Text = "Timed out";
downloads.ContinueWith(t => Log(t));
}
}
finally { btnDownload.Enabled = true; }
}
タスク ベースの連結子のビルド
タスクは、非同期操作を完全に表現し、操作の結合、その結果の取得などの同期機能と非同期機能を提供することができるため、大きなパターンをビルドするためのタスクを構成する有益な連結子ライブラリを構築できるようになります。 前のセクションで説明したとおり、.NET には、いくつか連結子が組み込まれていますが、自分独自のものをビルドすることもできます。 以下のセクションでは、有効な連結子メソッドと型の例をいくつか示します。
RetryOnFault
多くの状況では、前の操作が失敗した場合に再試行することが望まれます。 同期コードの場合は、次の例のように RetryOnFault
などのヘルパー メソッドをビルドしてこれを行うことができます。
public static T RetryOnFault<T>(
Func<T> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return function(); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
非同期操作の場合も、ほとんど同じヘルパー メソッドをビルドでき、TAP を使用して非同期操作を実装し、タスクを返します。
public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
その後、次のように、この連結子を使用してアプリケーションのロジックに再試行をエンコードできます。
// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
() => DownloadStringTaskAsync(url), 3);
RetryOnFault
関数をさらに拡張できます。 たとえば、この関数は別の Func<Task>
を受け取り、次のように再試行のタイミングを判断するために、再試行の間に呼び出されます。
public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
await retryWhen().ConfigureAwait(false);
}
return default(T);
}
次の関数を使用して、操作を再試行する前に 1 秒待機するようにできます。
// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
() => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));
NeedOnlyOne
操作の待機時間を改善し、成功の確率を高めるために、冗長性を活用できる場合があります。 株価情報を提供する複数の Web サービスがあるとします。しかし、時間によって、各サービスの品質レベルと応答時間が変化します。 この変動に対応するには、すべての Web サービスに要求を出し、最初の応答を取得した時点で残りの要求を取り消します。 複数の操作を起動し、応答を待機し、残りの操作を取り消すというこの一般的なパターンの実装は、ヘルパー関数を実装して簡略化できます。 次の例の NeedOnlyOne
関数は、このシナリオを示します。
public static async Task<T> NeedOnlyOne(
params Func<CancellationToken,Task<T>> [] functions)
{
var cts = new CancellationTokenSource();
var tasks = (from function in functions
select function(cts.Token)).ToArray();
var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
cts.Cancel();
foreach(var task in tasks)
{
var ignored = task.ContinueWith(
t => Log(t), TaskContinuationOptions.OnlyOnFaulted);
}
return completed;
}
この関数は次のように使用できます。
double currentPrice = await NeedOnlyOne(
ct => GetCurrentPriceFromServer1Async("msft", ct),
ct => GetCurrentPriceFromServer2Async("msft", ct),
ct => GetCurrentPriceFromServer3Async("msft", ct));
インターリーブされた操作
多数のタスクを使用する際に、インターリーブのシナリオをサポートするために WhenAny メソッドを使用すると、パフォーマンスの問題が発生することがあります。 WhenAny を呼び出すたびに、継続が各タスクに登録されます。 タスクの数を N とすると、インターリーブ操作の有効期間に作成された O (N2) の継続になります。 多数のタスクを使用する場合は、連結子 (次の例の Interleaved
) を使用してパフォーマンスの問題に対処できます。
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
select new TaskCompletionSource<T>()).ToList();
int nextTaskIndex = -1;
foreach (var inputTask in inputTasks)
{
inputTask.ContinueWith(completed =>
{
var source = sources[Interlocked.Increment(ref nextTaskIndex)];
if (completed.IsFaulted)
source.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled)
source.TrySetCanceled();
else
source.TrySetResult(completed.Result);
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return from source in sources
select source.Task;
}
タスクが完了すると、たとえば次のように、連結子を使用してタスクの結果を処理できます。
IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
int result = await task;
…
}
WhenAllOrFirstException
特定のスキャッター/ギャザー シナリオでは、すべてのタスクの完了を待機することが考えられますが、そのうちの 1 つがエラーになった場合には、例外の発生時点で待機を停止することが望まれます。 これは、次の例の WhenAllOrFirstException
などの連結子メソッドを使用して実行できます。
public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
var inputs = tasks.ToList();
var ce = new CountdownEvent(inputs.Count);
var tcs = new TaskCompletionSource<T[]>();
Action<Task> onCompleted = (Task completed) =>
{
if (completed.IsFaulted)
tcs.TrySetException(completed.Exception.InnerExceptions);
if (ce.Signal() && !tcs.Task.IsCompleted)
tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
};
foreach (var t in inputs) t.ContinueWith(onCompleted);
return tcs.Task;
}
タスク ベースのデータ構造のビルド
カスタム タスク ベースの連結子をビルドする機能に加えて、非同期操作の結果と、結合する必要な同期の両方を表す Task および Task<TResult> 内にデータ構造を配置することで、非同期シナリオに使用するカスタム データ構造をビルドするための強力な型になります。
AsyncCache
タスクの重要な側面の 1 つに、タスクを待機する複数のコンシューマーに渡し、タスクに継続を登録し、その結果または例外を取得できることなどがあります (Task<TResult> の場合)。 これで Task と Task<TResult> が非同期キャッシュ インフラストラクチャで使用できるように調整されます。 以下に、Task<TResult> 上にビルドした小さいながらも強力な非同期キャッシュの例を示します。
public class AsyncCache<TKey, TValue>
{
private readonly Func<TKey, Task<TValue>> _valueFactory;
private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;
public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
{
if (valueFactory == null) throw new ArgumentNullException("valueFactory");
_valueFactory = valueFactory;
_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
}
public Task<TValue> this[TKey key]
{
get
{
if (key == null) throw new ArgumentNullException("key");
return _map.GetOrAdd(key, toAdd =>
new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
}
}
}
AsyncCache<TKey,TValue> クラスでは、コンストラクターへのデリゲートとして TKey
を受け取る関数を受け入れ、Task<TResult> を返します。 以前にキャッシュからアクセスした値は内部ディクショナリに格納され、AsyncCache
によって、キャッシュに同時にアクセスしてもキーごとにタスクが 1 つしか生成されないようにします。
たとえば、次のようにダウンロードされた Web ページのキャッシュをビルドできます。
private AsyncCache<string,string> m_webPages =
new AsyncCache<string,string>(DownloadStringTaskAsync);
Web ページのコンテンツが必要なときはいつでも、非同期メソッドでこのキャッシュを使用できます。 AsyncCache
クラスによって、できるだけ少ない数のページが確実にダウンロードされるようになり、結果がキャッシュされます。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtContents.Text = await m_webPages["https://www.microsoft.com"];
}
finally { btnDownload.IsEnabled = true; }
}
AsyncProducerConsumerCollection
タスクは、非同期アクティビティを調整するためのデータ構造のビルドにも使用できます。 プロデューサー/コンシューマーというクラシックな並列設計パターンの 1 つについて考えてみます。 このパターンでは、コンシューマーによって使用されるデータをプロデューサーが生成し、プロデューサーおよびコンシューマーは並列に実行できます。 たとえば、コンシューマーは、項目 2 を生成中のプロデューサーが以前に生成した項目 1 を処理します。 プロデューサー/コンシューマー パターンでは、新しいデータについてコンシューマーに通知され、そのデータが使用可能な場合にコンシューマーが見つけられるように、プロデューサーが作成した作業を格納するデータ構造が必ず必要です。
プロデューサーおよびコンシューマーとして使用する非同期メソッドを有効にする、タスク上にビルドされた単純なデータ構造を以下に示します。
public class AsyncProducerConsumerCollection<T>
{
private readonly Queue<T> m_collection = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_waiting =
new Queue<TaskCompletionSource<T>>();
public void Add(T item)
{
TaskCompletionSource<T> tcs = null;
lock (m_collection)
{
if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
else m_collection.Enqueue(item);
}
if (tcs != null) tcs.TrySetResult(item);
}
public Task<T> Take()
{
lock (m_collection)
{
if (m_collection.Count > 0)
{
return Task.FromResult(m_collection.Dequeue());
}
else
{
var tcs = new TaskCompletionSource<T>();
m_waiting.Enqueue(tcs);
return tcs.Task;
}
}
}
}
このデータ構造を使用すると、次のようなコードを作成できます。
private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.Take();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Add(data);
}
System.Threading.Tasks.Dataflow 名前空間には BufferBlock<T> 型が含まれます。これは同様に使用できますが、カスタムのコレクション型をビルドする必要はありません。
private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.ReceiveAsync();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Post(data);
}
注意
System.Threading.Tasks.Dataflow 名前空間は、NuGet パッケージとして使用できます。 System.Threading.Tasks.Dataflow 名前空間が含まれるアセンブリをインストールするには、Visual Studio で自分のプロジェクトを開き、[プロジェクト] メニューの [NuGet パッケージの管理] を選択し、System.Threading.Tasks.Dataflow
パッケージをオンライン検索します。
関連項目
.NET