.NET Aspire Apache Kafka 集成

包括:托管集成Client 集成

Apache Kafka 是开源分布式事件流式处理平台。 它可用于生成实时数据管道和流式处理应用程序。 .NET Aspire Apache Kafka 集成使你能够连接到现有的 Kafka 实例,或使用 .NET从 docker.io/confluentinc/confluent-local 创建新实例。

托管集成

Apache Kafka 托管集成将 Kafka 服务器建模为 KafkaServerResource 类型。 若要访问此类型,请安装 📦Aspire。hosting.Kafka应用主机 项目中的 NuGet 包,然后将其添加到生成器中。

dotnet add package Aspire.Hosting.Kafka

有关详细信息,请参阅 dotnet add package管理 .NET 应用程序中的包依赖项

添加 Kafka 服务器资源

在应用主机项目中,在 builder 实例上调用 AddKafka 以添加 Kafka 服务器资源:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

当 .NET.NET Aspire 向应用主机添加容器映像时,如上例中所示的 docker.io/confluentinc/confluent-local 映像,它会在本地计算机上创建新的 Kafka 服务器实例。 对 Kafka 服务器的引用(kafka 变量)将添加到 ExampleProject。 Kafka 服务器资源包括默认端口

WithReference 方法在 ExampleProject 中配置一个名为 "kafka"的连接。 有关详细信息,请参阅 容器资源生命周期

提示

如果想要连接到现有的 Kafka 服务器,请改为调用 AddConnectionString。 有关详细信息,请参阅 引用现有资源

添加 Kafka UI

若要将 Kafka UI 添加到 Kafka 服务器资源,请调用 WithKafkaUI 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI 是一个免费的开源 Web UI,用于监视和管理 Apache Kafka 群集。 .NET .NET Aspire 向运行 Kafka UI 的应用主机添加另一个容器映像 docker.io/provectuslabs/kafka-ui

更改 Kafka UI 主机端口

若要更改 Kafka UI 主机端口,请连链调用对 WithHostPort 方法的调用:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

在前面的示例中,Kafka UI 可以在 http://localhost:9100 访问。

使用数据卷添加 Kafka 服务器资源

若要向 Kafka 服务器资源添加数据卷,请在 Kafka 服务器资源上调用 WithDataVolume 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

数据卷用于在容器的生命周期之外保留 Kafka 服务器数据。 数据卷装载在 Kafka 服务器容器中的 /var/lib/kafka/data 路径上,如果未提供 name 参数,则会随机生成名称。 有关数据卷的详细信息,以及它们为何优先于 绑定装载的详细信息,请参阅 Docker 文档:卷

使用数据绑定挂载添加 Kafka 服务器资源

若要将数据绑定装载添加到 Kafka 服务器资源,请调用 WithDataBindMount 方法:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

重要

相比,绑定 装载的数据 功能有限,从而提供更好的性能、可移植性和安全性,使它们更适用于生产环境。 但是,绑定装载允许直接访问和修改主机系统上的文件,非常适合在需要实时更改的情况下进行开发和测试。

数据绑定装载依赖于主机的文件系统在容器重启时保留 Kafka 服务器数据。 数据绑定装载安装在主机上的 Kafka 服务器容器中,路径为 Windows 的 C:\Kafka\Data(或 Unix上的 /Kafka/Data)。 有关数据绑定装载的详细信息,请参阅 Docker 文档:绑定装载

托管集成运行状况检查

Kafka 托管集成会自动为 Kafka 服务器资源添加运行状况检查。 运行状况检查验证具有指定连接名称的 Kafka 生成者能否连接到 Kafka 服务器并将主题保存到 Kafka 服务器。

托管集成依赖于 📦 AspNetCore.HealthChecks.Kafka NuGet 包。

Client 集成

若要开始 .NET AspireApache Kafka 集成,请在使用 Apache Kafka 客户端的应用程序的项目中安装 📦Aspire.Confluent.Kafka NuGet 包。

dotnet add package Aspire.Confluent.Kafka

添加 Kafka 生成者

在客户端项目的 Program.cs 文件中,调用 AddKafkaProducer 扩展方法注册 IProducer<TKey, TValue>,以便通过依赖注入容器使用。 该方法采用两个对应于键类型的泛型参数和要发送到中转站的消息的类型。 AddKafkaProducer 使用这些泛型参数来创建 ProducerBuilder<TKey, TValue>实例。 此方法还采用连接名称参数。

builder.AddKafkaProducer<string, string>("messaging");

然后,可以使用依赖项注入检索 IProducer<TKey, TValue> 实例。 例如,若要从 IHostedService检索生成者:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

有关工人的详细信息,请参阅 中的 工人服务。

添加 Kafka 使用者

若要注册 IConsumer<TKey, TValue> 以通过依赖项注入容器使用,请在客户端使用项目的 Program.cs 文件中调用 AddKafkaConsumer 扩展方法。 该方法采用两个对应于键类型的泛型参数,以及要从中转站接收的消息的类型。 AddKafkaConsumer 使用这些泛型参数来创建 ConsumerBuilder<TKey, TValue>实例。 此方法还采用连接名称参数。

builder.AddKafkaConsumer<string, string>("messaging");

然后,可以使用依赖项注入检索 IConsumer<TKey, TValue> 实例。 例如,若要从 IHostedService检索使用者:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

添加关键 Kafka 制作者或使用者

在某些情况下,你可能想要使用不同的连接名称注册多个生成者或使用者实例。 若要注册密钥 Kafka 生成者或使用者,请调用相应的 API:

有关密钥服务的详细信息,请参阅 .NET 依赖项注入:键式服务

配置

.NET Aspire Apache Kafka 集成提供了多个选项,用于根据项目的要求和约定配置连接。

使用连接字符串

使用 ConnectionStrings 配置部分中的连接字符串时,可以在调用 builder.AddKafkaProducer()builder.AddKafkaProducer()时提供连接字符串的名称:

builder.AddKafkaProducer<string, string>("kafka-producer");

然后,从 ConnectionStrings 配置部分检索连接字符串:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

连接字符串值设置为生成的 BootstrapServersIProducer<TKey, TValue> 实例的 IConsumer<TKey, TValue> 属性。 有关详细信息,请参阅 BootstrapServers

使用配置提供程序

.NET Aspire Apache Kafka 集成支持 Microsoft.Extensions.Configuration。 它使用 KafkaProducerSettingsKafkaConsumerSettings 键分别从配置加载 Aspire:Confluent:Kafka:ProducerAspire.Confluent:Kafka:Consumer。 以下代码片段是一个 appsettings.json 文件的示例,用于配置某些选项:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

ConfigAspire:Confluent:Kafka:Producer 配置部分的 Aspire.Confluent:Kafka:Consumer 属性分别绑定到 ProducerConfigConsumerConfig实例。

Confluent.Kafka.Consumer<TKey, TValue> 要求将 ClientId 属性设置为允许中转站跟踪已使用的消息偏移量。

有关完整的 Kafka 客户端集成 JSON 架构,请参阅 Aspire。Confluent.Kafka/ConfigurationSchema.json

使用内联委托

有多个内联代理可用于配置各种选项。

配置KafkaProducerSettingsKafkaConsumerSettings

可以传递 Action<KafkaProducerSettings> configureSettings 委托来设置一些或所有内联选项,例如禁用代码中的运行状况检查:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

可以在代码中直接配置消费者:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
配置 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue>

若要配置 Confluent.Kafka 生成器,请传递 Action<ProducerBuilder<TKey, TValue>>(或 Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

注册生产者和消费者时,如果需要访问在 DI 容器中注册的服务,可以分别传递 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>

请考虑以下生产者注册示例:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

有关详细信息,请参阅 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue> API 文档。

Client 集成健康检查

默认情况下,.NET.NET Aspire 集成为所有服务启用 健康检查。 有关详细信息,请参阅 .NET.NET Aspire 集成概述

.NET Aspire Apache Kafka 集成负责处理以下健康检查场景:

可观测性和遥测

.NET .NET Aspire 集成会自动设置日志记录、跟踪和指标配置,它们有时被称为 可观测性的三大支柱。 有关集成可观测性和遥测的详细信息,请参阅 .NET.NET Aspire 集成概述。 根据支持服务,某些集成可能仅支持其中一些功能。 例如,某些集成支持日志记录和跟踪,但不支持指标。 也可以使用 配置 部分中介绍的技术禁用遥测功能。

伐木

.NET Aspire Apache Kafka 集成使用以下日志类别:

  • Aspire.Confluent.Kafka

追踪

.NET Aspire Apache Kafka 集成不会发出分布式追踪信息。

指标

.NET Aspire Apache Kafka 集成通过 OpenTelemetry发出以下指标:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

另请参阅