Compartilhar via


integração .NET AspireApache Kafka

Inclui: de integração de hospedagem e Client de integração

Apache Kafka é uma plataforma de streaming de eventos distribuídos de software livre. É útil para criar pipelines de dados em tempo real e aplicativos de streaming. A integração .NET AspireApache Kafka permite que você se conecte a instâncias existentes do Kafka ou crie novas instâncias de .NET com a imagem de contêiner docker.io/confluentinc/confluent-local.

Integração de hospedagem

O Apache Kafka modela a integração de hospedagem de um Kafka server como o tipo KafkaServerResource. Para acessar esse tipo, instale o pacote NuGet 📦Aspire.Hosting.Kafka no projeto de host do aplicativo e, depois, adicione-o ao construtor.

dotnet add package Aspire.Hosting.Kafka

Para obter mais informações, consulte dotnet add package ou Gerenciar dependências de pacotes em aplicativos .NET.

Adicionar o recurso server do Kafka

No projeto de host do aplicativo, chame AddKafka na instância de builder para adicionar um recurso de Kafka server.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Quando .NET.NET Aspire adiciona uma imagem de contêiner ao host do aplicativo, conforme mostrado no exemplo anterior com a imagem docker.io/confluentinc/confluent-local, ele cria uma nova instância do Kafka server em seu computador local. Uma referência a server Kafka (a variável kafka) é adicionada ao ExampleProject. O recurso server kafka inclui portas padrão

O método WithReference configura uma conexão no ExampleProject denominado "kafka". Para obter mais informações, consulte o ciclo de vida do recurso contêiner .

Dica

Se você preferir se conectar a um serverkafka existente, chame AddConnectionString em vez disso. Para mais informações, consulte Recursos existentes de referência.

Adicionar interface do usuário do Kafka

Para adicionar a interface do usuário do Kafka ao recurso server do Kafka, chame o método WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

A UI do Kafka é uma interface web de código aberto e gratuita para monitorar e gerenciar clusters de Apache Kafka. .NET .NET Aspire adiciona outra imagem de contêiner docker.io/provectuslabs/kafka-ui ao host do aplicativo que executa a interface do usuário do Kafka.

Alterar a porta de host da interface do usuário do Kafka

Para alterar a porta de host da interface do usuário do Kafka, encadeia uma chamada para o método WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

A interface do usuário do Kafka é acessível em http://localhost:9100 no exemplo anterior.

Adicionar o recurso Kafka server com volume de dados

Para adicionar um volume de dados ao recurso server kafka, chame o método WithDataVolume no recurso server kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

O volume de dados é usado para persistir os dados Kafka server fora do ciclo de vida de seu contêiner. O volume de dados é montado no caminho /var/lib/kafka/data no contêiner server Kafka e, quando um parâmetro name não é fornecido, o nome é gerado aleatoriamente. Para obter mais informações sobre volumes de dados e detalhes sobre por que eles são preferenciais em vez de associar montagens, consulte Docker documentos: Volumes.

Adicionar o recurso Kafka server com montagem de dados

Para adicionar um vínculo de montagem de dados ao recurso Kafka server, chame o método WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Importante

Os de dados associam montagens têm funcionalidade limitada em comparação com volumes, que oferecem melhor desempenho, portabilidade e segurança, tornando-os mais adequados para ambientes de produção. No entanto, os bind mounts permitem acesso direto e modificação de arquivos no sistema host, ideal para desenvolvimento e teste onde as alterações em tempo real são necessárias.

As montagens de associação de dados dependem do sistema de arquivos do computador host para persistir os dados do Kafka server entre reinicializações de contêiner. A montagem da associação de dados é montada no caminho C:\Kafka\Data no Windows (ou /Kafka/Data em Unix) no computador host no contêiner do server Kafka. Para obter mais informações sobre montagens de associação de dados, consulte Docker documentos: Associar montagens.

Verificações de integridade de integração de hospedagem

A integração de hospedagem do Kafka adiciona automaticamente uma verificação de integridade para o recurso Kafka server. A verificação de integridade verifica se um produtor Kafka com o nome de conexão especificado é capaz de se conectar ao Kafka e armazenar um tópico nele server.

A integração de hospedagem depende do pacote NuGet 📦 AspNetCore.HealthChecks.Kafka.

integração Client

Para começar a usar a integração .NET AspireApache Kafka, instale o pacote NuGet 📦Aspire.Confluent.Kafka no projeto que consome o client, ou seja, o projeto do aplicativo que usa o Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Adicionar produtor Kafka

No arquivo Program.cs do seu projeto consumidor de client, chame o método de extensão AddKafkaProducer para registrar um IProducer<TKey, TValue> para ser usado através do contêiner de injeção de dependência. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a ser enviada ao agente. Esses parâmetros genéricos são usados por AddKafkaProducer para criar uma instância de ProducerBuilder<TKey, TValue>. Esse método também usa o parâmetro de nome de conexão.

builder.AddKafkaProducer<string, string>("messaging");

Em seguida, você pode recuperar a instância de IProducer<TKey, TValue> usando a injeção de dependência. Por exemplo, para recuperar o produtor de um IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Para obter mais informações sobre os trabalhadores, consulte Serviços de Trabalho no .NET.

Adicionar um consumidor Kafka

Para registrar um IConsumer<TKey, TValue> para uso por meio do contêiner de injeção de dependência, chame o método de extensão AddKafkaConsumer no arquivo Program.cs do projeto que utiliza client. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a ser recebida do agente. Esses parâmetros genéricos são usados por AddKafkaConsumer para criar uma instância de ConsumerBuilder<TKey, TValue>. Esse método também usa o parâmetro de nome de conexão.

builder.AddKafkaConsumer<string, string>("messaging");

Em seguida, você pode recuperar a instância de IConsumer<TKey, TValue> usando a injeção de dependência. Por exemplo, para recuperar o consumidor de um IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Adicionar produtores ou consumidores Kafka com chave

Pode haver situações em que você deseja registrar várias instâncias de produtor ou consumidor com nomes de conexão diferentes. Para registrar os principais produtores ou consumidores do Kafka, chame a API apropriada:

Para obter mais informações sobre serviços chaveados, consulte .NET injeção de dependência: serviços chaveados.

Configuração

A integração .NET AspireApache Kafka fornece várias opções para configurar a conexão com base nos requisitos e convenções do seu projeto.

Usar uma cadeia de conexão

Ao usar uma cadeia de conexão da seção de configuração ConnectionStrings, você pode fornecer o nome da cadeia de conexão ao chamar builder.AddKafkaProducer() ou builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Em seguida, a cadeia de conexão é recuperada da seção de configuração ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

O valor da cadeia de conexão é definido como a propriedade BootstrapServers da instância de IProducer<TKey, TValue> ou IConsumer<TKey, TValue> produzida. Para obter mais informações, consulte BootstrapServers.

Usar provedores de configuração

A integração .NET AspireApache Kafka dá suporte a Microsoft.Extensions.Configuration. Ele carrega o KafkaProducerSettings ou o KafkaConsumerSettings da configuração, utilizando respectivamente as chaves Aspire:Confluent:Kafka:Producer e Aspire.Confluent:Kafka:Consumer. O snippet a seguir é um exemplo de um arquivo de appsettings.json que configura algumas das opções:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

As propriedades Config das seções de configuração Aspire:Confluent:Kafka:Producer e Aspire.Confluent:Kafka:Consumer, respectivamente, se associam a instâncias de ProducerConfig e ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requer que a propriedade ClientId seja definida para permitir que o broker acompanhe os deslocamentos de mensagem consumidos.

Para o esquema completo de integração Kafka clientJSON, consulte Aspire. Confluent.Kafka/ConfigurationSchema.json.

Usar delegados em linha

Há vários delegados embutidos disponíveis para configurar diversas opções.

ConfigurarKafkaProducerSettings e KafkaConsumerSettings

Você pode passar o delegado Action<KafkaProducerSettings> configureSettings para configurar algumas ou todas as opções embutidas, por exemplo, para desabilitar verificações de integridade do código:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Você pode configurar um consumidor embutido por meio do código:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Configurar ProducerBuilder<TKey, TValue> e ConsumerBuilder<TKey, TValue>

Para configurar os construtores Confluent.Kafka, passe um Action<ProducerBuilder<TKey, TValue>> (ou Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Ao registrar produtores e consumidores, se você precisar acessar um serviço registrado no contêiner de DI, poderá passar um Action<IServiceProvider, ProducerBuilder<TKey, TValue>> ou Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> respectivamente:

Considere o seguinte exemplo de registro de produtor:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Para obter mais informações, consulte a documentação da API ProducerBuilder<TKey, TValue> e ConsumerBuilder<TKey, TValue>.

Client verificações de integridade de integração

Por padrão, as integrações .NET.NET Aspire habilitam verificações de integridade para todos os serviços. Para obter mais informações, consulte .NET.NET Aspire visão geral de integrações.

A integração .NET AspireApache Kafka lida com os seguintes cenários de verificação de saúde:

  • Adiciona a verificação de integridade Aspire.Confluent.Kafka.Producer quando KafkaProducerSettings.DisableHealthChecks é false.
  • Adiciona a verificação de integridade Aspire.Confluent.Kafka.Consumer quando KafkaConsumerSettings.DisableHealthChecks é false.
  • Integra-se ao endpoint HTTP /health, que especifica que todas as verificações de integridade registradas devem ser aprovadas para que o aplicativo seja considerado pronto para receber tráfego.

Observabilidade e telemetria

.NET .NET Aspire integrações configuram automaticamente configurações de Log, Rastreamento e Métricas, que às vezes são conhecidas como os pilares da observabilidade. Para obter mais informações sobre a observabilidade e a telemetria de integração, consulte .NET.NET Aspire visão geral das integrações. Dependendo do serviço de backup, algumas integrações só podem dar suporte a alguns desses recursos. Por exemplo, algumas integrações dão suporte a registro em log e rastreamento, mas não a métricas. Os recursos de telemetria também podem ser desabilitados usando as técnicas apresentadas na seção Configuration.

Registro de Eventos

A integração .NET AspireApache Kafka usa as seguintes categorias de log:

  • Aspire.Confluent.Kafka

Rastreamento

A integração .NET AspireApache Kafka não emite rastros distribuídos.

Métricas

A integração .NET AspireApache Kafka emite as seguintes métricas usando OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Consulte também