ASP.NET Core SignalR でストリーミングを使う
作成者: Brennan Conroy
ASP.NET Core SignalR は、クライアントからサーバーへ、およびサーバーからクライアントへのストリーミングをサポートしています。 これは、時間の経過と共にデータのフラグメントを受信するシナリオに便利です。 ストリーミング時には、すべてのデータが使用可能になるまで待つのではなく、各フラグメントが使用可能になるとすぐにクライアントまたはサーバーに送信されます。
サンプル コードを表示またはダウンロードします (ダウンロード方法)。
ストリーミング用のハブを設定する
IAsyncEnumerable<T>、ChannelReader<T>、Task<IAsyncEnumerable<T>>
、または Task<ChannelReader<T>>
が返されると、ハブ メソッドは自動的にストリーミング ハブ メソッドになります。
サーバーからクライアントへのストリーミング
ストリーミング ハブ メソッドからは、ChannelReader<T>
に加えて IAsyncEnumerable<T>
を返すことができます。 IAsyncEnumerable<T>
を返す最もシンプルな方法は、次のサンプルに示すように、ハブ メソッドを非同期反復子メソッドにすることです。 ハブ非同期反復子メソッドは、クライアントがストリームからサブスクライブを解除したときにトリガーされる CancellationToken
パラメーターを受け取ることができます。 非同期反復子メソッドを使うと、ChannelReader
が適切な速さで返されない、ChannelWriter<T> を完了せずにメソッドが終了するなど、チャネルでよく発生する問題を回避できます。
Note
次の例では、C# 8.0 以降が必要です。
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
次のサンプルは、チャネルを使ってクライアントにデータをストリーミングする基本的な方法を示しています。 ChannelWriter<T> にオブジェクトが書き込まれるたびに、そのオブジェクトは直ちにクライアントに送信されます。 最後に ChannelWriter
が完了し、ストリームが閉じられたことがクライアントに通知されます。
Note
バックグラウンド スレッドで ChannelWriter<T>
に書き込み、できるだけ早く ChannelReader
を返します。 ChannelReader
が返されるまで、他のハブの呼び出しはブロックされます。
ロジックを try ... catch
ステートメントでラップします。 finally
ブロックで Channel
を完了します。 エラーをフローしたい場合は、catch
ブロック内でキャプチャし、finally
ブロック内で書き込みます。
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
サーバーからクライアントへのストリーミング ハブ メソッドは、クライアントがストリームからサブスクライブを解除したときにトリガーされる CancellationToken
パラメーターを受け取ることができます。 クライアントがストリームの終了前に切断した場合は、このトークンを使ってサーバーの操作を停止し、リソースを解放します。
クライアントからサーバーへのストリーミング
型 ChannelReader<T> または IAsyncEnumerable<T> の 1 つ以上のオブジェクトを受け取ると、ハブ メソッドは自動的にクライアントからサーバーへのストリーミング ハブ メソッドになります。 次のサンプルは、クライアントから送信されたストリーミング データを読み取る基本を示しています。 クライアントから ChannelWriter<T> に書き込まれるたびに、ハブ メソッドから読み取られるサーバーの ChannelReader
にデータが書き込まれます。
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
IAsyncEnumerable<T> バージョンのメソッドが続きます。
Note
次の例では、C# 8.0 以降が必要です。
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET クライアント
サーバーからクライアントへのストリーミング
HubConnection
の StreamAsync
と StreamAsChannelAsync
のメソッドは、サーバーからクライアントへのストリーミング メソッドを呼び出すために使われます。 ハブ メソッド名と、ハブ メソッドに定義されている引数を StreamAsync
または StreamAsChannelAsync
に渡します。 StreamAsync<T>
と StreamAsChannelAsync<T>
のジェネリック パラメーターには、ストリーミング メソッドから返されるオブジェクトの型を指定します。 型 IAsyncEnumerable<T>
または ChannelReader<T>
のオブジェクトは、ストリームの呼び出しから返され、クライアント上のストリームを表します。
IAsyncEnumerable<int>
を返す StreamAsync
の例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
ChannelReader<int>
を返す、対応する StreamAsChannelAsync
の例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
上のコードでは、次のようになります。
HubConnection
に対するStreamAsChannelAsync
メソッドは、サーバーからクライアントへのストリーミング メソッドを呼び出すために使われます。 ハブ メソッド名と、ハブ メソッドに定義されている引数をStreamAsChannelAsync
に渡します。StreamAsChannelAsync<T>
のジェネリック パラメーターには、ストリーミング メソッドから返されるオブジェクトの型を指定します。ChannelReader<T>
は、ストリームの呼び出しから返され、クライアント上のストリームを表します。
クライアントからサーバーへのストリーミング
クライアントからサーバーへのストリーミング ハブ メソッドを .NET クライアントから呼び出す方法は 2 つあります。 呼び出されたハブ メソッドに応じて、SendAsync
、InvokeAsync
、StreamAsChannelAsync
の引数として IAsyncEnumerable<T>
または ChannelReader
を渡すことができます。
IAsyncEnumerable
または ChannelWriter
オブジェクトにデータが書き込まれるたびに、サーバー上のハブ メソッドは、クライアントからのデータを含む新しい項目を受け取ります。
IAsyncEnumerable
オブジェクトを使っている場合、ストリーム項目を返すメソッドが終了すると、ストリームは終了します。
Note
次の例では、C# 8.0 以降が必要です。
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
また、ChannelWriter
を使っている場合は、channel.Writer.Complete()
を使ってチャネルを完了します。
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript クライアント
サーバーからクライアントへのストリーミング
JavaScript クライアントの場合、connection.stream
を使ってハブ上でサーバーからクライアントへのストリーミング メソッドを呼び出します。 stream
メソッドは、2 つの引数を受け取ります。
- ハブ メソッドの名前。 次の例では、ハブ メソッドの名前は
Counter
です。 - ハブ メソッドで定義されている引数。 次の例の引数は、受信するストリーム項目の数とストリーム項目間の延期期間のカウントです。
connection.stream
から IStreamResult
が返されます。これには subscribe
メソッドが含まれています。 subscribe
に IStreamSubscriber
を渡し、stream
の呼び出しからの通知を受け取る next
、error
、complete
のコールバックを設定します。
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
クライアントからストリームを終了するには、subscribe
メソッドから返された ISubscription
に対して dispose
メソッドを呼び出します。 このメソッドを呼び出すと、ハブ メソッドの CancellationToken
パラメーターを指定していた場合にそれが取り消されます。
クライアントからサーバーへのストリーミング
JavaScript クライアントからは、呼び出されたハブ メソッドに応じて send
、invoke
、または stream
の引数として Subject
を渡すことで、ハブに対するクライアントからサーバーへのストリーミング メソッドが呼び出されます。 Subject
は、Subject
のようなクラスです。 たとえば、RxJS では、そのライブラリの Subject クラスを使うことができます。
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
項目を指定して subject.next(item)
を呼び出すと、ストリームに項目が書き込まれ、ハブ メソッドはサーバー上の項目を受け取ります。
ストリームを終了するには、subject.complete()
を呼び出します。
Java クライアント
サーバーからクライアントへのストリーミング
SignalR Java クライアントには、ストリーミング メソッドを呼び出すために stream
メソッドが使われます。 stream
は 3 つ以上の引数を受け取ります。
- ストリーム項目の想定される型。
- ハブ メソッドの名前。
- ハブ メソッドで定義されている引数。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
HubConnection
に対する stream
メソッドからは、ストリーム項目型の Observable が返されます。 Observable 型の subscribe
メソッドには、onNext
、onError
、onCompleted
のハンドラーが定義されています。
クライアントからサーバーへのストリーミング
SignalR Java クライアントからは、呼び出されたハブ メソッドに応じて send
、invoke
、または stream
の引数として Observable を渡すことで、ハブに対するクライアントからサーバーへのストリーミング メソッドが呼び出されます。
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
項目を指定して stream.onNext(item)
を呼び出すと、ストリームに項目が書き込まれ、ハブ メソッドはサーバー上の項目を受け取ります。
ストリームを終了するには、stream.onComplete()
を呼び出します。
その他のリソース
ASP.NET Core