integração .NET AspireApache Kafka
Inclui: de integração de hospedagem e Client de integração
Apache Kafka é uma plataforma de streaming de eventos distribuídos de software livre. É útil para criar pipelines de dados em tempo real e aplicativos de streaming. A integração .NET AspireApache Kafka permite que você se conecte a instâncias existentes do Kafka ou crie novas instâncias de .NET com a imagem de contêiner docker.io/confluentinc/confluent-local
.
Integração de hospedagem
O Apache Kafka modela a integração de hospedagem de um Kafka server como o tipo KafkaServerResource. Para acessar esse tipo, instale o pacote NuGet 📦Aspire.Hosting.Kafka no projeto de host do aplicativo e, depois, adicione-o ao construtor.
dotnet add package Aspire.Hosting.Kafka
Para obter mais informações, consulte dotnet add package ou Gerenciar dependências de pacotes em aplicativos .NET.
Adicionar o recurso server do Kafka
No projeto de host do aplicativo, chame AddKafka na instância de builder
para adicionar um recurso de Kafka server.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Quando .NET.NET Aspire adiciona uma imagem de contêiner ao host do aplicativo, conforme mostrado no exemplo anterior com a imagem docker.io/confluentinc/confluent-local
, ele cria uma nova instância do Kafka server em seu computador local. Uma referência a server Kafka (a variável kafka
) é adicionada ao ExampleProject
. O recurso server kafka inclui portas padrão
O método WithReference configura uma conexão no ExampleProject
denominado "kafka"
. Para obter mais informações, consulte o ciclo de vida do recurso contêiner .
Dica
Se você preferir se conectar a um serverkafka existente, chame AddConnectionString em vez disso. Para mais informações, consulte Recursos existentes de referência.
Adicionar interface do usuário do Kafka
Para adicionar a interface do usuário do Kafka ao recurso server do Kafka, chame o método WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
A UI do Kafka é uma interface web de código aberto e gratuita para monitorar e gerenciar clusters de Apache Kafka.
.NET
.NET Aspire adiciona outra imagem de contêiner docker.io/provectuslabs/kafka-ui
ao host do aplicativo que executa a interface do usuário do Kafka.
Alterar a porta de host da interface do usuário do Kafka
Para alterar a porta de host da interface do usuário do Kafka, encadeia uma chamada para o método WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
A interface do usuário do Kafka é acessível em http://localhost:9100
no exemplo anterior.
Adicionar o recurso Kafka server com volume de dados
Para adicionar um volume de dados ao recurso server kafka, chame o método WithDataVolume no recurso server kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
O volume de dados é usado para persistir os dados Kafka server fora do ciclo de vida de seu contêiner. O volume de dados é montado no caminho /var/lib/kafka/data
no contêiner server Kafka e, quando um parâmetro name
não é fornecido, o nome é gerado aleatoriamente. Para obter mais informações sobre volumes de dados e detalhes sobre por que eles são preferenciais em vez de associar montagens, consulte Docker documentos: Volumes.
Adicionar o recurso Kafka server com montagem de dados
Para adicionar um vínculo de montagem de dados ao recurso Kafka server, chame o método WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Importante
Os de dados associam montagens têm funcionalidade limitada em comparação com volumes, que oferecem melhor desempenho, portabilidade e segurança, tornando-os mais adequados para ambientes de produção. No entanto, os bind mounts permitem acesso direto e modificação de arquivos no sistema host, ideal para desenvolvimento e teste onde as alterações em tempo real são necessárias.
As montagens de associação de dados dependem do sistema de arquivos do computador host para persistir os dados do Kafka server entre reinicializações de contêiner. A montagem da associação de dados é montada no caminho C:\Kafka\Data
no Windows (ou /Kafka/Data
em Unix) no computador host no contêiner do server Kafka. Para obter mais informações sobre montagens de associação de dados, consulte Docker documentos: Associar montagens.
Verificações de integridade de integração de hospedagem
A integração de hospedagem do Kafka adiciona automaticamente uma verificação de integridade para o recurso Kafka server. A verificação de integridade verifica se um produtor Kafka com o nome de conexão especificado é capaz de se conectar ao Kafka e armazenar um tópico nele server.
A integração de hospedagem depende do pacote NuGet 📦 AspNetCore.HealthChecks.Kafka.
integração Client
Para começar a usar a integração .NET AspireApache Kafka, instale o pacote NuGet 📦Aspire.Confluent.Kafka no projeto que consome o client, ou seja, o projeto do aplicativo que usa o Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Adicionar produtor Kafka
No arquivo Program.cs do seu projeto consumidor de client, chame o método de extensão AddKafkaProducer para registrar um IProducer<TKey, TValue>
para ser usado através do contêiner de injeção de dependência. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a ser enviada ao agente. Esses parâmetros genéricos são usados por AddKafkaProducer
para criar uma instância de ProducerBuilder<TKey, TValue>
. Esse método também usa o parâmetro de nome de conexão.
builder.AddKafkaProducer<string, string>("messaging");
Em seguida, você pode recuperar a instância de IProducer<TKey, TValue>
usando a injeção de dependência. Por exemplo, para recuperar o produtor de um IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Para obter mais informações sobre os trabalhadores, consulte Serviços de Trabalho no .NET.
Adicionar um consumidor Kafka
Para registrar um IConsumer<TKey, TValue>
para uso por meio do contêiner de injeção de dependência, chame o método de extensão AddKafkaConsumer no arquivo Program.cs do projeto que utiliza client. O método usa dois parâmetros genéricos correspondentes ao tipo da chave e ao tipo da mensagem a ser recebida do agente. Esses parâmetros genéricos são usados por AddKafkaConsumer
para criar uma instância de ConsumerBuilder<TKey, TValue>
. Esse método também usa o parâmetro de nome de conexão.
builder.AddKafkaConsumer<string, string>("messaging");
Em seguida, você pode recuperar a instância de IConsumer<TKey, TValue>
usando a injeção de dependência. Por exemplo, para recuperar o consumidor de um IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Adicionar produtores ou consumidores Kafka com chave
Pode haver situações em que você deseja registrar várias instâncias de produtor ou consumidor com nomes de conexão diferentes. Para registrar os principais produtores ou consumidores do Kafka, chame a API apropriada:
- AddKeyedKafkaProducer: registra um produtor principal do Kafka.
- AddKeyedKafkaConsumer: Registra um consumidor Kafka com chave.
Para obter mais informações sobre serviços chaveados, consulte .NET injeção de dependência: serviços chaveados.
Configuração
A integração .NET AspireApache Kafka fornece várias opções para configurar a conexão com base nos requisitos e convenções do seu projeto.
Usar uma cadeia de conexão
Ao usar uma cadeia de conexão da seção de configuração ConnectionStrings
, você pode fornecer o nome da cadeia de conexão ao chamar builder.AddKafkaProducer()
ou builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Em seguida, a cadeia de conexão é recuperada da seção de configuração ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
O valor da cadeia de conexão é definido como a propriedade BootstrapServers
da instância de IProducer<TKey, TValue>
ou IConsumer<TKey, TValue>
produzida. Para obter mais informações, consulte BootstrapServers.
Usar provedores de configuração
A integração .NET AspireApache Kafka dá suporte a Microsoft.Extensions.Configuration. Ele carrega o KafkaProducerSettings ou o KafkaConsumerSettings da configuração, utilizando respectivamente as chaves Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
. O snippet a seguir é um exemplo de um arquivo de appsettings.json que configura algumas das opções:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
As propriedades Config
das seções de configuração Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
, respectivamente, se associam a instâncias de ProducerConfig
e ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
requer que a propriedade ClientId
seja definida para permitir que o broker acompanhe os deslocamentos de mensagem consumidos.
Para o esquema completo de integração Kafka clientJSON, consulte Aspire. Confluent.Kafka/ConfigurationSchema.json.
Usar delegados em linha
Há vários delegados embutidos disponíveis para configurar diversas opções.
ConfigurarKafkaProducerSettings
e KafkaConsumerSettings
Você pode passar o delegado Action<KafkaProducerSettings> configureSettings
para configurar algumas ou todas as opções embutidas, por exemplo, para desabilitar verificações de integridade do código:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Você pode configurar um consumidor embutido por meio do código:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configurar ProducerBuilder<TKey, TValue>
e ConsumerBuilder<TKey, TValue>
Para configurar os construtores Confluent.Kafka
, passe um Action<ProducerBuilder<TKey, TValue>>
(ou Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Ao registrar produtores e consumidores, se você precisar acessar um serviço registrado no contêiner de DI, poderá passar um Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
ou Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
respectivamente:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Considere o seguinte exemplo de registro de produtor:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Para obter mais informações, consulte a documentação da API ProducerBuilder<TKey, TValue>
e ConsumerBuilder<TKey, TValue>
.
Client verificações de integridade de integração
Por padrão, as integrações .NET.NET Aspire habilitam verificações de integridade para todos os serviços. Para obter mais informações, consulte .NET.NET Aspire visão geral de integrações.
A integração .NET AspireApache Kafka lida com os seguintes cenários de verificação de saúde:
- Adiciona a verificação de integridade
Aspire.Confluent.Kafka.Producer
quando KafkaProducerSettings.DisableHealthChecks éfalse
. - Adiciona a verificação de integridade
Aspire.Confluent.Kafka.Consumer
quando KafkaConsumerSettings.DisableHealthChecks éfalse
. - Integra-se ao endpoint HTTP
/health
, que especifica que todas as verificações de integridade registradas devem ser aprovadas para que o aplicativo seja considerado pronto para receber tráfego.
Observabilidade e telemetria
.NET .NET Aspire integrações configuram automaticamente configurações de Log, Rastreamento e Métricas, que às vezes são conhecidas como os pilares da observabilidade. Para obter mais informações sobre a observabilidade e a telemetria de integração, consulte .NET.NET Aspire visão geral das integrações. Dependendo do serviço de backup, algumas integrações só podem dar suporte a alguns desses recursos. Por exemplo, algumas integrações dão suporte a registro em log e rastreamento, mas não a métricas. Os recursos de telemetria também podem ser desabilitados usando as técnicas apresentadas na seção Configuration.
Registro de Eventos
A integração .NET AspireApache Kafka usa as seguintes categorias de log:
Aspire.Confluent.Kafka
Rastreamento
A integração .NET AspireApache Kafka não emite rastros distribuídos.
Métricas
A integração .NET AspireApache Kafka emite as seguintes métricas usando OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received