.NET Aspire Apache Kafka 整合
Apache Kafka 是開放原始碼分散式事件串流平臺。 它適用於建置即時數據管線和串流應用程式。
.NET Aspire
Apache Kafka 整合可讓您連線到現有的 Kafka 實例,或使用 docker.io/confluentinc/confluent-local
,從 建立新的實例。
主機代管整合
Apache Kafka 伺服器中的整合會將 Kafka server 作為 KafkaServerResource 模型化類型。 若要存取此類型,請在 📦 專案中,安裝 Aspire。Hosting.Kafka NuGet 套件,然後在 builder 中加入該套件。
dotnet add package Aspire.Hosting.Kafka
如需詳細資訊,請參閱 dotnet add package 或 在 .NET 應用程式中管理套件相依性。
新增 Kafka server 資源
在您的應用程式主專案中,呼叫 AddKafka 實例上的 builder
,以新增 Kafka server 資源:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
當 .NET.NET Aspire 將容器映像新增至應用程式主機時,如上一個範例中使用的 docker.io/confluentinc/confluent-local
映像,它會在您的本機計算機上建立新的 Kafka server 實例。 Kafka server(kafka
變數)的引用已新增至 ExampleProject
。 Kafka server 資源包含預設埠
WithReference 方法會在名為 ExampleProject
的 "kafka"
中設定連接。 如需詳細資訊,請參閱 容器資源生命週期。
提示
如果您想要連線到現有的 Kafka server,請改為呼叫 AddConnectionString。 如需詳細資訊,請參閱 參考現有資源。
新增 Kafka UI
若要將 Kafka UI 新增至 Kafka server 資源,請呼叫 WithKafkaUI 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI 是免費的開放原始碼 Web UI,可監視和管理 Apache Kafka 叢集。
.NET
.NET Aspire 將另一個容器映像 docker.io/provectuslabs/kafka-ui
新增至執行 Kafka UI 的應用程式主機。
變更 Kafka UI 主機埠
若要變更 Kafka UI 主機埠,請鏈結對 WithHostPort 方法的呼叫:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI 可以在前述範例中的 http://localhost:9100
存取。
新增具有數據量的 Kafka server 資源
若要將數據磁碟區新增至 Kafka server 資源,請在 Kafka WithDataVolume 資源上呼叫 server 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
數據磁碟區用來將 Kafka server 資料保存在其容器生命週期之外。 資料卷被掛載於 Kafka /var/lib/kafka/data
容器中的 server 路徑,當未提供 name
參數時,名稱會隨機生成。 如需瞭解更多關於數據磁碟區的資訊,以及為何它們被優先於 系結掛接,請參閱 Docker 文件:磁碟區。
使用數據系結掛接新增 Kafka server 資源
若要將數據系結掛接新增至 Kafka server 資源,請呼叫 WithDataBindMount 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
數據系結掛接依賴主計算機的文件系統,在容器重新啟動時保存 Kafka server 數據。 資料綁定掛載點在 Kafka C:\Kafka\Data
容器的主機上的 /Kafka/Data
路徑(或 Unix的 server 路徑)。 如需資料系結掛接的詳細資訊,請參閱 Docker 檔:系結掛接。
主機整合健康檢查
Kafka 託管整合會自動新增 Kafka server 資源的健康檢查。 健康情況檢查會確認具有指定連接名稱的 Kafka 產生者能夠連線並儲存主題至 Kafka server。
此主機整合依賴 📦 AspNetCore.HealthChecks.Kafka NuGet 套件。
Client 整合
若要開始使用 .NET AspireApache Kafka 整合,請安裝 📦Aspire。Confluent.Kafkaclient-consuming 專案中的 NuGet 套件,也就是使用 Apache Kafkaclient的應用程式專案。
dotnet add package Aspire.Confluent.Kafka
新增 Kafka 產生者
在 Program.cs消費專案的 client 檔案中,調用 AddKafkaProducer 擴充方法來註冊 IProducer<TKey, TValue>
,以便通過相依性注入容器使用。 方法會採用兩個泛型參數,分別對應於索引鍵的類型以及要傳送至代理程式的訊息的類型。
AddKafkaProducer
會使用這些泛型參數來建立 ProducerBuilder<TKey, TValue>
實例。 此方法也會採用連接名稱參數。
builder.AddKafkaProducer<string, string>("messaging");
接著,您可以使用依賴注入來取得 IProducer<TKey, TValue>
實例。 例如,若要從 IHostedService
擷取產生者:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
如需有關工作者的更多資訊,請參閱
新增 Kafka 消費者
若要註冊 IConsumer<TKey, TValue>
以便透過相依性注入容器使用,請在 AddKafkaConsumer使用的專案中的 Program.cs 檔案中呼叫 client 擴充方法。 方法會使用兩個泛型參數,分別對應於索引鍵類型和從代理程式接收的訊息類型。
AddKafkaConsumer
會使用這些泛型參數來建立 ConsumerBuilder<TKey, TValue>
實例。 此方法也會採用連接名稱參數。
builder.AddKafkaConsumer<string, string>("messaging");
接著,您可以使用依賴注入來取得 IConsumer<TKey, TValue>
實例。 例如,若要從 IHostedService
擷取消費者:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
新增索引鍵 Kafka 產生者或取用者
在某些情況下,您可能想要以不同的連結名稱註冊多個生產者或消費者實例。 若要註冊金鑰化 Kafka 產生者或取用者,請呼叫適當的 API:
- AddKeyedKafkaProducer:註冊具有密鑰的 Kafka 製作人。
- AddKeyedKafkaConsumer:註冊具備鍵值的 Kafka 消費者。
如需鍵控服務的詳細資訊,請參閱 .NET 依賴注入:鍵控服務。
配置
.NET Aspire Apache Kafka 整合提供多個選項,可根據專案的需求和慣例來設定連線。
使用連接字串
從 [ConnectionStrings
組態] 區段使用連接字串時,您可以在呼叫 builder.AddKafkaProducer()
或 builder.AddKafkaProducer()
時提供連接字串的名稱:
builder.AddKafkaProducer<string, string>("kafka-producer");
然後,從 ConnectionStrings
組態區段擷取連接字串:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
連接字串值會設定為所產生 BootstrapServers
或 IProducer<TKey, TValue>
實例的 IConsumer<TKey, TValue>
屬性。 如需詳細資訊,請參閱 BootstrapServers。
使用組態提供者
.NET Aspire
Apache Kafka 整合支援 Microsoft.Extensions.Configuration。 它會分別使用 KafkaProducerSettings 和 KafkaConsumerSettings 金鑰,從組態載入 Aspire:Confluent:Kafka:Producer
或 Aspire.Confluent:Kafka:Consumer
。 下列代碼段是 appsettings.json 檔案的範例,可設定一些選項:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Config
和 Aspire:Confluent:Kafka:Producer
組態區段的 Aspire.Confluent:Kafka:Consumer
屬性分別系結至 ProducerConfig
和 ConsumerConfig
實例。
Confluent.Kafka.Consumer<TKey, TValue>
需要設定 ClientId
屬性,讓代理追蹤已取用的訊息位移。
如需完整的 Kafka client 整合 JSON 架構,請參閱 Aspire。Confluent.Kafka/ConfigurationSchema。json。
使用內聯委派
有數個內嵌委派可用來設定各種選項。
設定KafkaProducerSettings
和 KafkaConsumerSettings
您可以傳遞 Action<KafkaProducerSettings> configureSettings
委派來直接在程式中設定部分或全部選項,例如從程式碼中停用健康檢查:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
您可以從程式碼行內配置消費者:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
設定 ProducerBuilder<TKey, TValue>
和 ConsumerBuilder<TKey, TValue>
若要設定 Confluent.Kafka
產生器,請傳遞 Action<ProducerBuilder<TKey, TValue>>
(或 Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
在註冊生產者和消費者時,如果您需要存取在 DI 容器中註冊的服務,您可以分別傳遞 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
或 Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
。
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
請考慮下列生產者註冊範例:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
如需詳細資訊,請參閱 ProducerBuilder<TKey, TValue>
和 ConsumerBuilder<TKey, TValue>
API 檔。
Client 整合健康狀況檢查
根據預設,.NET.NET Aspire 整合會為所有服務啟用 健康情況檢查。 如需詳細資訊,請參閱 .NET.NET Aspire 整合概觀。
.NET Aspire Apache Kafka 整合會處理下列健康檢查情境:
- 當
Aspire.Confluent.Kafka.Producer
KafkaProducerSettings.DisableHealthChecks時,新增false
健康情況檢查。 - 當
Aspire.Confluent.Kafka.Consumer
KafkaConsumerSettings.DisableHealthChecks時,新增false
健康情況檢查。 - 與
/health
HTTP 端點整合,規定所有已註冊的健康檢查必須通過,應用程式才能被認為準備好接受流量。
可檢視性和遙測
.NET .NET Aspire 整合會自動設定記錄、追蹤和度量組態,有時稱為 可觀測性的要素。 如需整合可觀察性和遙測的詳細資訊,請參閱 .NET.NET Aspire 整合概觀。 視支援服務而定,某些整合可能只支援其中一些功能。 例如,某些整合支援記錄和追蹤,但不支援計量。 您也可以使用 組態 一節中呈現的技術來停用遙測功能。
伐木
.NET Aspire Apache Kafka 整合會使用下列記錄類別:
Aspire.Confluent.Kafka
追蹤
.NET Aspire Apache Kafka 整合不會發出分散式追蹤。
指標
.NET Aspire Apache Kafka 整合會使用 OpenTelemetry發出下列計量:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received