Partilhar via


.NET Aspire Apache Kafka integração

Inclui: de integraçãoHosting e integração

Apache Kafka é uma plataforma de streaming de eventos distribuídos de código aberto. É útil para criar pipelines de dados em tempo real e aplicativos de streaming. A integração .NET AspireApache Kafka permite que você se conecte a instâncias Kafka existentes ou crie novas instâncias a partir de .NET com a imagem de contêiner docker.io/confluentinc/confluent-local.

Integração de hospedagem

A integração de hosting Apache Kafka modela um Kafka server como tipo de KafkaServerResource. Para aceder a este tipo, instale o pacote NuGet 📦Aspire.Hosting.Kafka no projeto do anfitrião da aplicação , e depois adicione-o com o construtor.

dotnet add package Aspire.Hosting.Kafka

Para obter mais informações, consulte dotnet add package ou Gerir dependências de pacotes em aplicações .NET.

Adicionar recurso Kafka server

Em seu projeto de host de aplicativo, chame AddKafka na instância builder para adicionar um recurso server Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Quando .NET.NET Aspire adiciona uma imagem de contêiner ao host do aplicativo, como mostrado no exemplo anterior com a imagem docker.io/confluentinc/confluent-local, ele cria uma nova instância de server Kafka em sua máquina local. Uma referência ao Kafka server (à variável kafka) é adicionada ao ExampleProject. O recurso Kafka server inclui portas padrão

O método WithReference configura uma conexão no ExampleProject chamado "kafka". Para obter mais informações, consulte Ciclo de vida do recurso de contêiner.

Dica

Se você preferir se conectar a um serverKafka existente, ligue para AddConnectionString em vez disso. Para obter mais informações, consulte Fazer referência a recursos existentes.

Adicionar Kafka UI

Para adicionar o Kafka UI ao recurso Kafka server, chame o método WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

A UI do Kafka é uma interface do usuário da Web gratuita e de código aberto para monitorar e gerenciar clusters Apache Kafka. .NET .NET Aspire adiciona outra imagem de contentor docker.io/provectuslabs/kafka-ui ao anfitrião da aplicação que executa a interface do Kafka.

Alterar a porta de host da interface do usuário Kafka

Para alterar a porta de host da interface do usuário Kafka, encadeie uma chamada para o método WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

A interface do usuário de Kafka pode ser acessada em http://localhost:9100 no exemplo anterior.

Adicionar recurso Kafka server com volume de dados

Para adicionar um volume de dados ao recurso server Kafka, chame o método WithDataVolume no recurso server Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

O volume de dados é usado para manter os dados do Kafka server fora do ciclo de vida de seu contêiner. O volume de dados é montado no caminho /var/lib/kafka/data no contêiner Kafka server e quando um parâmetro name não é fornecido, o nome é gerado aleatoriamente. Para obter mais informações sobre volumes de dados e detalhes sobre por que eles são preferidos em relação a montagens de ligação , consulte Docker docs: Volumes.

Adicionar o recurso Kafka server com ponto de montagem de ligação de dados

Para adicionar uma montagem de ligação de dados ao recurso Kafka server, chame o método WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Importante

Os suportes de ligação de de dados têm funcionalidade limitada em comparação com volumes, que oferecem melhor desempenho, portabilidade e segurança, tornando-os mais adequados para ambientes de produção. No entanto, as montagens bind permitem o acesso direto e a modificação de arquivos no sistema host, ideal para desenvolvimento e testes onde alterações em tempo real são necessárias.

As montagens de associação de dados dependem do sistema de arquivos da máquina host para persistir os dados do Kafka server nas reinicializações do contêiner. A montagem de ligação de dados é montada no caminho C:\Kafka\Data no Windows (ou /Kafka/Data no Unix) na máquina host no contêiner Kafka server. Para obter mais informações sobre montagens de associação de dados, consulte Docker docs: Bind mounts.

Verificações de integridade da integração de hospedagem

A integração de hospedagem Kafka adiciona automaticamente uma verificação de integridade para o recurso Kafka server. A verificação de integridade confirma se um produtor de Kafka com o nome de conexão especificado é capaz de se conectar e armazenar um tópico no Kafka server.

A integração de hospedagem depende do 📦 AspNetCore.HealthChecks.Kafka pacote NuGet.

Client integração

Para começar com a integração .NET AspireApache Kafka, instale o pacote NuGet 📦Aspire.Confluent.Kafka no projeto de consumo de client, ou seja, o projeto para o aplicativo que usa o Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Adicionar produtor Kafka

No arquivo de Program.cs do seu projeto que consome client, chame o método de extensão AddKafkaProducer para registar um IProducer<TKey, TValue> para uso através do contentor de injeção de dependência. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a ser enviada ao corretor. Esses parâmetros genéricos são usados por AddKafkaProducer para criar uma instância de ProducerBuilder<TKey, TValue>. Esse método também usa o parâmetro de nome de conexão.

builder.AddKafkaProducer<string, string>("messaging");

Em seguida, você pode recuperar a instância IProducer<TKey, TValue> usando a injeção de dependência. Por exemplo, para recuperar o produtor de um IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Para obter mais informações sobre trabalhadores, consulte Serviços do trabalhador no .NET.

Adicionar consumidor Kafka

Para registar um IConsumer<TKey, TValue> para uso por meio do contentor de injeção de dependência, chame o método de extensão AddKafkaConsumer no ficheiro de Program.cs do projeto que consome client. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a receber do broker. Esses parâmetros genéricos são usados por AddKafkaConsumer para criar uma instância de ConsumerBuilder<TKey, TValue>. Esse método também usa o parâmetro de nome de conexão.

builder.AddKafkaConsumer<string, string>("messaging");

Em seguida, você pode recuperar a instância IConsumer<TKey, TValue> usando a injeção de dependência. Por exemplo, para recuperar o consumidor de um IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Adicionar produtores ou consumidores Kafka chaveados

Pode haver situações em que você queira registrar várias instâncias de produtor ou consumidor com nomes de conexão diferentes. Para registrar produtores ou consumidores de Kafka com chave, chame a API apropriada:

Para obter mais informações sobre serviços chaveados, consulte .NET injeção de dependência: Serviços chaveados.

Configuração

A integração .NET AspireApache Kafka fornece várias opções para configurar a conexão com base nos requisitos e convenções do seu projeto.

Usar uma cadeia de conexão

Ao usar uma cadeia de conexão da seção de configuração de ConnectionStrings, você pode fornecer o nome da cadeia de conexão ao chamar builder.AddKafkaProducer() ou builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Em seguida, a cadeia de conexão é recuperada da seção de configuração ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

O valor da cadeia de conexão é definido na propriedade BootstrapServers da instância IProducer<TKey, TValue> ou IConsumer<TKey, TValue> produzida. Para obter mais informações, consulte BootstrapServers.

Usar provedores de configuração

A integração .NET AspireApache Kafka suporta Microsoft.Extensions.Configuration. Ele carrega o KafkaProducerSettings ou KafkaConsumerSettings da configuração usando, respectivamente, as teclas Aspire:Confluent:Kafka:Producer e Aspire.Confluent:Kafka:Consumer. O trecho a seguir é um exemplo de um arquivo de appsettings.json que configura algumas das opções:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

As propriedades Config das seções de configuração Aspire:Confluent:Kafka:Producer e Aspire.Confluent:Kafka:Consumer, respectivamente, ligam-se a instâncias de ProducerConfig e ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requer que a propriedade ClientId seja definida para permitir que o intermediário monitorize os deslocamentos de mensagens consumidas.

Para o esquema completo de Kafka client de integração JSON, consulte Aspire. Confluent.Kafka/ConfigurationSchema.json.

Usar delegados em linha

Há vários delegados embutidos disponíveis para configurar várias opções.

ConfigurarKafkaProducerSettings e KafkaConsumerSettings

Você pode passar o delegate Action<KafkaProducerSettings> configureSettings para configurar algumas ou todas as opções diretamente, por exemplo, para desativar as verificações de integridade através do código:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Você pode configurar um consumidor embutido a partir do código:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Configurar ProducerBuilder<TKey, TValue> e ConsumerBuilder<TKey, TValue>

Para configurar construtores Confluent.Kafka, passe um Action<ProducerBuilder<TKey, TValue>> (ou Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Ao cadastrar produtores e consumidores, caso necessite acessar um serviço cadastrado no contêiner DI, poderá passar por um Action<IServiceProvider, ProducerBuilder<TKey, TValue>> ou Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> respectivamente:

Considere o seguinte exemplo de registo de produtor:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Para obter mais informações, consulte a documentação da API ProducerBuilder<TKey, TValue> e ConsumerBuilder<TKey, TValue>.

Client verificações de saúde da integração

Por padrão, as integrações .NET.NET Aspire habilitam verificações de integridade para todos os serviços. Para obter mais informações, consulte .NET.NET Aspire visão geral das integrações.

A integração .NET AspireApache Kafka lida com os seguintes cenários de verificação de saúde:

  • Adiciona a verificação de integridade Aspire.Confluent.Kafka.Producer quando KafkaProducerSettings.DisableHealthChecks é false.
  • Adiciona a verificação de integridade Aspire.Confluent.Kafka.Consumer quando KafkaConsumerSettings.DisableHealthChecks é false.
  • Integra-se com o endpoint HTTP /health, que especifica que todas as verificações de integridade registadas devem passar para que a aplicação seja considerada pronta para aceitar tráfego.

Observabilidade e telemetria

.NET .NET Aspire integrações configuram automaticamente as definições de Registo, Rastreamento e Medição, que às vezes são conhecidas como os pilares da observabilidade. Para obter mais informações sobre observabilidade e telemetria de integração, consulte Visão geral de integrações .NET.NET Aspire. Dependendo do serviço de suporte, algumas integrações podem suportar apenas alguns desses recursos. Por exemplo, algumas integrações suportam registro em log e rastreamento, mas não métricas. Os recursos de telemetria também podem ser desativados usando as técnicas apresentadas na seção de configuração.

Registo

A integração .NET AspireApache Kafka usa as seguintes categorias de log:

  • Aspire.Confluent.Kafka

Rastreio

A integração .NET AspireApache Kafka não emite traços distribuídos.

Métricas

A integração .NET AspireApache Kafka emite as seguintes métricas usando OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Ver também