创建 gRPC 服务和方法
注意
此版本不是本文的最新版本。 有关当前版本,请参阅本文的 .NET 9 版本。
警告
此版本的 ASP.NET Core 不再受支持。 有关详细信息,请参阅 .NET 和 .NET Core 支持策略。 有关当前版本,请参阅本文的 .NET 9 版本。
本文档介绍如何以 C# 创建 gRPC 服务和方法。 主题包括:
- 如何在
.proto
文件中定义服务和方法。 - 使用 gRPC C# 工具生成的代码。
- 实现 gRPC 服务和方法。
创建新的 gRPC 服务
使用 C# 的 gRPC 服务介绍了 gRPC 的 API 开发协定优先方法。 服务和消息是在 .proto
文件中定义的。 然后,C# 工具从 .proto
文件生成代码。 对于服务器端资产,将为每个服务生成一个抽象基类型,同时为所有消息生成类。
以下 .proto
文件:
- 定义
Greeter
服务。 Greeter
服务定义SayHello
调用。SayHello
发送HelloRequest
消息并接收HelloReply
消息
syntax = "proto3";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
C# 工具生成 C# GreeterBase
基类型:
public abstract partial class GreeterBase
{
public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
}
public class HelloRequest
{
public string Name { get; set; }
}
public class HelloReply
{
public string Message { get; set; }
}
默认情况下,生成的 GreeterBase
不执行任何操作。 它的虚拟 SayHello
方法会将 UNIMPLEMENTED
错误返回到调用它的任何客户端。 为了使服务有用,应用必须创建 GreeterBase
的具体实现:
public class GreeterService : GreeterBase
{
public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
}
}
ServerCallContext
提供服务器端调用的上下文。
服务实现已注册到应用。 如果服务由 ASP.NET Core gRPC 托管,则应使用 MapGrpcService
方法将其添加到路由管道。
app.MapGrpcService<GreeterService>();
有关详细信息,请参阅 ASP.NET Core 的 gRPC 服务。
实现 gRPC 方法
gRPC 服务可以有不同类型的方法。 服务发送和接收消息的方式取决于所定义的方法的类型。 gRPC 方法类型如下:
- 一元
- 服务器流式处理
- 客户端流式处理
- 双向流式处理
流式处理调用是使用 stream
关键字在 .proto
文件中指定的。 stream
可以放置在调用的请求消息和/或响应消息中。
syntax = "proto3";
service ExampleService {
// Unary
rpc UnaryCall (ExampleRequest) returns (ExampleResponse);
// Server streaming
rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);
// Client streaming
rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);
// Bi-directional streaming
rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}
每个调用类型都有不同的方法签名。 在具体实现中替代从抽象基本服务类型生成的方法,可确保使用正确的参数和返回类型。
一元方法
一元方法将请求消息作为参数,并返回响应。 返回响应时,一元调用完成。
public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
ServerCallContext context)
{
var response = new ExampleResponse();
return Task.FromResult(response);
}
一元调用与 Web API 控制器上的操作最为相似。 gRPC 方法与操作的一个重要区别是,gRPC 方法无法将请求的某些部分绑定到不同的方法参数。 对于传入请求数据,gRPC 方法始终有一个消息参数。 通过在请求消息中添加字段,仍可以将多个值发送到 gRPC 服务:
message ExampleRequest {
int32 pageIndex = 1;
int32 pageSize = 2;
bool isDescending = 3;
}
服务器流式处理方法
服务器流式处理方法将请求消息作为参数。 由于可以将多个消息流式传输回调用方,因此可使用 responseStream.WriteAsync
发送响应消息。 当方法返回时,服务器流式处理调用完成。
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
服务器流式处理方法启动后,客户端无法发送其他消息或数据。 某些流式处理方法设计为永久运行。 对于连续流式处理方法,客户端可以在不再需要调用时将其取消。 当发生取消时,客户端会将信号发送到服务器,并引发 ServerCallContext.CancellationToken。 应在服务器上通过异步方法使用 CancellationToken
标记,以实现以下目的:
- 所有异步工作都与流式处理调用一起取消。
- 该方法快速退出。
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
}
}
客户端流式处理方法
客户端流式处理方法在该方法没有接收消息的情况下启动。 requestStream
参数用于从客户端读取消息。 返回响应消息时,客户端流式处理调用完成:
public override async Task<ExampleResponse> StreamingFromClient(
IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
// ...
}
return new ExampleResponse();
}
双向流式处理方法
双向流式处理方法在该方法没有接收到消息的情况下启动。 requestStream
参数用于从客户端读取消息。 该方法可选择使用 responseStream.WriteAsync
发送消息。 当方法返回时,双向流式处理调用完成:
public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
await responseStream.WriteAsync(new ExampleResponse());
}
}
前面的代码:
- 发送每个请求的响应。
- 是双向流式处理的基本用法。
可以支持更复杂的方案,例如同时读取请求和发送响应:
public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
// Read requests in a background task.
var readTask = Task.Run(async () =>
{
await foreach (var message in requestStream.ReadAllAsync())
{
// Process request.
}
});
// Send responses until the client signals that it is complete.
while (!readTask.IsCompleted)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
}
}
在双向流式处理方法中,客户端和服务可在任何时间互相发送消息。 双向方法的最佳实现根据需求而有所不同。
访问 gRPC 请求标头
请求消息并不是客户端将数据发送到 gRPC 服务的唯一方法。 标头值在使用 ServerCallContext.RequestHeaders
的服务中可用。
public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
ServerCallContext context)
{
var userAgent = context.RequestHeaders.GetValue("user-agent");
// ...
return Task.FromResult(new ExampleResponse());
}
使用 gRPC 流式处理方法的多线程处理
实现使用多个线程的 gRPC 流式处理方法有一些重要的注意事项。
读取器和编写器线程安全性
IAsyncStreamReader<TMessage>
和 IServerStreamWriter<TMessage>
一次只能由一个线程使用。 对于流式处理 gRPC 方法,多个线程无法使用 requestStream.MoveNext()
同时读取新消息。 多个线程无法使用 responseStream.WriteAsync(message)
同时写入新消息。
多个线程能够与 gRPC 方法实现交互的一种安全方法是将生成方-使用者模式与 System.Threading.Channels 配合使用。
public override async Task DownloadResults(DataRequest request,
IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));
var consumerTask = Task.Run(async () =>
{
// Consume messages from channel and write to response stream.
await foreach (var message in channel.Reader.ReadAllAsync())
{
await responseStream.WriteAsync(message);
}
});
var dataChunks = request.Value.Chunk(size: 10);
// Write messages to channel from multiple threads.
await Task.WhenAll(dataChunks.Select(
async c =>
{
var message = new DataResult { BytesProcessed = c.Length };
await channel.Writer.WriteAsync(message);
}));
// Complete writing and wait for consumer to complete.
channel.Writer.Complete();
await consumerTask;
}
之前的 gRPC 服务器流式处理方法:
- 创建用于生成和使用
DataResult
消息的有界通道。 - 启动从通道读取消息,然后将消息写入响应流的任务。
- 从多个线程将消息写入通道。
注意
双向流式处理方法采用 IAsyncStreamReader<TMessage>
和 IServerStreamWriter<TMessage>
作为自变量。 在彼此独立的线程上使用这些类型是安全的。
调用结束后与 gRPC 方法交互
gRPC 方法退出后,gRPC 调用将在服务器上结束。 在调用结束后,使用以下传递到 gRPC 方法的自变量并不安全:
ServerCallContext
IAsyncStreamReader<TMessage>
IServerStreamWriter<TMessage>
如果 gRPC 方法启动使用这些类型的后台任务,则必须在 gRPC 方法退出之前完成任务。 在 gRPC 方法退出后,若继续使用上下文、流读取器或流编写器,将导致错误和不可预知的行为。
在以下示例中,服务器流式处理方法可以在调用完成后写入响应流:
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
_ = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
}
对于前面的示例,解决方案是在退出方法之前等待写入任务:
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
var writeTask = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
await writeTask;
}