ASP.NET Core SignalR에서 스트리밍 사용
작성자: Brennan Conroy
ASP.NET Core SignalR은 클라이언트에서 서버로, 서버에서 클라이언트로의 스트리밍을 지원합니다. 이는 시간이 지남에 따라 데이터 조각이 도착하는 시나리오에 유용합니다. 스트리밍할 때 각 조각은 모든 데이터를 사용할 수 있을 때까지 기다리지 않고 사용할 수 있게 되는 즉시 클라이언트 또는 서버로 전송됩니다.
스트리밍을 위한 허브 설정
허브 메서드는 IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
또는 Task<ChannelReader<T>>
를 반환할 때 자동으로 스트리밍 허브 메서드가 됩니다.
서버-클라이언트 스트리밍
스트리밍 허브 메서드는 ChannelReader<T>
외에도 IAsyncEnumerable<T>
을 반환할 수 있습니다. IAsyncEnumerable<T>
을 반환하는 가장 간단한 방법은 다음 샘플과 같이 허브 메서드를 비동기 반복기 메서드로 만드는 것입니다. 허브 비동기 반복기 메서드는 클라이언트가 스트림에서 구독을 취소할 때 트리거되는 CancellationToken
매개 변수를 수락할 수 있습니다. 비동기 반복기 메서드는 ChannelWriter<T>를 완료하지 않고 조기에 ChannelReader
를 반환하거나 메서드를 종료하지 않는 등 채널과 관련된 문제를 방지합니다.
참고 항목
다음 샘플에는 C# 8.0 이상이 필요합니다.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
다음 샘플에서는 채널을 사용하여 클라이언트로 데이터를 스트리밍하는 기본 사항을 보여 줍니다. 개체가 ChannelWriter<T>에 기록될 때마다 개체는 클라이언트에 즉시 전송됩니다. 마지막에 ChannelWriter
가 완료되어 클라이언트에 스트림이 닫혔음을 알립니다.
참고 항목
백그라운드 스레드에서 ChannelWriter<T>
에 쓰고 최대한 빨리 ChannelReader
를 반환합니다. 다른 허브 호출은 ChannelReader
가 반환될 때까지 차단됩니다.
try ... catch
문에 논리를 래핑합니다. finally
블록에서 Channel
을 완료합니다. 오류를 흐르게 하려면 catch
블록 내에서 오류를 캡처하고 finally
블록에 씁니다.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
서버-클라이언트 스트리밍 허브 메서드는 클라이언트가 스트림에서 구독을 취소할 때 트리거되는 CancellationToken
매개 변수를 수락할 수 있습니다. 이 토큰을 사용하여 서버 작업을 중지하고 스트림이 끝나기 전에 클라이언트의 연결이 끊어지면 리소스를 해제합니다.
클라이언트-서버 스트리밍
허브 메서드는 ChannelReader<T> 또는 IAsyncEnumerable<T> 형식의 개체를 하나 이상 허용하면 자동으로 클라이언트-서버 스트리밍 허브 메서드가 됩니다. 다음 샘플에서는 클라이언트에서 보낸 스트리밍 데이터를 읽는 기본 사항을 보여 줍니다. 클라이언트가 ChannelWriter<T>에 쓸 때마다 허브 메서드가 읽는 서버의 ChannelReader
에 데이터가 기록됩니다.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
메서드의 IAsyncEnumerable<T> 버전은 다음과 같습니다.
참고 항목
다음 샘플에는 C# 8.0 이상이 필요합니다.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET 클라이언트
서버-클라이언트 스트리밍
HubConnection
의 StreamAsync
및 StreamAsChannelAsync
메서드는 서버-클라이언트 스트리밍 메서드를 호출하는 데 사용됩니다. 허브 메서드에 정의된 허브 메서드 이름 및 인수를 StreamAsync
또는 StreamAsChannelAsync
에 전달합니다. StreamAsync<T>
및 StreamAsChannelAsync<T>
의 제네릭 매개 변수는 스트리밍 메서드에서 반환하는 개체의 형식을 지정합니다. IAsyncEnumerable<T>
또는 ChannelReader<T>
형식의 개체가 스트림 호출에서 반환되고 클라이언트의 스트림을 나타냅니다.
IAsyncEnumerable<int>
을 반환하는 StreamAsync
예제입니다.
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
ChannelReader<int>
를 반환하는 해당 StreamAsChannelAsync
예제입니다.
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
위의 코드에서:
HubConnection
의StreamAsChannelAsync
메서드는 서버-클라이언트 스트리밍 메서드를 호출하는 데 사용됩니다. 허브 메서드에 정의된 허브 메서드 이름 및 인수를StreamAsChannelAsync
에 전달합니다.StreamAsChannelAsync<T>
의 제네릭 매개 변수는 스트리밍 메서드에서 반환하는 개체의 형식을 지정합니다.ChannelReader<T>
는 스트림 호출에서 반환되고 클라이언트의 스트림을 나타냅니다.
클라이언트-서버 스트리밍
.NET 클라이언트에서 클라이언트-서버 스트리밍 허브 메서드를 호출하는 방법에는 두 가지가 있습니다. IAsyncEnumerable<T>
또는 ChannelReader
를 호출된 허브 메서드에 따라 SendAsync
, InvokeAsync
또는 StreamAsChannelAsync
에 인수로 전달할 수 있습니다.
IAsyncEnumerable
또는 ChannelWriter
개체에 데이터를 쓸 때마다 서버의 허브 메서드는 클라이언트의 데이터가 있는 새 항목을 받습니다.
IAsyncEnumerable
개체를 사용하는 경우 스트림 항목을 반환하는 메서드가 종료된 후 스트림이 종료됩니다.
참고 항목
다음 샘플에는 C# 8.0 이상이 필요합니다.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
또는 ChannelWriter
를 사용하는 경우 channel.Writer.Complete()
를 사용하여 채널을 완료합니다.
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript 클라이언트
서버-클라이언트 스트리밍
JavaScript 클라이언트는 connection.stream
을 사용하여 허브에서 서버-클라이언트 스트리밍 메서드를 호출합니다. stream
메서드는 두 개의 인수를 허용합니다.
- 허브 메서드의 이름. 다음 예제에서 허브 메서드 이름은
Counter
입니다. - 허브 메서드에 정의된 인수입니다. 다음 예제에서 인수는 수신할 스트림 항목 수와 스트림 항목 간의 지연 횟수입니다.
connection.stream
은 subscribe
메서드를 포함하는 IStreamResult
를 반환합니다. IStreamSubscriber
를 subscribe
에 전달하고 next
, error
및 complete
콜백을 설정하여 stream
호출에서 알림을 받습니다.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
클라이언트에서 스트림을 종료하려면 subscribe
메서드에서 반환된 ISubscription
에서 dispose
메서드를 호출합니다. 이 메서드를 호출하면 허브 메서드의 CancellationToken
매개 변수가 취소됩니다(제공된 경우).
클라이언트-서버 스트리밍
JavaScript 클라이언트는 호출된 허브 메서드에 따라 Subject
를 send
, invoke
또는 stream
에 인수로 전달하여 허브에서 클라이언트-서버 스트리밍 메서드를 호출합니다. Subject
는 Subject
와 같은 클래스입니다. 예를 들어 RxJS에서 해당 라이브러리의 Subject 클래스를 사용할 수 있습니다.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
항목을 사용하여 subject.next(item)
를 호출하면 스트림에 항목이 기록되고 허브 메서드가 서버의 항목을 받습니다.
스트림을 종료하려면 subject.complete()
를 호출합니다.
Java 클라이언트
서버-클라이언트 스트리밍
SignalR Java 클라이언트는 stream
메서드를 사용하여 스트리밍 메서드를 호출합니다. stream
은 세 개 이상의 인수를 허용합니다.
- 스트림 항목의 예상 형식입니다.
- 허브 메서드의 이름.
- 허브 메서드에 정의된 인수입니다.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
HubConnection
의 stream
메서드는 스트림 항목 형식의 Observable을 반환합니다. Observable 형식의 subscribe
메서드는 onNext
, onError
및 onCompleted
처리기가 정의된 위치입니다.
클라이언트-서버 스트리밍
SignalR Java 클라이언트는 호출된 허브 메서드에 따라 Observable을 send
, invoke
또는 stream
에 인수로 전달하여 허브에서 클라이언트-서버 스트리밍 메서드를 호출할 수 있습니다.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
항목을 사용하여 stream.onNext(item)
를 호출하면 스트림에 항목이 기록되고 허브 메서드가 서버의 항목을 받습니다.
스트림을 종료하려면 stream.onComplete()
를 호출합니다.
추가 리소스
ASP.NET Core