与其他异步模式和类型互操作

.NET 中异步模式的简短历史记录:

任务和异步编程模型 (APM)

从 APM 到 TAP

因为异步编程模型 (APM) 模式的结构合理,而且能够轻松生成包装,将 APM 实现公开为 TAP 实现。 .NET Framework 4 及更高版本包含采用 FromAsync 方法重载形式的帮助器例程来实现这种转换。

请考虑 Stream 类及其 BeginReadEndRead 方法,它们代表与同步 Read 方法对应的 APM:

public int Read(byte[] buffer, int offset, int count)
Public Function Read(buffer As Byte(), offset As Integer,
                     count As Integer) As Integer
public IAsyncResult BeginRead(byte[] buffer, int offset,
                              int count, AsyncCallback callback,
                              object state)
Public Function BeginRead(buffer As Byte, offset As Integer,
                          count As Integer, callback As AsyncCallback,
                          state As Object) As IAsyncResult
public int EndRead(IAsyncResult asyncResult)
Public Function EndRead(asyncResult As IAsyncResult) As Integer

可以使用 TaskFactory<TResult>.FromAsync 方法来实现此操作的 TAP 包装,如下所示:

public static Task<int> ReadAsync(this Stream stream,
                                  byte[] buffer, int offset,
                                  int count)
{
    if (stream == null)
       throw new ArgumentNullException("stream");

    return Task<int>.Factory.FromAsync(stream.BeginRead,
                                       stream.EndRead, buffer,
                                       offset, count, null);
}
<Extension()>
Public Function ReadAsync(strm As Stream,
                          buffer As Byte(), offset As Integer,
                          count As Integer) As Task(Of Integer)
    If strm Is Nothing Then
        Throw New ArgumentNullException("stream")
    End If

    Return Task(Of Integer).Factory.FromAsync(AddressOf strm.BeginRead,
                                              AddressOf strm.EndRead, buffer,
                                              offset, count, Nothing)
End Function

此实现类似于以下内容:

 public static Task<int> ReadAsync(this Stream stream,
                                   byte [] buffer, int offset,
                                   int count)
 {
    if (stream == null)
        throw new ArgumentNullException("stream");

    var tcs = new TaskCompletionSource<int>();
    stream.BeginRead(buffer, offset, count, iar =>
                     {
                        try {
                           tcs.TrySetResult(stream.EndRead(iar));
                        }
                        catch(OperationCanceledException) {
                           tcs.TrySetCanceled();
                        }
                        catch(Exception exc) {
                           tcs.TrySetException(exc);
                        }
                     }, null);
    return tcs.Task;
}

<Extension()>
Public Function ReadAsync(stream As Stream, buffer As Byte(), _
                          offset As Integer, count As Integer) _
                          As Task(Of Integer)
    If stream Is Nothing Then
        Throw New ArgumentNullException("stream")
    End If

    Dim tcs As New TaskCompletionSource(Of Integer)()
    stream.BeginRead(buffer, offset, count,
                     Sub(iar)
                         Try
                             tcs.TrySetResult(stream.EndRead(iar))
                         Catch e As OperationCanceledException
                             tcs.TrySetCanceled()
                         Catch e As Exception
                             tcs.TrySetException(e)
                         End Try
                     End Sub, Nothing)
    Return tcs.Task
End Function

从 TAP 到 APM

如果现有的基础结构需要 APM 模式,则还需要采用 TAP 实现并在需要 APM 实现的地方使用它。 由于任务可以组合,并且 Task 类实现 IAsyncResult,你可以使用一个简单的 helper 函数执行此操作。 以下代码使用 Task<TResult> 类的扩展,但可以对非泛型任务使用几乎相同的函数。

public static IAsyncResult AsApm<T>(this Task<T> task,
                                    AsyncCallback callback,
                                    object state)
{
    if (task == null)
        throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>(state);
    task.ContinueWith(t =>
                      {
                         if (t.IsFaulted)
                            tcs.TrySetException(t.Exception.InnerExceptions);
                         else if (t.IsCanceled)
                            tcs.TrySetCanceled();
                         else
                            tcs.TrySetResult(t.Result);

                         if (callback != null)
                            callback(tcs.Task);
                      }, TaskScheduler.Default);
    return tcs.Task;
}
<Extension()>
Public Function AsApm(Of T)(task As Task(Of T),
                            callback As AsyncCallback,
                            state As Object) As IAsyncResult
    If task Is Nothing Then
        Throw New ArgumentNullException("task")
    End If

    Dim tcs As New TaskCompletionSource(Of T)(state)
    task.ContinueWith(Sub(antecedent)
                          If antecedent.IsFaulted Then
                              tcs.TrySetException(antecedent.Exception.InnerExceptions)
                          ElseIf antecedent.IsCanceled Then
                              tcs.TrySetCanceled()
                          Else
                              tcs.TrySetResult(antecedent.Result)
                          End If

                          If callback IsNot Nothing Then
                              callback(tcs.Task)
                          End If
                      End Sub, TaskScheduler.Default)
    Return tcs.Task
End Function

现在,请考虑具有以下 TAP 实现的用例:

public static Task<String> DownloadStringAsync(Uri url)
Public Shared Function DownloadStringAsync(url As Uri) As Task(Of String)

并且想要提供此 APM 实现:

public IAsyncResult BeginDownloadString(Uri url,
                                        AsyncCallback callback,
                                        object state)
Public Function BeginDownloadString(url As Uri,
                                    callback As AsyncCallback,
                                    state As Object) As IAsyncResult
public string EndDownloadString(IAsyncResult asyncResult)
Public Function EndDownloadString(asyncResult As IAsyncResult) As String

以下示例演示了一种向 APM 迁移的方法:

public IAsyncResult BeginDownloadString(Uri url,
                                        AsyncCallback callback,
                                        object state)
{
   return DownloadStringAsync(url).AsApm(callback, state);
}

public string EndDownloadString(IAsyncResult asyncResult)
{
   return ((Task<string>)asyncResult).Result;
}
Public Function BeginDownloadString(url As Uri,
                                    callback As AsyncCallback,
                                    state As Object) As IAsyncResult
    Return DownloadStringAsync(url).AsApm(callback, state)
End Function

Public Function EndDownloadString(asyncResult As IAsyncResult) As String
    Return CType(asyncResult, Task(Of String)).Result
End Function

任务和基于事件的异步模式 (EAP)

包装 基于事件的异步模式 (EAP) 实现比包装 APM 模式更为复杂,因为与 APM 模式相比,EAP 模式的变体更多,结构更少。 为了演示,以下代码包装了 DownloadStringAsync 方法。 DownloadStringAsync 接受 URI,在下载时引发 DownloadProgressChanged 事件,以报告进度的多个统计信息,并在完成时引发 DownloadStringCompleted 事件。 最终在指定 URI 中返回一个字符串,其中包含页面内容。

 public static Task<string> DownloadStringAsync(Uri url)
 {
     var tcs = new TaskCompletionSource<string>();
     var wc = new WebClient();
     wc.DownloadStringCompleted += (s,e) =>
         {
             if (e.Error != null)
                tcs.TrySetException(e.Error);
             else if (e.Cancelled)
                tcs.TrySetCanceled();
             else
                tcs.TrySetResult(e.Result);
         };
     wc.DownloadStringAsync(url);
     return tcs.Task;
}
Public Shared Function DownloadStringAsync(url As Uri) As Task(Of String)
    Dim tcs As New TaskCompletionSource(Of String)()
    Dim wc As New WebClient()
    AddHandler wc.DownloadStringCompleted, Sub(s, e)
                                               If e.Error IsNot Nothing Then
                                                   tcs.TrySetException(e.Error)
                                               ElseIf e.Cancelled Then
                                                   tcs.TrySetCanceled()
                                               Else
                                                   tcs.TrySetResult(e.Result)
                                               End If
                                           End Sub
    wc.DownloadStringAsync(url)
    Return tcs.Task
End Function

任务和等待句柄

从等待句柄到 TAP

虽然等待句柄不能实现异步模式,但高级开发人员可以在设置等待句柄时使用 WaitHandle 类和 ThreadPool.RegisterWaitForSingleObject 方法实现异步通知。 可以包装 RegisterWaitForSingleObject 方法以在等待句柄中启用针对任何同步等待的基于任务的替代方法:

public static Task WaitOneAsync(this WaitHandle waitHandle)
{
    if (waitHandle == null)
        throw new ArgumentNullException("waitHandle");

    var tcs = new TaskCompletionSource<bool>();
    var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
        delegate { tcs.TrySetResult(true); }, null, -1, true);
    var t = tcs.Task;
    t.ContinueWith( (antecedent) => rwh.Unregister(null));
    return t;
}
<Extension()>
Public Function WaitOneAsync(waitHandle As WaitHandle) As Task
    If waitHandle Is Nothing Then
        Throw New ArgumentNullException("waitHandle")
    End If

    Dim tcs As New TaskCompletionSource(Of Boolean)()
    Dim rwh As RegisteredWaitHandle = ThreadPool.RegisterWaitForSingleObject(waitHandle,
        Sub(state, timedOut)
            tcs.TrySetResult(True)
        End Sub, Nothing, -1, True)
    Dim t = tcs.Task
    t.ContinueWith(Sub(antecedent)
                       rwh.Unregister(Nothing)
                   End Sub)
    Return t
End Function

使用此方法,可以在异步方法中使用现有 WaitHandle 实现。 例如,若要限制在任何特定时间执行的异步操作数,可以利用信号灯(System.Threading.SemaphoreSlim 对象)。 可以将并发运行的操作数目限制到 N,方法为:初始化到 N 的信号量的数目、在想要执行操作时等待信号量,并在完成操作时释放信号量 :

static int N = 3;

static SemaphoreSlim m_throttle = new SemaphoreSlim(N, N);

static async Task DoOperation()
{
    await m_throttle.WaitAsync();
    // do work
    m_throttle.Release();
}
Shared N As Integer = 3

Shared m_throttle As New SemaphoreSlim(N, N)

Shared Async Function DoOperation() As Task
    Await m_throttle.WaitAsync()
    ' Do work.
    m_throttle.Release()
End Function

还可以构建不依赖等待句柄就完全可以处理任务的异步信号量。 若要执行此操作,可以使用 使用基于任务的异步模式 中所述的用于在 Task

从 TAP 到等待句柄

正如前面所述, Task 类实现 IAsyncResult,且该实现公开 IAsyncResult.AsyncWaitHandle 属性,该属性会返回在 Task 完成时设置的等待句柄。 可以获得 WaitHandleTask ,如下所示:

WaitHandle wh = ((IAsyncResult)task).AsyncWaitHandle;
Dim wh As WaitHandle = CType(task, IAsyncResult).AsyncWaitHandle

请参阅