integratie van .NET AspireApache Kafka
omvat:hostingintegratie en
Client integratie
Apache Kafka is een opensource gedistribueerd gebeurtenisstreamingplatform. Het is handig voor het bouwen van realtime gegevenspijplijnen en streamingtoepassingen. Met de .NET AspireApache Kafka-integratie kunt u verbinding maken met bestaande Kafka-exemplaren of nieuwe exemplaren maken vanuit .NET met de docker.io/confluentinc/confluent-local
containerinstallatiekopieën.
Hostingintegratie
Het Apache Kafka-hostingintegratiemodel modelleert een Kafka-server als het KafkaServerResource-type. Installeer het 📦Aspire.Hosting.Kafka NuGet-pakket in het app-host project om toegang te krijgen tot dit type, en voeg het vervolgens toe met de builder.
dotnet add package Aspire.Hosting.Kafka
Zie dotnet pakket toevoegen of Pakketafhankelijkheden beheren in .NET toepassingenvoor meer informatie.
Kafka-serverresource toevoegen
Roep in uw app-hostproject AddKafka aan op het builder
exemplaar om een Kafka-serverresource toe te voegen:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Wanneer .NET.NET Aspire een containerafbeelding toevoegt aan de app-host, zoals wordt weergegeven in het vorige voorbeeld met de docker.io/confluentinc/confluent-local
-afbeelding, wordt er een nieuw Kafka-serverexemplaar op uw lokale machine gemaakt. Er wordt een verwijzing naar uw Kafka-server (de variabele kafka
) toegevoegd aan de ExampleProject
. De Kafka-serverresource bevat standaardpoorten
De methode WithReference configureert een verbinding in de ExampleProject
met de naam "kafka"
. Zie levenscyclus van containerresourcesvoor meer informatie.
Tip
Als u liever verbinding wilt maken met een bestaande Kafka-server, roept u in plaats daarvan AddConnectionString aan. Zie Bestaande bronnenvoor meer informatie.
Kafka-gebruikersinterface toevoegen
Als u de Kafka-gebruikersinterface wilt toevoegen aan de Kafka-serverresource, roept u de methode WithKafkaUI aan:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
De Kafka-gebruikersinterface is een gratis opensource-webinterface voor het bewaken en beheren van Apache Kafka clusters.
.NET
.NET Aspire voegt nog een containerimage docker.io/provectuslabs/kafka-ui
toe aan de app-host waarin de Kafka-gebruikersinterface draait.
De kafka UI-hostpoort wijzigen
Als u de kafka UI-hostpoort wilt wijzigen, koppelt u een aanroep aan de WithHostPort methode:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
De Kafka-gebruikersinterface is toegankelijk op http://localhost:9100
in het vorige voorbeeld.
Kafka-serverresource met gegevensvolume toevoegen
Als u een gegevensvolume wilt toevoegen aan de Kafka-serverresource, roept u de WithDataVolume methode aan op de Kafka-serverresource:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Het gegevensvolume wordt gebruikt om de Kafka-servergegevens buiten de levenscyclus van de container te behouden. Het gegevensvolume wordt gekoppeld aan het /var/lib/kafka/data
pad in de Kafka-servercontainer en wanneer er geen name
parameter wordt opgegeven, wordt de naam willekeurig gegenereerd. Zie de voor meer informatie over gegevensvolumes en details waarom ze de voorkeur hebben boven Docker.
Kafka-serverbron toevoegen met data-bindmount
Om een data-bindmount toe te voegen aan de Kafka-serverresource, roept u de WithDataBindMount-methode aan.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Belangrijk
Gegevens koppelen beperkte functionaliteit hebben in vergelijking met volumes, die betere prestaties, draagbaarheid en beveiliging bieden, waardoor ze geschikter zijn voor productieomgevingen. Bind mounts bieden echter directe toegang tot en wijziging van bestanden op het hostsysteem, ideaal voor ontwikkeling en testen waarbij realtime wijzigingen nodig zijn.
Koppelen van gegevensbindingen zijn afhankelijk van het bestandssysteem van de hostcomputer om de Kafka-servergegevens bij het opnieuw opstarten van de container te behouden. De data-bindmount wordt gekoppeld op het pad C:\Kafka\Data
onder Windows (of /Kafka/Data
op Unix) op de hostmachine in de Kafka-container. Zie Docker docs: Bindingskoppelingenvoor meer informatie over koppelingskoppelingen voor gegevens.
Gezondheidscontroles voor hostingintegratie
De Integratie van Kafka-hosting voegt automatisch een statuscontrole toe voor de Kafka-serverresource. De statuscontrole controleert of een Kafka-producent met de opgegeven verbindingsnaam verbinding kan maken en een onderwerp kan behouden met de Kafka-server.
De hostingintegratie is afhankelijk van het 📦 AspNetCore.HealthChecks.Kafka NuGet-pakket.
integratie van Client
Installeer de .NET AspireApache Kafkaom aan de slag te gaan met de 📦Aspire-integratie. Confluent.Kafka NuGet-pakket in het clientgebruikte project, dat wil gezegd het project voor de toepassing die gebruikmaakt van de Apache Kafka-client.
dotnet add package Aspire.Confluent.Kafka
Kafka-producent toevoegen
Roep in het Program.cs bestand van het clientgebruikte project de AddKafkaProducer extensiemethode aan om een IProducer<TKey, TValue>
te registreren voor gebruik via de container voor afhankelijkheidsinjectie. De methode gebruikt twee algemene parameters die overeenkomen met het type sleutel en het type bericht dat naar de broker moet worden verzonden. Deze algemene parameters worden gebruikt door AddKafkaProducer
om een exemplaar van ProducerBuilder<TKey, TValue>
te maken. Deze methode gebruikt ook de parameter verbindingsnaam.
builder.AddKafkaProducer<string, string>("messaging");
Vervolgens kunt u het IProducer<TKey, TValue>
exemplaar ophalen met behulp van afhankelijkheidsinjectie. Als u bijvoorbeeld de producent wilt ophalen uit een IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Zie Worker-services in .NETvoor meer informatie over werknemers.
Kafka-consument toevoegen
Als u een IConsumer<TKey, TValue>
wilt registreren voor gebruik via de container voor afhankelijkheidsinjectie, roept u de AddKafkaConsumer-extensiemethode aan in het Program.cs-bestand van het project waarin de client wordt gebruikt. De methode gebruikt twee algemene parameters die overeenkomen met het type sleutel en het type bericht dat moet worden ontvangen van de broker. Deze algemene parameters worden gebruikt door AddKafkaConsumer
om een exemplaar van ConsumerBuilder<TKey, TValue>
te maken. Deze methode gebruikt ook de parameter verbindingsnaam.
builder.AddKafkaConsumer<string, string>("messaging");
Vervolgens kunt u het IConsumer<TKey, TValue>
exemplaar ophalen met behulp van afhankelijkheidsinjectie. Als u bijvoorbeeld de consument wilt ophalen uit een IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Belangrijke Kafka-producenten of -consumenten toevoegen
Er kunnen situaties zijn waarin u meerdere producenten- of consumentenexemplaren met verschillende verbindingsnamen wilt registreren. Als u de belangrijkste Kafka-producenten of -consumenten wilt registreren, roept u de juiste API aan:
- AddKeyedKafkaProducer: Registreert een belangrijke Kafka-producent.
- AddKeyedKafkaConsumer: Registreert een Kafka-consument met sleutel.
Voor meer informatie over sleutelservices, zie .NET afhankelijkheidsinjectie: Keyed Services.
Configuratie
De .NET AspireApache Kafka-integratie biedt meerdere opties voor het configureren van de verbinding op basis van de vereisten en conventies van uw project.
Een verbindingsreeks gebruiken
Wanneer u een verbindingsreeks uit de sectie ConnectionStrings
configuratie gebruikt, kunt u de naam van de verbindingsreeks opgeven bij het aanroepen van builder.AddKafkaProducer()
of builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Vervolgens wordt de verbindingsreeks opgehaald uit de ConnectionStrings
configuratiesectie:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
De waarde van de verbindingsreeks wordt ingesteld op de eigenschap BootstrapServers
van de geproduceerde IProducer<TKey, TValue>
- of IConsumer<TKey, TValue>
-exemplaren. Zie BootstrapServersvoor meer informatie.
Configuratieproviders gebruiken
De .NET AspireApache Kafka-integratie ondersteunt Microsoft.Extensions.Configuration. Het laadt respectievelijk de KafkaProducerSettings en KafkaConsumerSettings vanuit de configuratie door gebruik te maken van de Aspire:Confluent:Kafka:Producer
- en Aspire.Confluent:Kafka:Consumer
-sleutels. Het volgende codefragment is een voorbeeld van een appsettings.json-bestand waarmee een aantal van de opties wordt geconfigureerd:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
De Config
eigenschappen van zowel Aspire:Confluent:Kafka:Producer
als Aspire.Confluent:Kafka:Consumer
configuratiesecties binden respectievelijk aan exemplaren van ProducerConfig
en ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
vereist dat de eigenschap ClientId
wordt ingesteld om de verbruikte offset van berichten door de broker bij te houden.
Zie JSONvoor het volledige Kafka-clientintegratieschema Aspire. Confluent.Kafka/ConfigurationSchema.json.
Gebruik inline-afgevaardigden
Er zijn verschillende inline delegates beschikbaar om verschillende opties te configureren.
KafkaProducerSettings
en KafkaConsumerSettings
configureren
U kunt de Action<KafkaProducerSettings> configureSettings
delegate doorgeven om bepaalde of alle opties inline in te stellen, bijvoorbeeld om gezondheidscontroles uit de code uit te schakelen.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
U kunt direct een consument configureren vanuit de code.
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue>
en ConsumerBuilder<TKey, TValue>
configureren
Om Confluent.Kafka
builder functies te configureren, gebruikt u een Action<ProducerBuilder<TKey, TValue>>
(of Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Wanneer u producenten en consumenten registreert en u toegang wilt krijgen tot een service die is geregistreerd in de DI-container, kunt u respectievelijk een Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
of Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
doorgeven:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Bekijk het volgende voorbeeld van producentregistratie:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Zie ProducerBuilder<TKey, TValue>
- en ConsumerBuilder<TKey, TValue>
API-documentatie voor meer informatie.
statuscontroles voor Client integratie
Standaard kunnen .NET.NET Aspire integraties statuscontroles voor alle services inschakelen. Zie .NET.NET Aspire overzicht van integratiesvoor meer informatie.
De .NET AspireApache Kafka-integratie verwerkt de volgende statuscontrolescenario's:
- Voegt de
Aspire.Confluent.Kafka.Producer
gezondheidscontrole toe wanneer KafkaProducerSettings.DisableHealthChecksfalse
is. - Voegt de
Aspire.Confluent.Kafka.Consumer
gezondheidscontrole toe wanneer KafkaConsumerSettings.DisableHealthChecksfalse
is. - Kan worden geïntegreerd met het
/health
HTTP-eindpunt, waarmee alle geregistreerde statuscontroles moeten worden doorgegeven zodat de app als gereed wordt beschouwd voor het accepteren van verkeer.
Waarneembaarheid en telemetrie
.NET .NET Aspire integraties automatisch configuraties voor logging, tracering en metriek instellen, die ook wel bekend staan als de pijlers van observeerbaarheid. Zie .NET.NET Aspire overzicht van integratieintegratiesvoor meer informatie over de waarneembaarheid en telemetrie van integraties. Afhankelijk van de back-upservice ondersteunen sommige integraties mogelijk slechts enkele van deze functies. Sommige integraties ondersteunen bijvoorbeeld logboekregistratie en tracering, maar geen metrische gegevens. Telemetriefuncties kunnen ook worden uitgeschakeld met behulp van de technieken die worden weergegeven in de sectie Configuratie.
Logboekregistratie
De .NET AspireApache Kafka-integratie maakt gebruik van de volgende logboekcategorieën:
Aspire.Confluent.Kafka
Tracering
De .NET Aspire-Apache Kafka integratie produceert geen gedistribueerde traces.
Statistieken
De .NET AspireApache Kafka-integratie verzendt de volgende metrische gegevens met behulp van OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received