.NET Aspire Apache Kafka 集成
Apache Kafka 是开源分布式事件流式处理平台。 它可用于生成实时数据管道和流式处理应用程序。
.NET Aspire
Apache Kafka 集成使你能够连接到现有的 Kafka 实例,或使用 .NET从 docker.io/confluentinc/confluent-local
创建新实例。
托管集成
Apache Kafka 托管集成将 Kafka 服务器建模为 KafkaServerResource 类型。 若要访问此类型,请安装 📦Aspire。hosting.Kafka应用主机 项目中的 NuGet 包,然后将其添加到生成器中。
dotnet add package Aspire.Hosting.Kafka
有关详细信息,请参阅 dotnet add package 或 管理 .NET 应用程序中的包依赖项。
添加 Kafka 服务器资源
在应用主机项目中,在 builder
实例上调用 AddKafka 以添加 Kafka 服务器资源:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
当 .NET.NET Aspire 向应用主机添加容器映像时,如上例中所示的 docker.io/confluentinc/confluent-local
映像,它会在本地计算机上创建新的 Kafka 服务器实例。 对 Kafka 服务器的引用(kafka
变量)将添加到 ExampleProject
。 Kafka 服务器资源包括默认端口
WithReference 方法在 ExampleProject
中配置一个名为 "kafka"
的连接。 有关详细信息,请参阅 容器资源生命周期。
提示
如果想要连接到现有的 Kafka 服务器,请改为调用 AddConnectionString。 有关详细信息,请参阅 引用现有资源。
添加 Kafka UI
若要将 Kafka UI 添加到 Kafka 服务器资源,请调用 WithKafkaUI 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI 是一个免费的开源 Web UI,用于监视和管理 Apache Kafka 群集。
.NET
.NET Aspire 向运行 Kafka UI 的应用主机添加另一个容器映像 docker.io/provectuslabs/kafka-ui
。
更改 Kafka UI 主机端口
若要更改 Kafka UI 主机端口,请连链调用对 WithHostPort 方法的调用:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
在前面的示例中,Kafka UI 可以在 http://localhost:9100
访问。
使用数据卷添加 Kafka 服务器资源
若要向 Kafka 服务器资源添加数据卷,请在 Kafka 服务器资源上调用 WithDataVolume 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
数据卷用于在容器的生命周期之外保留 Kafka 服务器数据。 数据卷装载在 Kafka 服务器容器中的 /var/lib/kafka/data
路径上,如果未提供 name
参数,则会随机生成名称。 有关数据卷的详细信息,以及它们为何优先于 绑定装载的详细信息,请参阅 Docker 文档:卷。
使用数据绑定挂载添加 Kafka 服务器资源
若要将数据绑定装载添加到 Kafka 服务器资源,请调用 WithDataBindMount 方法:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
重要
与
数据绑定装载依赖于主机的文件系统在容器重启时保留 Kafka 服务器数据。 数据绑定装载安装在主机上的 Kafka 服务器容器中,路径为 Windows 的 C:\Kafka\Data
(或 Unix上的 /Kafka/Data
)。 有关数据绑定装载的详细信息,请参阅 Docker 文档:绑定装载。
托管集成运行状况检查
Kafka 托管集成会自动为 Kafka 服务器资源添加运行状况检查。 运行状况检查验证具有指定连接名称的 Kafka 生成者能否连接到 Kafka 服务器并将主题保存到 Kafka 服务器。
托管集成依赖于 📦 AspNetCore.HealthChecks.Kafka NuGet 包。
Client 集成
若要开始 .NET AspireApache Kafka 集成,请在使用 Apache Kafka 客户端的应用程序的项目中安装 📦Aspire.Confluent.Kafka NuGet 包。
dotnet add package Aspire.Confluent.Kafka
添加 Kafka 生成者
在客户端项目的 Program.cs 文件中,调用 AddKafkaProducer 扩展方法注册 IProducer<TKey, TValue>
,以便通过依赖注入容器使用。 该方法采用两个对应于键类型的泛型参数和要发送到中转站的消息的类型。
AddKafkaProducer
使用这些泛型参数来创建 ProducerBuilder<TKey, TValue>
实例。 此方法还采用连接名称参数。
builder.AddKafkaProducer<string, string>("messaging");
然后,可以使用依赖项注入检索 IProducer<TKey, TValue>
实例。 例如,若要从 IHostedService
检索生成者:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
有关工人的详细信息,请参阅
添加 Kafka 使用者
若要注册 IConsumer<TKey, TValue>
以通过依赖项注入容器使用,请在客户端使用项目的 Program.cs 文件中调用 AddKafkaConsumer 扩展方法。 该方法采用两个对应于键类型的泛型参数,以及要从中转站接收的消息的类型。
AddKafkaConsumer
使用这些泛型参数来创建 ConsumerBuilder<TKey, TValue>
实例。 此方法还采用连接名称参数。
builder.AddKafkaConsumer<string, string>("messaging");
然后,可以使用依赖项注入检索 IConsumer<TKey, TValue>
实例。 例如,若要从 IHostedService
检索使用者:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
添加关键 Kafka 制作者或使用者
在某些情况下,你可能想要使用不同的连接名称注册多个生成者或使用者实例。 若要注册密钥 Kafka 生成者或使用者,请调用相应的 API:
- AddKeyedKafkaProducer:注册带键的 Kafka 生产者。
- AddKeyedKafkaConsumer:注册关键 Kafka 使用者。
有关密钥服务的详细信息,请参阅 .NET 依赖项注入:键式服务。
配置
.NET Aspire Apache Kafka 集成提供了多个选项,用于根据项目的要求和约定配置连接。
使用连接字符串
使用 ConnectionStrings
配置部分中的连接字符串时,可以在调用 builder.AddKafkaProducer()
或 builder.AddKafkaProducer()
时提供连接字符串的名称:
builder.AddKafkaProducer<string, string>("kafka-producer");
然后,从 ConnectionStrings
配置部分检索连接字符串:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
连接字符串值设置为生成的 BootstrapServers
或 IProducer<TKey, TValue>
实例的 IConsumer<TKey, TValue>
属性。 有关详细信息,请参阅 BootstrapServers。
使用配置提供程序
.NET Aspire
Apache Kafka 集成支持 Microsoft.Extensions.Configuration。 它使用 KafkaProducerSettings 和 KafkaConsumerSettings 键分别从配置加载 Aspire:Confluent:Kafka:Producer
和 Aspire.Confluent:Kafka:Consumer
。 以下代码片段是一个 appsettings.json 文件的示例,用于配置某些选项:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Config
和 Aspire:Confluent:Kafka:Producer
配置部分的 Aspire.Confluent:Kafka:Consumer
属性分别绑定到 ProducerConfig
和 ConsumerConfig
实例。
Confluent.Kafka.Consumer<TKey, TValue>
要求将 ClientId
属性设置为允许中转站跟踪已使用的消息偏移量。
有关完整的 Kafka 客户端集成 JSON 架构,请参阅 Aspire。Confluent.Kafka/ConfigurationSchema.json。
使用内联委托
有多个内联代理可用于配置各种选项。
配置KafkaProducerSettings
和 KafkaConsumerSettings
可以传递 Action<KafkaProducerSettings> configureSettings
委托来设置一些或所有内联选项,例如禁用代码中的运行状况检查:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
可以在代码中直接配置消费者:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
配置 ProducerBuilder<TKey, TValue>
和 ConsumerBuilder<TKey, TValue>
若要配置 Confluent.Kafka
生成器,请传递 Action<ProducerBuilder<TKey, TValue>>
(或 Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
注册生产者和消费者时,如果需要访问在 DI 容器中注册的服务,可以分别传递 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
或 Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
请考虑以下生产者注册示例:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
有关详细信息,请参阅 ProducerBuilder<TKey, TValue>
和 ConsumerBuilder<TKey, TValue>
API 文档。
Client 集成健康检查
默认情况下,.NET.NET Aspire 集成为所有服务启用 健康检查。 有关详细信息,请参阅 .NET.NET Aspire 集成概述。
.NET Aspire Apache Kafka 集成负责处理以下健康检查场景:
- 添加
Aspire.Confluent.Kafka.Producer
运行状况检查,当 KafkaProducerSettings.DisableHealthChecks 是false
时。 - 添加
Aspire.Confluent.Kafka.Consumer
运行状况检查,当 KafkaConsumerSettings.DisableHealthChecks 是false
时。 - 与
/health
HTTP 终结点集成,该终结点指定所有已注册的健康检查都必须通过,才能让应用被视为准备好接受流量。
可观测性和遥测
.NET .NET Aspire 集成会自动设置日志记录、跟踪和指标配置,它们有时被称为 可观测性的三大支柱。 有关集成可观测性和遥测的详细信息,请参阅 .NET.NET Aspire 集成概述。 根据支持服务,某些集成可能仅支持其中一些功能。 例如,某些集成支持日志记录和跟踪,但不支持指标。 也可以使用 配置 部分中介绍的技术禁用遥测功能。
伐木
.NET Aspire Apache Kafka 集成使用以下日志类别:
Aspire.Confluent.Kafka
追踪
.NET Aspire Apache Kafka 集成不会发出分布式追踪信息。
指标
.NET Aspire Apache Kafka 集成通过 OpenTelemetry发出以下指标:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received