共用方式為


.NET Aspire Apache Kafka 整合

包括:主機整合Client 整合

Apache Kafka 是開放原始碼分散式事件串流平臺。 它適用於建置即時數據管線和串流應用程式。 .NET Aspire Apache Kafka 整合可讓您連線到現有的 Kafka 實例,或使用 docker.io/confluentinc/confluent-local,從 建立新的實例。

主機代管整合

Apache Kafka 伺服器中的整合會將 Kafka server 作為 KafkaServerResource 模型化類型。 若要存取此類型,請在 📦 專案中,安裝 Aspire。Hosting.Kafka NuGet 套件,然後在 builder 中加入該套件。

dotnet add package Aspire.Hosting.Kafka

如需詳細資訊,請參閱 dotnet add package在 .NET 應用程式中管理套件相依性

新增 Kafka server 資源

在您的應用程式主專案中,呼叫 AddKafka 實例上的 builder,以新增 Kafka server 資源:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

當 .NET.NET Aspire 將容器映像新增至應用程式主機時,如上一個範例中使用的 docker.io/confluentinc/confluent-local 映像,它會在您的本機計算機上建立新的 Kafka server 實例。 Kafka server(kafka 變數)的引用已新增至 ExampleProject。 Kafka server 資源包含預設埠

WithReference 方法會在名為 ExampleProject"kafka" 中設定連接。 如需詳細資訊,請參閱 容器資源生命週期

提示

如果您想要連線到現有的 Kafka server,請改為呼叫 AddConnectionString。 如需詳細資訊,請參閱 參考現有資源

新增 Kafka UI

若要將 Kafka UI 新增至 Kafka server 資源,請呼叫 WithKafkaUI 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI 是免費的開放原始碼 Web UI,可監視和管理 Apache Kafka 叢集。 .NET .NET Aspire 將另一個容器映像 docker.io/provectuslabs/kafka-ui 新增至執行 Kafka UI 的應用程式主機。

變更 Kafka UI 主機埠

若要變更 Kafka UI 主機埠,請鏈結對 WithHostPort 方法的呼叫:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI 可以在前述範例中的 http://localhost:9100 存取。

新增具有數據量的 Kafka server 資源

若要將數據磁碟區新增至 Kafka server 資源,請在 Kafka WithDataVolume 資源上呼叫 server 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

數據磁碟區用來將 Kafka server 資料保存在其容器生命週期之外。 資料卷被掛載於 Kafka /var/lib/kafka/data 容器中的 server 路徑,當未提供 name 參數時,名稱會隨機生成。 如需瞭解更多關於數據磁碟區的資訊,以及為何它們被優先於 系結掛接,請參閱 Docker 文件:磁碟區

使用數據系結掛接新增 Kafka server 資源

若要將數據系結掛接新增至 Kafka server 資源,請呼叫 WithDataBindMount 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

重要

相較於 磁碟區,數據 系結掛 接的功能有限,可提供更佳的效能、可移植性和安全性,使其更適合生產環境。 不過,綁定掛載允許直接存取和修改主機系統上的檔案,這對於需要即時變更的開發和測試非常適合。

數據系結掛接依賴主計算機的文件系統,在容器重新啟動時保存 Kafka server 數據。 資料綁定掛載點在 Kafka C:\Kafka\Data 容器的主機上的 /Kafka/Data 路徑(或 Unix的 server 路徑)。 如需資料系結掛接的詳細資訊,請參閱 Docker 檔:系結掛接

主機整合健康檢查

Kafka 託管整合會自動新增 Kafka server 資源的健康檢查。 健康情況檢查會確認具有指定連接名稱的 Kafka 產生者能夠連線並儲存主題至 Kafka server。

此主機整合依賴 📦 AspNetCore.HealthChecks.Kafka NuGet 套件。

Client 整合

若要開始使用 .NET AspireApache Kafka 整合,請安裝 📦Aspire。Confluent.Kafkaclient-consuming 專案中的 NuGet 套件,也就是使用 Apache Kafkaclient的應用程式專案。

dotnet add package Aspire.Confluent.Kafka

新增 Kafka 產生者

Program.cs消費專案的 client 檔案中,調用 AddKafkaProducer 擴充方法來註冊 IProducer<TKey, TValue>,以便通過相依性注入容器使用。 方法會採用兩個泛型參數,分別對應於索引鍵的類型以及要傳送至代理程式的訊息的類型。 AddKafkaProducer 會使用這些泛型參數來建立 ProducerBuilder<TKey, TValue>實例。 此方法也會採用連接名稱參數。

builder.AddKafkaProducer<string, string>("messaging");

接著,您可以使用依賴注入來取得 IProducer<TKey, TValue> 實例。 例如,若要從 IHostedService擷取產生者:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

如需有關工作者的更多資訊,請參閱 中的 工作者服務。

新增 Kafka 消費者

若要註冊 IConsumer<TKey, TValue> 以便透過相依性注入容器使用,請在 AddKafkaConsumer使用的專案中的 Program.cs 檔案中呼叫 client 擴充方法。 方法會使用兩個泛型參數,分別對應於索引鍵類型和從代理程式接收的訊息類型。 AddKafkaConsumer 會使用這些泛型參數來建立 ConsumerBuilder<TKey, TValue>實例。 此方法也會採用連接名稱參數。

builder.AddKafkaConsumer<string, string>("messaging");

接著,您可以使用依賴注入來取得 IConsumer<TKey, TValue> 實例。 例如,若要從 IHostedService擷取消費者:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

新增索引鍵 Kafka 產生者或取用者

在某些情況下,您可能想要以不同的連結名稱註冊多個生產者或消費者實例。 若要註冊金鑰化 Kafka 產生者或取用者,請呼叫適當的 API:

如需鍵控服務的詳細資訊,請參閱 .NET 依賴注入:鍵控服務

配置

.NET Aspire Apache Kafka 整合提供多個選項,可根據專案的需求和慣例來設定連線。

使用連接字串

從 [ConnectionStrings 組態] 區段使用連接字串時,您可以在呼叫 builder.AddKafkaProducer()builder.AddKafkaProducer()時提供連接字串的名稱:

builder.AddKafkaProducer<string, string>("kafka-producer");

然後,從 ConnectionStrings 組態區段擷取連接字串:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

連接字串值會設定為所產生 BootstrapServersIProducer<TKey, TValue> 實例的 IConsumer<TKey, TValue> 屬性。 如需詳細資訊,請參閱 BootstrapServers

使用組態提供者

.NET Aspire Apache Kafka 整合支援 Microsoft.Extensions.Configuration。 它會分別使用 KafkaProducerSettingsKafkaConsumerSettings 金鑰,從組態載入 Aspire:Confluent:Kafka:ProducerAspire.Confluent:Kafka:Consumer。 下列代碼段是 appsettings.json 檔案的範例,可設定一些選項:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

ConfigAspire:Confluent:Kafka:Producer 組態區段的 Aspire.Confluent:Kafka:Consumer 屬性分別系結至 ProducerConfigConsumerConfig實例。

Confluent.Kafka.Consumer<TKey, TValue> 需要設定 ClientId 屬性,讓代理追蹤已取用的訊息位移。

如需完整的 Kafka client 整合 JSON 架構,請參閱 Aspire。Confluent.Kafka/ConfigurationSchema。json

使用內聯委派

有數個內嵌委派可用來設定各種選項。

設定KafkaProducerSettingsKafkaConsumerSettings

您可以傳遞 Action<KafkaProducerSettings> configureSettings 委派來直接在程式中設定部分或全部選項,例如從程式碼中停用健康檢查:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

您可以從程式碼行內配置消費者:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
設定 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue>

若要設定 Confluent.Kafka 產生器,請傳遞 Action<ProducerBuilder<TKey, TValue>> (或 Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

在註冊生產者和消費者時,如果您需要存取在 DI 容器中註冊的服務,您可以分別傳遞 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>

請考慮下列生產者註冊範例:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

如需詳細資訊,請參閱 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue> API 檔。

Client 整合健康狀況檢查

根據預設,.NET.NET Aspire 整合會為所有服務啟用 健康情況檢查。 如需詳細資訊,請參閱 .NET.NET Aspire 整合概觀

.NET Aspire Apache Kafka 整合會處理下列健康檢查情境:

可檢視性和遙測

.NET .NET Aspire 整合會自動設定記錄、追蹤和度量組態,有時稱為 可觀測性的要素。 如需整合可觀察性和遙測的詳細資訊,請參閱 .NET.NET Aspire 整合概觀。 視支援服務而定,某些整合可能只支援其中一些功能。 例如,某些整合支援記錄和追蹤,但不支援計量。 您也可以使用 組態 一節中呈現的技術來停用遙測功能。

伐木

.NET Aspire Apache Kafka 整合會使用下列記錄類別:

  • Aspire.Confluent.Kafka

追蹤

.NET Aspire Apache Kafka 整合不會發出分散式追蹤。

指標

.NET Aspire Apache Kafka 整合會使用 OpenTelemetry發出下列計量:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

另請參閱