共用方式為


.NET Aspire Apache Kafka 整合

包括:主機整合Client 整合

Apache Kafka 是開放原始碼分散式事件串流平臺。 它適用於建置即時數據管線和串流應用程式。 .NET Aspire Apache Kafka 整合可讓您連線到現有的 Kafka 實例,或使用 .NET,從 docker.io/confluentinc/confluent-local 建立新的實例。

主機代管整合

整合托管的 Apache Kafka 將 Kafka 伺服器建模為 KafkaServerResource 類型。 若要存取此類型,請在 📦 專案中,安裝 Aspire。Hosting.Kafka NuGet 套件,然後在 builder 中加入該套件。

dotnet add package Aspire.Hosting.Kafka

如需詳細資訊,請參閱 dotnet add package在 .NET 應用程式中管理套件相依性

新增 Kafka 伺服器資源

在應用程式主機專案中,呼叫 builder 實例上的 AddKafka 以新增 Kafka 伺服器資源:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

當 .NET.NET Aspire 將容器映射新增至應用程式主機時,如上述範例中的 docker.io/confluentinc/confluent-local 映像所示,它會在本機計算機上建立新的 Kafka 伺服器實例。 Kafka 伺服器的參考(kafka 變數)被添加到 ExampleProject。 Kafka 伺服器資源包含預設埠

WithReference 方法會在名為 ExampleProject"kafka" 中設定連接。 如需詳細資訊,請參閱 容器資源生命週期

提示

如果您想要連線到現有的 Kafka 伺服器,請改為呼叫 AddConnectionString。 如需詳細資訊,請參閱 參考現有資源

新增 Kafka UI

若要將 Kafka UI 新增至 Kafka 伺服器資源,請呼叫 WithKafkaUI 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI 是免費的開放原始碼 Web UI,可監視和管理 Apache Kafka 叢集。 .NET .NET Aspire 將另一個容器映像 docker.io/provectuslabs/kafka-ui 新增至執行 Kafka UI 的應用程式主機。

變更 Kafka UI 主機埠

若要變更 Kafka UI 主機埠,請鏈結對 WithHostPort 方法的呼叫:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI 可以在前述範例中的 http://localhost:9100 存取。

使用數據磁碟區新增 Kafka 伺服器資源

若要將數據磁碟區新增至 Kafka 伺服器資源,請在 Kafka 伺服器資源上呼叫 WithDataVolume 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

數據磁碟區用來將 Kafka 伺服器資料保存在其容器生命週期之外。 資料卷會掛接在 Kafka 伺服器容器中的 /var/lib/kafka/data 路徑,且未提供 name 參數時,名稱會隨機產生。 如需瞭解更多關於數據磁碟區的資訊,以及為何它們被優先於 系結掛接,請參閱 Docker 文件:磁碟區

新增 Kafka 伺服器資源並使用資料綁定掛載

若要將數據系結掛接新增至 Kafka 伺服器資源,請呼叫 WithDataBindMount 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

重要

相較於 磁碟區,數據 系結掛 接的功能有限,可提供更佳的效能、可移植性和安全性,使其更適合生產環境。 不過,綁定掛載允許直接存取和修改主機系統上的檔案,這對於需要即時變更的開發和測試非常適合。

數據系結掛接依賴主計算機的文件系統,在容器重新啟動時保存 Kafka 伺服器數據。 在 Kafka 伺服器的容器中,數據綁定掛載會被掛載在主機的 Windows 路徑 C:\Kafka\Data(或 Unix的 /Kafka/Data)上。 如需資料系結掛接的詳細資訊,請參閱 Docker 檔:系結掛接

主機整合健康檢查

Kafka 託管整合會自動新增 Kafka 伺服器資源的健康檢查。 健康情況檢查會確認具有指定連接名稱的 Kafka 產生者能夠連線並保存主題至 Kafka 伺服器。

此主機整合依賴 📦 AspNetCore.HealthChecks.Kafka NuGet 套件。

Client 整合

若要開始使用 .NET AspireApache Kafka 整合,請在使用 Apache Kafka 用戶端的應用程式專案中安裝 📦AspireConfluent.Kafka NuGet 套件。

dotnet add package Aspire.Confluent.Kafka

新增 Kafka 產生者

在您的用戶端取用專案的 Program.cs 檔案中,呼叫 AddKafkaProducer 擴充方法來註冊一個 IProducer<TKey, TValue>,以透過依賴注入容器使用。 方法會採用兩個泛型參數,分別對應於索引鍵的類型以及要傳送至代理程式的訊息的類型。 AddKafkaProducer 會使用這些泛型參數來建立 ProducerBuilder<TKey, TValue>實例。 此方法也會採用連接名稱參數。

builder.AddKafkaProducer<string, string>("messaging");

接著,您可以使用依賴注入來取得 IProducer<TKey, TValue> 實例。 例如,若要從 IHostedService擷取產生者:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

如需有關工作者的更多資訊,請參閱 中的 工作者服務。

新增 Kafka 消費者

若要註冊要透過相依性插入容器使用的 IConsumer<TKey, TValue>,請在用戶端取用專案的 Program.cs 檔案中呼叫 AddKafkaConsumer 擴充方法。 方法會使用兩個泛型參數,分別對應於索引鍵類型和從代理程式接收的訊息類型。 AddKafkaConsumer 會使用這些泛型參數來建立 ConsumerBuilder<TKey, TValue>實例。 此方法也會採用連接名稱參數。

builder.AddKafkaConsumer<string, string>("messaging");

接著,您可以使用依賴注入來取得 IConsumer<TKey, TValue> 實例。 例如,若要從 IHostedService擷取消費者:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

新增索引鍵 Kafka 產生者或取用者

在某些情況下,您可能想要以不同的連結名稱註冊多個生產者或消費者實例。 若要註冊金鑰化 Kafka 產生者或取用者,請呼叫適當的 API:

如需鍵控服務的詳細資訊,請參閱 .NET 依賴注入:鍵控服務

配置

.NET Aspire Apache Kafka 整合提供多個選項,可根據專案的需求和慣例來設定連線。

使用連接字串

從 [ConnectionStrings 組態] 區段使用連接字串時,您可以在呼叫 builder.AddKafkaProducer()builder.AddKafkaProducer()時提供連接字串的名稱:

builder.AddKafkaProducer<string, string>("kafka-producer");

然後,從 ConnectionStrings 組態區段擷取連接字串:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

連接字串值會設定為所產生 BootstrapServersIProducer<TKey, TValue> 實例的 IConsumer<TKey, TValue> 屬性。 如需詳細資訊,請參閱 BootstrapServers

使用組態提供者

.NET Aspire Apache Kafka 整合支援 Microsoft.Extensions.Configuration。 它會分別使用 KafkaProducerSettingsKafkaConsumerSettings 金鑰,從組態載入 Aspire:Confluent:Kafka:ProducerAspire.Confluent:Kafka:Consumer。 下列代碼段是 appsettings.json 檔案的範例,可設定一些選項:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

ConfigAspire:Confluent:Kafka:Producer 組態區段的 Aspire.Confluent:Kafka:Consumer 屬性分別系結至 ProducerConfigConsumerConfig實例。

Confluent.Kafka.Consumer<TKey, TValue> 需要設定 ClientId 屬性,讓代理追蹤已取用的訊息位移。

如需完整的 Kafka 用戶端整合 JSON 架構,請參閱 Aspire。Confluent.Kafka/ConfigurationSchema.json

使用內聯委派

有數個內嵌委派可用來設定各種選項。

設定KafkaProducerSettingsKafkaConsumerSettings

您可以傳遞 Action<KafkaProducerSettings> configureSettings 委派來直接在程式中設定部分或全部選項,例如從程式碼中停用健康檢查:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

您可以從程式碼行內配置消費者:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
設定 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue>

若要設定 Confluent.Kafka 產生器,請傳遞 Action<ProducerBuilder<TKey, TValue>> (或 Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

在註冊生產者和消費者時,如果您需要存取在 DI 容器中註冊的服務,您可以分別傳遞 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>

請考慮下列生產者註冊範例:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

如需詳細資訊,請參閱 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue> API 檔。

Client 整合健康狀況檢查

根據預設,.NET.NET Aspire 整合會為所有服務啟用 健康情況檢查。 如需詳細資訊,請參閱 .NET.NET Aspire 整合概觀

.NET Aspire Apache Kafka 整合會處理下列健康檢查情境:

可檢視性和遙測

.NET .NET Aspire 整合會自動設定記錄、追蹤和度量組態,有時稱為 可觀測性的要素。 如需整合可觀察性和遙測的詳細資訊,請參閱 .NET.NET Aspire 整合概觀。 視支援服務而定,某些整合可能只支援其中一些功能。 例如,某些整合支援記錄和追蹤,但不支援計量。 您也可以使用 組態 一節中呈現的技術來停用遙測功能。

伐木

.NET Aspire Apache Kafka 整合會使用下列記錄類別:

  • Aspire.Confluent.Kafka

追蹤

.NET Aspire Apache Kafka 整合不會發出分散式追蹤。

指標

.NET Aspire Apache Kafka 整合會使用 OpenTelemetry發出下列計量:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

另請參閱