integrazione di .NET AspireApache Kafka
Include:integrazione dell'hosting e
Client integrazione
Apache Kafka è una piattaforma di streaming di eventi distribuiti open source. È utile per la creazione di pipeline di dati in tempo reale e applicazioni di streaming. L'integrazione .NET AspireApache Kafka consente di connettersi alle istanze Kafka esistenti o di creare nuove istanze da .NET con l'immagine del contenitore docker.io/confluentinc/confluent-local
.
Integrazione dell'hosting
L'integrazione dell'hosting Apache Kafka modella un server Kafka come il tipo KafkaServerResource. Per accedere a questo tipo, installare il pacchetto NuGet 📦Aspire.Hosting.Kafka nel progetto host dell'app , quindi aggiungerlo con il builder.
- .NET interfaccia della riga di comando
- PackageReference
dotnet add package Aspire.Hosting.Kafka
Per altre informazioni, vedere dotnet add package o Manage package dependencies in .NET applications.
Aggiungere una risorsa server Kafka
Nel progetto host dell'app chiamare AddKafka nell'istanza di builder
per aggiungere una risorsa server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Quando .NET.NET Aspire aggiunge un'immagine del contenitore all'host dell'app, come illustrato nell'esempio precedente con l'immagine docker.io/confluentinc/confluent-local
, crea una nuova istanza del server Kafka nel computer locale. Al ExampleProject
viene aggiunto un riferimento al server Kafka (la variabile kafka
). La risorsa server Kafka include le porte predefinite.
Il metodo WithReference configura una connessione nel ExampleProject
denominato "kafka"
. Per ulteriori informazioni, consultare ciclo di vita delle risorse container.
Consiglio
Se invece si preferisce connettersi a un server Kafka esistente, chiamare AddConnectionString. Per altre informazioni, vedere Fare riferimento alle risorse esistenti.
Aggiungere l'interfaccia utente di Kafka
Per aggiungere l'interfaccia utente Kafka UI alla risorsa server Kafka, chiamare il metodo WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L'interfaccia utente kafka è un'interfaccia utente Web gratuita e open source per monitorare e gestire i cluster Apache Kafka.
.NET
.NET Aspire aggiunge un'altra immagine del contenitore docker.io/provectuslabs/kafka-ui
all'host dell'app che esegue l'interfaccia utente Kafka.
Modificare la porta host dell'interfaccia utente Kafka
Per modificare la porta host dell'interfaccia utente Kafka, concatenare una chiamata al metodo WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L'interfaccia utente di Kafka è accessibile in http://localhost:9100
nell'esempio precedente.
Aggiungere una risorsa server Kafka con volume di dati
Per aggiungere un volume di dati alla risorsa server Kafka, chiamare il metodo WithDataVolume nella risorsa server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Il volume di dati viene usato per rendere persistenti i dati del server Kafka all'esterno del ciclo di vita del contenitore. Il volume di dati viene montato nel percorso /var/lib/kafka/data
nel contenitore del server Kafka e quando non viene specificato un parametro name
, il nome viene generato in modo casuale. Per ulteriori informazioni sui volumi di dati e sui motivi per cui sono preferiti rispetto ai bind mount , consultare la documentazione Docker: Volumi.
Aggiungere una risorsa server Kafka con il montaggio dell'associazione dati
Per aggiungere un montaggio di associazione dati alla risorsa server Kafka, chiamare il metodo WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Importante
I di binding dei dati hanno funzionalità limitate rispetto ai volumi , che offrono prestazioni, portabilità e sicurezza migliori, rendendole più adatte per gli ambienti di produzione. Tuttavia, i bind mounts consentono l'accesso e la modifica diretti dei file nel sistema host, ideale per lo sviluppo e il test quando sono necessarie modifiche in tempo reale.
I montaggi di associazione dati si basano sul file system del computer host per rendere persistenti i dati del server Kafka tra i riavvii del contenitore. Il bind mount dati è montato al percorso C:\Kafka\Data
su Windows (o /Kafka/Data
su Unix) nel computer host per il contenitore del server Kafka. Per altre informazioni sui montaggi di associazione dati, vedere Docker docs: Bind mounts.
Hosting dei controlli di integrità dell'integrazione
L'integrazione dell'hosting Kafka aggiunge automaticamente un controllo di integrità per la risorsa server Kafka. Il controllo di integrità verifica che un producer Kafka con il nome di connessione specificato possa connettersi e memorizzare un argomento sul server Kafka.
L'integrazione dell'hosting si basa sul pacchetto NuGet 📦 AspNetCore.HealthChecks.Kafka.
Client integrazione
Per iniziare a usare l'integrazione di .NET AspireApache Kafka, installare il pacchetto NuGet 📦Aspire.Confluent.Kafka nel progetto che utilizza il client, cioè il progetto per l'applicazione che usa il client Apache Kafka.
- .NET interfaccia della riga di comando
- PackageReference
dotnet add package Aspire.Confluent.Kafka
Aggiungere il producer Kafka
Nel file Program.cs del progetto che usa il client, chiama il metodo di estensione AddKafkaProducer per registrare un IProducer<TKey, TValue>
da utilizzare tramite il contenitore di iniezione di dipendenze. Il metodo accetta due parametri generici corrispondenti al tipo della chiave e al tipo del messaggio da inviare al broker. Questi parametri generici vengono usati da AddKafkaProducer
per creare un'istanza di ProducerBuilder<TKey, TValue>
. Questo metodo accetta anche il parametro del nome della connessione.
builder.AddKafkaProducer<string, string>("messaging");
È quindi possibile recuperare l'istanza di IProducer<TKey, TValue>
usando l'iniezione delle dipendenze. Ad esempio, per recuperare il producer da un IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Per ulteriori informazioni sui lavoratori, vedere Servizi per i lavoratori in .NET.
Aggiungere un consumer Kafka
Per registrare un IConsumer<TKey, TValue>
da usare tramite il contenitore di inserimento delle dipendenze, chiamare il metodo di estensione AddKafkaConsumer nel file Program.cs del progetto che usa il client. Il metodo accetta due parametri generici corrispondenti al tipo della chiave e al tipo del messaggio da ricevere dal broker. Questi parametri generici vengono usati da AddKafkaConsumer
per creare un'istanza di ConsumerBuilder<TKey, TValue>
. Questo metodo accetta anche il parametro del nome della connessione.
builder.AddKafkaConsumer<string, string>("messaging");
È quindi possibile recuperare l'istanza di IConsumer<TKey, TValue>
usando l'iniezione delle dipendenze. Ad esempio, per recuperare l'utente da un IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Aggiungere i producer o consumer Kafka con chiave
In alcuni casi potrebbe essere necessario registrare più istanze di producer o consumer con nomi di connessione diversi. Per registrare i producer o consumer Kafka con chiavi, chiamare l'API appropriata.
- AddKeyedKafkaProducer: registra un producer Kafka con chiave.
- AddKeyedKafkaConsumer: registra un consumer Kafka con chiave.
Per altre informazioni sui servizi con chiave, vedere .NET iniezione delle dipendenze: Servizi con chiave.
Configurazione
L'integrazione .NET AspireApache Kafka offre più opzioni per configurare la connessione in base ai requisiti e alle convenzioni del progetto.
Usare una stringa di connessione
Quando si usa una stringa di connessione dalla sezione di configurazione ConnectionStrings
, è possibile specificare il nome della stringa di connessione quando si chiama builder.AddKafkaProducer()
o builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
La stringa di connessione viene quindi recuperata dalla sezione di configurazione ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Il valore della stringa di connessione viene impostato sulla proprietà BootstrapServers
dell'istanza di IProducer<TKey, TValue>
o IConsumer<TKey, TValue>
generata. Per altre informazioni, vedere BootstrapServers.
Utilizzare i provider di configurazione
L'integrazione .NET AspireApache Kafka supporta Microsoft.Extensions.Configuration. Carica il KafkaProducerSettings o il KafkaConsumerSettings dalla configurazione usando rispettivamente le chiavi Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
. Il frammento di codice seguente è un esempio di un file appsettings.json che configura alcune delle opzioni:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Le proprietà Config
di entrambe le sezioni di configurazione Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
si associano rispettivamente alle istanze di ProducerConfig
e ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
richiede che la proprietà ClientId
sia impostata per consentire al broker di tenere traccia degli offset dei messaggi utilizzati.
Per lo schema completo dell'integrazione client Kafka JSON, vedere Aspire. Confluent.Kafka/ConfigurationSchema.json.
Usare delegati inline
Sono disponibili diversi delegati inline per configurare varie opzioni.
ConfigurareKafkaProducerSettings
e KafkaConsumerSettings
È possibile passare il delegato Action<KafkaProducerSettings> configureSettings
per configurare alcune o tutte le opzioni inline, ad esempio per disabilitare i controlli di integrità dal codice:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
È possibile configurare un consumer in-line dal codice:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configurare ProducerBuilder<TKey, TValue>
e ConsumerBuilder<TKey, TValue>
Per configurare i generatori di Confluent.Kafka
, passare un Action<ProducerBuilder<TKey, TValue>>
(o Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Quando si registrano i producer e i consumer, se è necessario accedere a un servizio registrato nel contenitore DI, è possibile passare rispettivamente un Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
o Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Ecco un esempio di registrazione del seguente produttore:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Per altre informazioni, vedere ProducerBuilder<TKey, TValue>
e documentazione dell'API ConsumerBuilder<TKey, TValue>
.
Client verifiche di integrità dell'integrazione
Per impostazione predefinita, le integrazioni di .NET.NET Aspire abilitano verifiche di integrità per tutti i servizi. Per altre informazioni, vedere panoramica delle integrazioni .NET.NET Aspire.
L'integrazione .NET AspireApache Kafka gestisce i seguenti scenari di verifica dello stato di salute:
- Aggiunge il controllo integrità
Aspire.Confluent.Kafka.Producer
quando KafkaProducerSettings.DisableHealthChecks èfalse
. - Aggiunge il controllo integrità
Aspire.Confluent.Kafka.Consumer
quando KafkaConsumerSettings.DisableHealthChecks èfalse
. - Si integra con l'endpoint HTTP
/health
, il quale specifica che tutti i controlli di integrità registrati devono essere superati con successo affinché l'app sia considerata pronta per accettare il traffico.
Osservabilità e telemetria
.NET
.NET Aspire le integrazioni configurano automaticamente registrazione, tracciamento e metriche, talvolta noti come i pilastri dell'osservabilità. Per altre informazioni sull'osservabilità e la telemetria dell'integrazione, vedere panoramica delle integrazioni .NET.NET Aspire. A seconda del servizio di backup, alcune integrazioni possono supportare solo alcune di queste funzionalità. Ad esempio, alcune integrazioni supportano la registrazione e la traccia, ma non le metriche. Le funzionalità di telemetria possono essere disabilitate anche usando le tecniche presentate nella sezione Configurazione
Registrazione
L'integrazione .NET AspireApache Kafka usa le categorie di log seguenti:
Aspire.Confluent.Kafka
Tracciatura
L'integrazione .NET AspireApache Kafka non genera tracce distribuite.
Metriche
L'integrazione .NET AspireApache Kafka genera le metriche seguenti usando OpenTelemetry:
Aspire.Confluent.Kafka
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received