Compartir a través de


integración de .NET AspireApache Kafka

Incluye:integración de hospedaje y Client

Apache Kafka es una plataforma de streaming de eventos distribuidos de código abierto. Resulta útil para crear canalizaciones de datos en tiempo real y aplicaciones de streaming. La integración de .NET AspireApache Kafka permite conectarse a instancias de Kafka existentes o crear nuevas instancias desde .NET con la imagen de contenedor de docker.io/confluentinc/confluent-local.

Integración de hospedaje

El modelo de integración de Apache Kafka representa un servidor Kafka como el tipo KafkaServerResource. Para acceder a este tipo, instale el 📦Aspire.Hosting.Kafka paquete NuGet en el proyecto host de la aplicación y, a continuación, agréguelo con el generador.

dotnet add package Aspire.Hosting.Kafka

Para obtener más información, consulte dotnet add package o Administrar las dependencias de paquetes en las aplicaciones .NET.

Adición de un recurso de servidor de Kafka

En el proyecto host de la aplicación, llame a AddKafka en la instancia de builder para agregar un recurso de servidor de Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Cuando .NET.NET Aspire agrega una imagen de contenedor al host de la aplicación, como se muestra en el ejemplo anterior con la imagen de docker.io/confluentinc/confluent-local, crea una nueva instancia de servidor de Kafka en el equipo local. Se agrega una referencia al servidor de Kafka (la variable kafka) al ExampleProject. El recurso de servidor de Kafka incluye puertos predeterminados

El método WithReference configura una conexión en el ExampleProject denominado "kafka". Para obtener más información, consulte ciclo de vida de los recursos de un contenedor.

Propina

Si prefiere conectarse a un servidor de Kafka existente, llame a AddConnectionString en su lugar. Para obtener más información, vea Hacer referencia a los recursos existentes.

Adición de la interfaz de usuario de Kafka

Para agregar el de interfaz de usuario de Kafka al recurso del servidor de Kafka, llame al método :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

La interfaz de usuario de Kafka es una interfaz de usuario web gratuita de código abierto para supervisar y administrar clústeres de Apache Kafka. .NET .NET Aspire agrega otra imagen de contenedor docker.io/provectuslabs/kafka-ui al host de la aplicación que ejecuta la interfaz de usuario de Kafka.

Cambio del puerto de host de la interfaz de usuario de Kafka

Para cambiar el puerto de host de la interfaz de usuario de Kafka, encadene una llamada al método WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

La interfaz de usuario de Kafka es accesible en http://localhost:9100 en el ejemplo anterior.

Adición de un recurso de servidor de Kafka con volumen de datos

Para agregar un volumen de datos al recurso del servidor de Kafka, llame al método WithDataVolume en el recurso del servidor de Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

El volumen de datos se usa para conservar los datos del servidor de Kafka fuera del ciclo de vida de su contenedor. El volumen de datos se monta en la ruta de acceso /var/lib/kafka/data del contenedor del servidor de Kafka y, cuando no se proporciona un parámetro name, el nombre se genera de forma aleatoria. Para obtener más información sobre los volúmenes de datos y los detalles sobre por qué se prefieren a montajes de enlace, consulte Docker documentación: Volúmenes.

Adición de un recurso de servidor de Kafka con montaje de enlace de datos

Para agregar un montaje de enlace de datos al recurso del servidor de Kafka, llame al método WithDataBindMount.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Importante

Los montajes de datos de enlace tienen una funcionalidad limitada en comparación con los volúmenes , que ofrecen un mejor rendimiento, portabilidad y seguridad, lo que los hace más adecuados para entornos de producción. Sin embargo, los montajes de enlaces permiten el acceso directo y la modificación de archivos en el sistema anfitrión, lo cual es ideal para el desarrollo y las pruebas donde se necesitan cambios en tiempo real.

Los montajes de enlace de datos dependen del sistema de archivos del equipo host para conservar los datos del servidor Kafka en los reinicios del contenedor. El montaje del enlace de datos se monta en el C:\Kafka\Data en Windows (o /Kafka/Data en Unix) dentro del contenedor en la ruta de acceso del servidor de Kafka en el equipo host. Para obtener más información sobre los montajes de enlace de datos, consulte Docker documento: Montajes de enlace de datos.

Hospedaje de comprobaciones de estado de integración

La integración de hospedaje de Kafka agrega automáticamente una comprobación de estado para el recurso del servidor de Kafka. La comprobación de estado comprueba que un productor de Kafka con el nombre de conexión especificado puede conectarse y conservar un tema al servidor de Kafka.

La integración de hospedaje se basa en el paquete NuGet 📦 AspNetCore.HealthChecks.Kafka.

integración de Client

Para empezar a trabajar con la integración de .NET AspireApache Kafka, instale el 📦Aspire. Confluent.Kafka paquete NuGet en el proyecto que consume el cliente, es decir, el proyecto de la aplicación que usa el cliente de Apache Kafka.

dotnet add package Aspire.Confluent.Kafka

Agregar productor de Kafka

En el archivo Program.cs del proyecto cliente, llame al método de extensión AddKafkaProducer para registrar un IProducer<TKey, TValue> para su uso a través del contenedor de inyección de dependencias. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a enviar al agente. Estos parámetros genéricos se usan en AddKafkaProducer para crear una instancia de ProducerBuilder<TKey, TValue>. Este método también toma el parámetro de nombre de conexión.

builder.AddKafkaProducer<string, string>("messaging");

A continuación, puede obtener la instancia IProducer<TKey, TValue> mediante la inserción de dependencias. Por ejemplo, para recuperar el productor de un IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Para obtener más información sobre los trabajadores, consulte Servicios de trabajo en .NET.

Agregar un consumidor de Kafka

Para registrar un IConsumer<TKey, TValue> para su uso a través del contenedor de inserción de dependencias, llame al método de extensión AddKafkaConsumer en el archivo Program.cs del proyecto que consume el cliente. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a recibir del agente. Estos parámetros genéricos se usan en AddKafkaConsumer para crear una instancia de ConsumerBuilder<TKey, TValue>. Este método también toma el parámetro de nombre de conexión.

builder.AddKafkaConsumer<string, string>("messaging");

A continuación, puede obtener la instancia IConsumer<TKey, TValue> mediante la inserción de dependencias. Por ejemplo, para recuperar al consumidor de un IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Adición de productores o consumidores clave de Kafka

Puede haber situaciones en las que quiera registrar varias instancias de productor o consumidor con nombres de conexión diferentes. Para registrar productores o consumidores clave de Kafka, llame a la API adecuada.

Para obtener más información sobre los servicios con claves, consulte .NET inserción de dependencias: Servicios con claves.

Configuración

La integración de .NET AspireApache Kafka proporciona varias opciones para configurar la conexión en función de los requisitos y convenciones del proyecto.

Uso de una cadena de conexión

Al usar una cadena de conexión de la sección de configuración de ConnectionStrings, puede proporcionar el nombre de la cadena de conexión al llamar a builder.AddKafkaProducer() o builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

A continuación, la cadena de conexión se obtiene de la sección de configuración de ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

El valor de la cadena de conexión se establece en la propiedad BootstrapServers de la instancia de IProducer<TKey, TValue> o IConsumer<TKey, TValue> generada. Para obtener más información, vea BootstrapServers.

Uso de proveedores de configuración

La integración de .NET AspireApache Kafka admite Microsoft.Extensions.Configuration. Carga el KafkaProducerSettings o KafkaConsumerSettings desde la configuración usando respectivamente las claves Aspire:Confluent:Kafka:Producer y Aspire.Confluent:Kafka:Consumer. El fragmento de código siguiente es un ejemplo de un archivo appsettings.json que configura algunas de las opciones:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Las propiedades Config de las secciones de configuración de Aspire:Confluent:Kafka:Producer y Aspire.Confluent:Kafka:Consumer, respectivamente, se enlazan a instancias de ProducerConfig y ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requiere que se establezca la propiedad ClientId para permitir que el agente realice un seguimiento de los desplazamientos de mensajes consumidos.

Para obtener el esquema completo de integración de cliente de Kafka JSON, consulte Aspire. Confluent.Kafka/ConfigurationSchema.json.

Usa delegados en línea

Hay varios delegados en línea disponibles para configurar diferentes opciones.

Configuración deKafkaProducerSettings y KafkaConsumerSettings

Puede pasar el delegado Action<KafkaProducerSettings> configureSettings para configurar algunas o todas las opciones en línea, por ejemplo, para deshabilitar las verificaciones de salud desde el código.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Puede configurar un consumidor en línea directamente en el código:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Configuración de ProducerBuilder<TKey, TValue> y ConsumerBuilder<TKey, TValue>

Para configurar los constructores de Confluent.Kafka, pase un Action<ProducerBuilder<TKey, TValue>> (o un Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Al registrar productores y consumidores, si necesita acceder a un servicio registrado en el contenedor DI, puede pasar un Action<IServiceProvider, ProducerBuilder<TKey, TValue>> o un Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>, respectivamente.

Considere el siguiente ejemplo de registro de productor:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Para más información, consulte la documentación de la API de ProducerBuilder<TKey, TValue> y la API de ConsumerBuilder<TKey, TValue>.

verificaciones de estado de integración Client

De forma predeterminada, las integraciones de .NET.NET Aspire habilitan las comprobaciones de estado de para todos los servicios. Para obtener más información, consulte .NET.NET Aspire integrations overview.

La integración de .NET AspireApache Kafka maneja los siguientes escenarios de verificación de salud:

  • Agrega la comprobación de estado Aspire.Confluent.Kafka.Producer cuando KafkaProducerSettings.DisableHealthChecks es false.
  • Agrega la comprobación de estado Aspire.Confluent.Kafka.Consumer cuando KafkaConsumerSettings.DisableHealthChecks es false.
  • Se integra con el punto de conexión HTTP de /health, que especifica que todas las comprobaciones de estado registradas deben pasar para que la aplicación se considere lista para aceptar el tráfico.

Observabilidad y telemetría

.NET .NET Aspire integraciones configuran automáticamente las configuraciones de registro, seguimiento y métricas, que a veces se conocen como los pilares de la observabilidad. Para obtener más información sobre la observabilidad de integración y la telemetría, consulte información general sobre las integraciones de .NET.NET Aspire. En función del servicio de respaldo, algunas integraciones solo pueden admitir algunas de estas características. Por ejemplo, algunas integraciones admiten el registro y el seguimiento, pero no las métricas. Las funciones de telemetría también se pueden deshabilitar mediante las técnicas presentadas en la sección de Configuración .

Registro

La integración de .NET AspireApache Kafka usa las siguientes categorías de registro:

  • Aspire.Confluent.Kafka

Rastreo

La integración de .NET AspireApache Kafka no emite trazas distribuidas.

Métricas

La integración de .NET AspireApache Kafka emite las métricas siguientes mediante OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received

Consulte también