建立 gRPC 服務和方法
注意
這不是這篇文章的最新版本。 如需目前的版本,請參閱 本文的 .NET 9 版本。
警告
不再支援此版本的 ASP.NET Core。 如需詳細資訊,請參閱 .NET 和 .NET Core 支持原則。 如需目前的版本,請參閱 本文的 .NET 9 版本。
本文件說明如何在 C# 中建立 gRPC 服務和方法。 主題包括:
- 如何在
.proto
檔案中定義服務和方法。 - 使用 gRPC C# 工具產生的程式碼。
- 實作 gRPC 服務和方法。
建立新的 gRPC 服務
搭配 C# 的 gRPC 服務引進了 gRPC 的合約優先方法來進行 API 開發。 先在 .proto
檔案中定義服務和訊息。 C# 工具隨即從 .proto
檔案產生程式碼。 對於伺服器端資產,系統會為每項服務產生抽象基底類型,以及任何訊息的類別。
下列 .proto
檔案會:
- 定義
Greeter
服務。 Greeter
服務則會定義SayHello
呼叫。SayHello
傳送HelloRequest
訊息並接收HelloReply
訊息
syntax = "proto3";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
C# 工具會產生 C# GreeterBase
基底類型:
public abstract partial class GreeterBase
{
public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
}
public class HelloRequest
{
public string Name { get; set; }
}
public class HelloReply
{
public string Message { get; set; }
}
根據預設,產生的 GreeterBase
不會執行任何動作。 其虛擬 SayHello
方法會將 UNIMPLEMENTED
錯誤傳回給任何呼叫該方法的用戶端。 為了使服務發揮作用,應用程式必須建立 GreeterBase
的具體實作:
public class GreeterService : GreeterBase
{
public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
}
}
ServerCallContext
會提供伺服器端呼叫的內容。
服務實作會向應用程式註冊。 如果服務是由 ASP.NET Core gRPC 所裝載,則應該使用 MapGrpcService
方法將其新增至路由管線。
app.MapGrpcService<GreeterService>();
如需詳細資訊,請參閱搭配 ASP.NET Core 的 gRPC 服務。
實作 gRPC 方法
gRPC 服務可以有不同類型的方法。 服務傳送和接收訊息的方式則取決於所定義的方法類型。 gRPC 方法的類型有:
- 一元
- 伺服器串流
- 用戶端串流
- 雙向串流
串流呼叫是使用 .proto
檔案中的 stream
關鍵字來指定。 stream
可以放置在呼叫的要求訊息、回應訊息或兩者上。
syntax = "proto3";
service ExampleService {
// Unary
rpc UnaryCall (ExampleRequest) returns (ExampleResponse);
// Server streaming
rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);
// Client streaming
rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);
// Bi-directional streaming
rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}
每種呼叫類型都有不同的方法簽章。 在具體實作中覆寫從抽象基底服務類型產生的方法,可確保使用正確的引數和傳回類型。
一元方法
一元方法會以要求訊息做為參數,並傳回回應。 傳回回應時,一元呼叫即告完成。
public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
ServerCallContext context)
{
var response = new ExampleResponse();
return Task.FromResult(response);
}
一元呼叫與 Web API 控制器上的動作最為相似。 gRPC 方法與動作有一個重大差異,就是 gRPC 方法無法將要求的各部分繫結至不同的方法引數。 gRPC 方法始終會為傳入的要求資料提供一個訊息引數。 藉由新增欄位至要求訊息,就仍然可將多個值傳送給 gRPC 服務:
message ExampleRequest {
int32 pageIndex = 1;
int32 pageSize = 2;
bool isDescending = 3;
}
伺服器串流方法
伺服器串流方法會以要求訊息做為參數。 由於可以將多個訊息串流回呼叫端,因此使用 responseStream.WriteAsync
來傳送回應訊息。 當方法傳回時,伺服器串流呼叫即告完成。
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
一旦伺服器串流方法啟動,客戶端就無法傳送其他訊息或資料。 有些串流方法是設計成永遠執行。 對於持續性的串流方法,用戶端可於不再需要時取消呼叫。 發生取消時,用戶端會傳送訊號給伺服器,並引發 ServerCallCoNtext.CancellationToken。 CancellationToken
權杖應在伺服器上與非同步方法搭配使用,以便:
- 隨串流呼叫一併取消任何非同步工作。
- 快速結束方法。
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
}
}
用戶端串流方法
用戶端串流方法會在沒有訊息接收方法的情況下啟動。 requestStream
參數是用來從用戶端讀取訊息。 傳回回應訊息時,用戶端串流呼叫即告完成:
public override async Task<ExampleResponse> StreamingFromClient(
IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
// ...
}
return new ExampleResponse();
}
雙向串流方法
雙向串流方法會在沒有訊息接收方法的情況下啟動。 requestStream
參數是用來從用戶端讀取訊息。 該方法可以選擇使用 responseStream.WriteAsync
傳送訊息。 當該方法傳回下列項目時,雙向串流呼叫即告完成:
public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
await responseStream.WriteAsync(new ExampleResponse());
}
}
上述 程式碼:
- 對每個要求傳送一個回應。
- 這是雙向串流的基本用法。
亦有可能支援更複雜的案例,例如同時讀取要求和傳送回應:
public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
// Read requests in a background task.
var readTask = Task.Run(async () =>
{
await foreach (var message in requestStream.ReadAllAsync())
{
// Process request.
}
});
// Send responses until the client signals that it is complete.
while (!readTask.IsCompleted)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
}
}
在雙向串流方法中,用戶端和服務可以隨時傳送訊息給彼此。 雙向方法的最佳實作會根據需求而有所不同。
存取 gRPC 要求標頭
要求訊息並不是用戶端將資料傳送給 gRPC 服務的唯一方式。 在使用 ServerCallContext.RequestHeaders
的服務中可以獲得標頭值。
public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
ServerCallContext context)
{
var userAgent = context.RequestHeaders.GetValue("user-agent");
// ...
return Task.FromResult(new ExampleResponse());
}
使用 gRPC 串流方法進行多執行緒處理
實作使用多執行緒的 gRPC 串流方法有一些重要的注意事項。
讀取器和寫入器執行緒安全性
IAsyncStreamReader<TMessage>
和 IServerStreamWriter<TMessage>
都是一次只能由一個執行緒使用。 對於串流 gRPC 方法,多個執行緒無法同時使用 requestStream.MoveNext()
讀取新訊息。 而且多個執行緒無法同時使用 responseStream.WriteAsync(message)
寫入新訊息。
讓多個執行緒安全地與 gRPC 方法互動的一種方法是使用生產者-消費者模式,並搭配使用System.Threading.Channels。
public override async Task DownloadResults(DataRequest request,
IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));
var consumerTask = Task.Run(async () =>
{
// Consume messages from channel and write to response stream.
await foreach (var message in channel.Reader.ReadAllAsync())
{
await responseStream.WriteAsync(message);
}
});
var dataChunks = request.Value.Chunk(size: 10);
// Write messages to channel from multiple threads.
await Task.WhenAll(dataChunks.Select(
async c =>
{
var message = new DataResult { BytesProcessed = c.Length };
await channel.Writer.WriteAsync(message);
}));
// Complete writing and wait for consumer to complete.
channel.Writer.Complete();
await consumerTask;
}
上述 gRPC 伺服器串流方法會:
- 建立界限通道來產生和取用
DataResult
訊息。 - 啟動工作以從通道讀取訊息,並將其寫入回應資料流。
- 從多個執行緒將訊息寫入通道。
注意
雙向串流方法會採用 IAsyncStreamReader<TMessage>
和 IServerStreamWriter<TMessage>
作為引數。 在彼此不同的執行緒上使用這些類型是安全的。
在呼叫結束後與 gRPC 方法互動
在 gRPC 方法結束後,gRPC 呼叫會結束於伺服器上。 在呼叫結束後使用下列傳遞給 gRPC 方法的引數並不安全:
ServerCallContext
IAsyncStreamReader<TMessage>
IServerStreamWriter<TMessage>
如果 gRPC 方法啟動了使用這些類型的背景工作,它必須先完成工作,gRPC 方法才會結束。 在 gRPC 方法存在後繼續使用內容、資料流讀取器或資料流寫入器,會導致錯誤和不可預測的行為。
在下列範例中,伺服器串流方法可以在呼叫完成後寫入回應資料流:
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
_ = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
}
對於上述範例,解決方案是在結束該方法之前先等候寫入工作:
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
var writeTask = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
await writeTask;
}