.NET Aspire Apache Kafka integrering
omfattar:Hostingintegration och
Client integration
Apache Kafka är en plattform för distribuerad händelseströmning med öppen källkod. Det är användbart för att bygga datapipelines i realtid och strömmande applikationer. Med integreringen .NET AspireApache Kafka kan du ansluta till befintliga Kafka-instanser eller skapa nya instanser från .NET med docker.io/confluentinc/confluent-local
containeravbildningen.
Värdintegrering
Apache Kafka som är värd för integrering modellerar en Kafka-server som KafkaServerResource typ. Om du vill komma åt den här typen installerar du 📦Aspire.Hosting.Kafka NuGet-paketet i appvärdprojektet för , och lägg sedan till det med buildern.
dotnet add package Aspire.Hosting.Kafka
Mer information finns i dotnet add package eller Hantera paketberoenden i .NET applikationer.
Lägg till Kafka-serverresurs
I appvärdprojektet anropar du AddKafka på builder
-instansen för att lägga till en Kafka-serverresurs:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
När .NET.NET Aspire lägger till en containerbild i apputrymmet, som visas i det föregående exemplet med docker.io/confluentinc/confluent-local
-bilden, skapar det en ny Kafka-serverinstans på din lokala dator. En referens till Kafka-servern (variabeln kafka
) läggs till i ExampleProject
. Kafka-serverresursen innehåller standardportar
Metoden WithReference konfigurerar en anslutning i ExampleProject
med namnet "kafka"
. Mer information finns i Livscykel för containerresurser.
Tips
Om du hellre vill ansluta till en befintlig Kafka-server anropar du AddConnectionString i stället. Mer information finns i Referera till befintliga resurser.
Lägg till Kafka-användargränssnitt
Om du vill lägga till Kafka-användargränssnittet till Kafka-serverresursen anropar du metoden WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka-användargränssnittet är ett kostnadsfritt webbgränssnitt med öppen källkod för att övervaka och hantera Apache Kafka kluster.
.NET
.NET Aspire lägger till ytterligare en containeravbildning docker.io/provectuslabs/kafka-ui
till appvärden som kör Kafka-användargränssnittet.
Ändra värdporten för Kafka-användargränssnittet
Om du vill ändra värdporten för Kafka-användargränssnittet kedjar du ett anrop till metoden WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka-användargränssnittet är tillgängligt på http://localhost:9100
i föregående exempel.
Lägga till Kafka-serverresurs med datavolym
Om du vill lägga till en datavolym i Kafka-serverresursen anropar du metoden WithDataVolume på Kafka-serverresursen:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Datavolymen används för att bevara Kafka-serverdata utanför containerns livscykel. Datavolymen monteras på sökvägen /var/lib/kafka/data
i Kafka-servercontainern och när en name
parameter inte anges, genereras namnet slumpvis. Mer information om datavolymer samt varför de föredras framför bindmountsfinns i dokumentet Docker Volymer.
Lägg till Kafka-serverresurs med databindningspunkt
Om du vill lägga till en databindningsmontering till Kafka-serverresursen anropar du metoden WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Viktig
Data bindningsmonteringar har begränsade funktioner jämfört med volymer, vilket ger bättre prestanda, portabilitet och säkerhet, vilket gör dem mer lämpliga för produktionsmiljöer. Bindningsmonteringar tillåter dock direkt åtkomst och ändring av filer i värdsystemet, perfekt för utveckling och testning där realtidsändringar behövs.
Databindningsmonteringar förlitar sig på värddatorns filsystem för att bevara Kafka-serverdata mellan omstarter av containrar. Databindningspunkten monteras i sökvägen C:\Kafka\Data
på Windows (eller /Kafka/Data
på Unix) på värddatorn i Kafka-serverns container. Mer information om bindningar av data finns i dokumentationen för Docker: Monteringar av bindningar.
Genomföra hälsokontroller för integrationer
Kafka-värdintegrering lägger automatiskt till en hälsokontroll för Kafka-serverresursen. Hälsokontrollen verifierar att en Kafka-producent med det angivna anslutningsnamnet kan ansluta och bevara ett ämne till Kafka-servern.
Värdtjänstintegration förlitar sig på 📦 AspNetCore.HealthChecks.Kafka NuGet-paket.
Client integrering
Kom igång med .NET AspireApache Kafka-integreringen genom att installera 📦Aspire. Confluent.Kafka NuGet-paket i det klientkrävande projektet, det vill säga projektet för det program som använder Apache Kafka-klienten.
dotnet add package Aspire.Confluent.Kafka
Lägg till Kafka-producent
Anropa Program.cs-tilläggsmetoden i AddKafkaProducer-filen för ditt klientprojekt för att registrera en IProducer<TKey, TValue>
för användning via beroendeinjektionscontainern. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska skickas till meddelandemäklaren. Dessa allmänna parametrar används av AddKafkaProducer
för att skapa en instans av ProducerBuilder<TKey, TValue>
. Den här metoden använder även parametern anslutningsnamn.
builder.AddKafkaProducer<string, string>("messaging");
Du kan sedan hämta IProducer<TKey, TValue>
-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta producenten från en IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Mer information om arbetare finns i Worker-tjänster i .NET.
Lägg till Kafka-konsument
Om du vill registrera en IConsumer<TKey, TValue>
för användning via containern för beroendeinjektion anropar du AddKafkaConsumer-tilläggsmetoden i Program.cs-filen för ditt klientprojekt. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska tas emot från mäklaren. Dessa allmänna parametrar används av AddKafkaConsumer
för att skapa en instans av ConsumerBuilder<TKey, TValue>
. Den här metoden använder även parametern anslutningsnamn.
builder.AddKafkaConsumer<string, string>("messaging");
Du kan sedan hämta IConsumer<TKey, TValue>
-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta en konsument från IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Lägg till nyckelade Kafka-producenter eller konsumenter
Det kan finnas situationer där du vill registrera flera producent- eller konsumentinstanser med olika anslutningsnamn. Om du vill registrera nyckelade Kafka-producenter eller konsumenter anropar du lämpligt API:
- AddKeyedKafkaProducer: Registrerar en nyckelad Kafka-producent.
- AddKeyedKafkaConsumer: Registrerar en nyckelad Kafka-konsument.
För mer information om nyckelade tjänster, se .NET beroendeinjektion: Nyckelade tjänster.
Konfiguration
Den .NET AspireApache Kafka integreringen innehåller flera alternativ för att konfigurera anslutningen baserat på kraven och konventionerna i ditt projekt.
Använda en anslutningssträng
När du använder en anslutningssträng från ConnectionStrings
-konfigurationsavsnittet kan du ange namnet på anslutningssträngen när du anropar builder.AddKafkaProducer()
eller builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Sedan hämtas anslutningssträngen från ConnectionStrings
-konfigurationsavsnittet:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Anslutningssträngsvärdet anges till egenskapen BootstrapServers
för den producerade IProducer<TKey, TValue>
- eller IConsumer<TKey, TValue>
-instansen. Mer information finns i BootstrapServers.
Använda konfigurationsprovidrar
.NET Aspire
Apache Kafka-integreringen stöder Microsoft.Extensions.Configuration. Den läser in KafkaProducerSettings eller KafkaConsumerSettings från konfigurationen med hjälp av Aspire:Confluent:Kafka:Producer
respektive Aspire.Confluent:Kafka:Consumer
nycklar. Följande kodfragment är ett exempel på en appsettings.json fil som konfigurerar några av alternativen:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Egenskaperna för Config
för konfigurationsavsnitten både Aspire:Confluent:Kafka:Producer
och Aspire.Confluent:Kafka:Consumer
binder till instanser av ProducerConfig
och ConsumerConfig
respektive.
Confluent.Kafka.Consumer<TKey, TValue>
kräver att egenskapen ClientId
anges så att mäklaren spårar förbrukade meddelandeförskjutningar.
Det fullständiga schemat för Kafka-klientintegrering JSON finns i Aspire. Confluent.Kafka/ConfigurationSchema.json.
Använd inline-delegater
Det finns flera interna ombud tillgängliga för att konfigurera olika alternativ.
KonfigureraKafkaProducerSettings
och KafkaConsumerSettings
Du kan skicka delegaten Action<KafkaProducerSettings> configureSettings
för att konfigurera vissa eller alla alternativ direkt i koden, till exempel för att inaktivera hälsokontroller från koden.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Du kan konfigurera en konsument inline från koden.
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Konfigurera ProducerBuilder<TKey, TValue>
och ConsumerBuilder<TKey, TValue>
För att konfigurera Confluent.Kafka
byggare, skicka en Action<ProducerBuilder<TKey, TValue>>
(eller Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
När du registrerar producenter och konsumenter kan du skicka en Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
eller Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
om du behöver komma åt en tjänst som är registrerad i DI-containern:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Tänk dig följande exempel på producentregistrering:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Mer information finns i dokumentationen om ProducerBuilder<TKey, TValue>
och ConsumerBuilder<TKey, TValue>
API.
Client hälsokontroller för integrering
Som standard aktiverar .NET.NET Aspire integreringar hälsokontroller för alla tjänster. Mer information finns i översikten över .NET.NET Aspire integreringar.
Integreringen .NET AspireApache Kafka hanterar följande hälsokontrollscenarier:
- Lägger till
Aspire.Confluent.Kafka.Producer
hälsokontroll när KafkaProducerSettings.DisableHealthChecks ärfalse
. - Lägger till
Aspire.Confluent.Kafka.Consumer
hälsokontroll när KafkaConsumerSettings.DisableHealthChecks ärfalse
. - Integrerar med
/health
HTTP-slutpunkt, som anger att alla registrerade hälsokontroller måste klara för att appen ska anses vara redo att ta emot trafik.
Observerbarhet och telemetri
.NET .NET Aspire integreringar ställer automatiskt in loggning, spårning och mätvärden, som ibland benämns grundpelarna för observerbarhet. Mer information om integreringsobservabilitet och telemetri finns i översikten över .NET.NET Aspire integreringar. Beroende på säkerhetskopieringstjänsten kanske vissa integreringar bara stöder vissa av dessa funktioner. Vissa integreringar stöder till exempel loggning och spårning, men inte mått. Telemetrifunktioner kan också inaktiveras med hjälp av de tekniker som visas i avsnittet Configuration.
Skogsavverkning
.NET Aspire Apache Kafka-integreringen använder följande loggkategorier:
Aspire.Confluent.Kafka
Spårning
Den .NET AspireApache Kafka integreringen genererar inte distribuerade spårningar.
Mätvärden
Integreringen .NET AspireApache Kafka genererar följande mått med hjälp av OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
Se även
.NET Aspire