共用方式為


建立 gRPC 服務和方法

注意

這不是這篇文章的最新版本。 如需目前的版本,請參閱 本文的 .NET 9 版本。

警告

不再支援此版本的 ASP.NET Core。 如需詳細資訊,請參閱 .NET 和 .NET Core 支持原則。 如需目前的版本,請參閱 本文的 .NET 9 版本。

重要

這些發行前產品的相關資訊在產品正式發行前可能會有大幅修改。 Microsoft 對此處提供的資訊,不做任何明確或隱含的瑕疵擔保。

如需目前的版本,請參閱 本文的 .NET 9 版本。

作者:James Newton-King

本文件說明如何在 C# 中建立 gRPC 服務和方法。 主題包括:

  • 如何在 .proto 檔案中定義服務和方法。
  • 使用 gRPC C# 工具產生的程式碼。
  • 實作 gRPC 服務和方法。

建立新的 gRPC 服務

搭配 C# 的 gRPC 服務引進了 gRPC 的合約優先方法來進行 API 開發。 先在 .proto 檔案中定義服務和訊息。 C# 工具隨即從 .proto 檔案產生程式碼。 對於伺服器端資產,系統會為每項服務產生抽象基底類型,以及任何訊息的類別。

下列 .proto 檔案會:

  • 定義 Greeter 服務。
  • Greeter 服務則會定義 SayHello 呼叫。
  • SayHello 傳送 HelloRequest 訊息並接收 HelloReply 訊息
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

C# 工具會產生 C# GreeterBase 基底類型:

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

根據預設,產生的 GreeterBase 不會執行任何動作。 其虛擬 SayHello 方法會將 UNIMPLEMENTED 錯誤傳回給任何呼叫該方法的用戶端。 為了使服務發揮作用,應用程式必須建立 GreeterBase 的具體實作:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext 會提供伺服器端呼叫的內容。

服務實作會向應用程式註冊。 如果服務是由 ASP.NET Core gRPC 所裝載,則應該使用 MapGrpcService 方法將其新增至路由管線。

app.MapGrpcService<GreeterService>();

如需詳細資訊,請參閱搭配 ASP.NET Core 的 gRPC 服務

實作 gRPC 方法

gRPC 服務可以有不同類型的方法。 服務傳送和接收訊息的方式則取決於所定義的方法類型。 gRPC 方法的類型有:

  • 一元
  • 伺服器串流
  • 用戶端串流
  • 雙向串流

串流呼叫是使用 .proto 檔案中的 stream 關鍵字來指定。 stream 可以放置在呼叫的要求訊息、回應訊息或兩者上。

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

每種呼叫類型都有不同的方法簽章。 在具體實作中覆寫從抽象基底服務類型產生的方法,可確保使用正確的引數和傳回類型。

一元方法

一元方法會以要求訊息做為參數,並傳回回應。 傳回回應時,一元呼叫即告完成。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

一元呼叫與 Web API 控制器上的動作最為相似。 gRPC 方法與動作有一個重大差異,就是 gRPC 方法無法將要求的各部分繫結至不同的方法引數。 gRPC 方法始終會為傳入的要求資料提供一個訊息引數。 藉由新增欄位至要求訊息,就仍然可將多個值傳送給 gRPC 服務:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

伺服器串流方法

伺服器串流方法會以要求訊息做為參數。 由於可以將多個訊息串流回呼叫端,因此使用 responseStream.WriteAsync 來傳送回應訊息。 當方法傳回時,伺服器串流呼叫即告完成。

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

一旦伺服器串流方法啟動,客戶端就無法傳送其他訊息或資料。 有些串流方法是設計成永遠執行。 對於持續性的串流方法,用戶端可於不再需要時取消呼叫。 發生取消時,用戶端會傳送訊號給伺服器,並引發 ServerCallCoNtext.CancellationTokenCancellationToken 權杖應在伺服器上與非同步方法搭配使用,以便:

  • 隨串流呼叫一併取消任何非同步工作。
  • 快速結束方法。
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

用戶端串流方法

用戶端串流方法會在沒有訊息接收方法的情況下啟動。 requestStream 參數是用來從用戶端讀取訊息。 傳回回應訊息時,用戶端串流呼叫即告完成:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

雙向串流方法

雙向串流方法會在沒有訊息接收方法的情況下啟動。 requestStream 參數是用來從用戶端讀取訊息。 該方法可以選擇使用 responseStream.WriteAsync 傳送訊息。 當該方法傳回下列項目時,雙向串流呼叫即告完成:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

上述 程式碼:

  • 對每個要求傳送一個回應。
  • 這是雙向串流的基本用法。

亦有可能支援更複雜的案例,例如同時讀取要求和傳送回應:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

在雙向串流方法中,用戶端和服務可以隨時傳送訊息給彼此。 雙向方法的最佳實作會根據需求而有所不同。

存取 gRPC 要求標頭

要求訊息並不是用戶端將資料傳送給 gRPC 服務的唯一方式。 在使用 ServerCallContext.RequestHeaders 的服務中可以獲得標頭值。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

使用 gRPC 串流方法進行多執行緒處理

實作使用多執行緒的 gRPC 串流方法有一些重要的注意事項。

讀取器和寫入器執行緒安全性

IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> 都是一次只能由一個執行緒使用。 對於串流 gRPC 方法,多個執行緒無法同時使用 requestStream.MoveNext() 讀取新訊息。 而且多個執行緒無法同時使用 responseStream.WriteAsync(message) 寫入新訊息。

讓多個執行緒安全地與 gRPC 方法互動的一種方法是使用生產者-消費者模式,並搭配使用System.Threading.Channels

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

上述 gRPC 伺服器串流方法會:

  • 建立界限通道來產生和取用 DataResult 訊息。
  • 啟動工作以從通道讀取訊息,並將其寫入回應資料流。
  • 從多個執行緒將訊息寫入通道。

注意

雙向串流方法會採用 IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> 作為引數。 在彼此不同的執行緒上使用這些類型是安全的。

在呼叫結束後與 gRPC 方法互動

在 gRPC 方法結束後,gRPC 呼叫會結束於伺服器上。 在呼叫結束後使用下列傳遞給 gRPC 方法的引數並不安全:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

如果 gRPC 方法啟動了使用這些類型的背景工作,它必須先完成工作,gRPC 方法才會結束。 在 gRPC 方法存在後繼續使用內容、資料流讀取器或資料流寫入器,會導致錯誤和不可預測的行為。

在下列範例中,伺服器串流方法可以在呼叫完成後寫入回應資料流:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

對於上述範例,解決方案是在結束該方法之前先等候寫入工作:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

其他資源