Delen via


integratie van .NET AspireApache Kafka

omvat:hostingintegratie en Client integratie

Apache Kafka is een opensource gedistribueerd gebeurtenisstreamingplatform. Het is handig voor het bouwen van realtime gegevenspijplijnen en streamingtoepassingen. Met de .NET AspireApache Kafka-integratie kunt u verbinding maken met bestaande Kafka-exemplaren of nieuwe exemplaren maken vanuit .NET met de docker.io/confluentinc/confluent-local containerinstallatiekopieën.

Hostingintegratie

Het Apache Kafka-hostingintegratiemodel modelleert een Kafka-server als het KafkaServerResource-type. Installeer het 📦Aspire.Hosting.Kafka NuGet-pakket in het app-host project om toegang te krijgen tot dit type, en voeg het vervolgens toe met de builder.

dotnet add package Aspire.Hosting.Kafka

Zie dotnet pakket toevoegen of Pakketafhankelijkheden beheren in .NET toepassingenvoor meer informatie.

Kafka-serverresource toevoegen

Roep in uw app-hostproject AddKafka aan op het builder exemplaar om een Kafka-serverresource toe te voegen:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Wanneer .NET.NET Aspire een containerafbeelding toevoegt aan de app-host, zoals wordt weergegeven in het vorige voorbeeld met de docker.io/confluentinc/confluent-local-afbeelding, wordt er een nieuw Kafka-serverexemplaar op uw lokale machine gemaakt. Er wordt een verwijzing naar uw Kafka-server (de variabele kafka) toegevoegd aan de ExampleProject. De Kafka-serverresource bevat standaardpoorten

De methode WithReference configureert een verbinding in de ExampleProject met de naam "kafka". Zie levenscyclus van containerresourcesvoor meer informatie.

Tip

Als u liever verbinding wilt maken met een bestaande Kafka-server, roept u in plaats daarvan AddConnectionString aan. Zie Bestaande bronnenvoor meer informatie.

Kafka-gebruikersinterface toevoegen

Als u de Kafka-gebruikersinterface wilt toevoegen aan de Kafka-serverresource, roept u de methode WithKafkaUI aan:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

De Kafka-gebruikersinterface is een gratis opensource-webinterface voor het bewaken en beheren van Apache Kafka clusters. .NET .NET Aspire voegt nog een containerimage docker.io/provectuslabs/kafka-ui toe aan de app-host waarin de Kafka-gebruikersinterface draait.

De kafka UI-hostpoort wijzigen

Als u de kafka UI-hostpoort wilt wijzigen, koppelt u een aanroep aan de WithHostPort methode:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

De Kafka-gebruikersinterface is toegankelijk op http://localhost:9100 in het vorige voorbeeld.

Kafka-serverresource met gegevensvolume toevoegen

Als u een gegevensvolume wilt toevoegen aan de Kafka-serverresource, roept u de WithDataVolume methode aan op de Kafka-serverresource:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Het gegevensvolume wordt gebruikt om de Kafka-servergegevens buiten de levenscyclus van de container te behouden. Het gegevensvolume wordt gekoppeld aan het /var/lib/kafka/data pad in de Kafka-servercontainer en wanneer er geen name parameter wordt opgegeven, wordt de naam willekeurig gegenereerd. Zie de voor meer informatie over gegevensvolumes en details waarom ze de voorkeur hebben boven Docker.

Kafka-serverbron toevoegen met data-bindmount

Om een data-bindmount toe te voegen aan de Kafka-serverresource, roept u de WithDataBindMount-methode aan.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Belangrijk

Gegevens koppelen beperkte functionaliteit hebben in vergelijking met volumes, die betere prestaties, draagbaarheid en beveiliging bieden, waardoor ze geschikter zijn voor productieomgevingen. Bind mounts bieden echter directe toegang tot en wijziging van bestanden op het hostsysteem, ideaal voor ontwikkeling en testen waarbij realtime wijzigingen nodig zijn.

Koppelen van gegevensbindingen zijn afhankelijk van het bestandssysteem van de hostcomputer om de Kafka-servergegevens bij het opnieuw opstarten van de container te behouden. De data-bindmount wordt gekoppeld op het pad C:\Kafka\Data onder Windows (of /Kafka/Data op Unix) op de hostmachine in de Kafka-container. Zie Docker docs: Bindingskoppelingenvoor meer informatie over koppelingskoppelingen voor gegevens.

Gezondheidscontroles voor hostingintegratie

De Integratie van Kafka-hosting voegt automatisch een statuscontrole toe voor de Kafka-serverresource. De statuscontrole controleert of een Kafka-producent met de opgegeven verbindingsnaam verbinding kan maken en een onderwerp kan behouden met de Kafka-server.

De hostingintegratie is afhankelijk van het 📦 AspNetCore.HealthChecks.Kafka NuGet-pakket.

integratie van Client

Installeer de .NET AspireApache Kafkaom aan de slag te gaan met de 📦Aspire-integratie. Confluent.Kafka NuGet-pakket in het clientgebruikte project, dat wil gezegd het project voor de toepassing die gebruikmaakt van de Apache Kafka-client.

dotnet add package Aspire.Confluent.Kafka

Kafka-producent toevoegen

Roep in het Program.cs bestand van het clientgebruikte project de AddKafkaProducer extensiemethode aan om een IProducer<TKey, TValue> te registreren voor gebruik via de container voor afhankelijkheidsinjectie. De methode gebruikt twee algemene parameters die overeenkomen met het type sleutel en het type bericht dat naar de broker moet worden verzonden. Deze algemene parameters worden gebruikt door AddKafkaProducer om een exemplaar van ProducerBuilder<TKey, TValue>te maken. Deze methode gebruikt ook de parameter verbindingsnaam.

builder.AddKafkaProducer<string, string>("messaging");

Vervolgens kunt u het IProducer<TKey, TValue> exemplaar ophalen met behulp van afhankelijkheidsinjectie. Als u bijvoorbeeld de producent wilt ophalen uit een IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Zie Worker-services in .NETvoor meer informatie over werknemers.

Kafka-consument toevoegen

Als u een IConsumer<TKey, TValue> wilt registreren voor gebruik via de container voor afhankelijkheidsinjectie, roept u de AddKafkaConsumer-extensiemethode aan in het Program.cs-bestand van het project waarin de client wordt gebruikt. De methode gebruikt twee algemene parameters die overeenkomen met het type sleutel en het type bericht dat moet worden ontvangen van de broker. Deze algemene parameters worden gebruikt door AddKafkaConsumer om een exemplaar van ConsumerBuilder<TKey, TValue>te maken. Deze methode gebruikt ook de parameter verbindingsnaam.

builder.AddKafkaConsumer<string, string>("messaging");

Vervolgens kunt u het IConsumer<TKey, TValue> exemplaar ophalen met behulp van afhankelijkheidsinjectie. Als u bijvoorbeeld de consument wilt ophalen uit een IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Belangrijke Kafka-producenten of -consumenten toevoegen

Er kunnen situaties zijn waarin u meerdere producenten- of consumentenexemplaren met verschillende verbindingsnamen wilt registreren. Als u de belangrijkste Kafka-producenten of -consumenten wilt registreren, roept u de juiste API aan:

Voor meer informatie over sleutelservices, zie .NET afhankelijkheidsinjectie: Keyed Services.

Configuratie

De .NET AspireApache Kafka-integratie biedt meerdere opties voor het configureren van de verbinding op basis van de vereisten en conventies van uw project.

Een verbindingsreeks gebruiken

Wanneer u een verbindingsreeks uit de sectie ConnectionStrings configuratie gebruikt, kunt u de naam van de verbindingsreeks opgeven bij het aanroepen van builder.AddKafkaProducer() of builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Vervolgens wordt de verbindingsreeks opgehaald uit de ConnectionStrings configuratiesectie:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

De waarde van de verbindingsreeks wordt ingesteld op de eigenschap BootstrapServers van de geproduceerde IProducer<TKey, TValue>- of IConsumer<TKey, TValue>-exemplaren. Zie BootstrapServersvoor meer informatie.

Configuratieproviders gebruiken

De .NET AspireApache Kafka-integratie ondersteunt Microsoft.Extensions.Configuration. Het laadt respectievelijk de KafkaProducerSettings en KafkaConsumerSettings vanuit de configuratie door gebruik te maken van de Aspire:Confluent:Kafka:Producer- en Aspire.Confluent:Kafka:Consumer-sleutels. Het volgende codefragment is een voorbeeld van een appsettings.json-bestand waarmee een aantal van de opties wordt geconfigureerd:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

De Config eigenschappen van zowel Aspire:Confluent:Kafka:Producer als Aspire.Confluent:Kafka:Consumer configuratiesecties binden respectievelijk aan exemplaren van ProducerConfig en ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> vereist dat de eigenschap ClientId wordt ingesteld om de verbruikte offset van berichten door de broker bij te houden.

Zie JSONvoor het volledige Kafka-clientintegratieschema Aspire. Confluent.Kafka/ConfigurationSchema.json.

Gebruik inline-afgevaardigden

Er zijn verschillende inline delegates beschikbaar om verschillende opties te configureren.

KafkaProducerSettings en KafkaConsumerSettings configureren

U kunt de Action<KafkaProducerSettings> configureSettings delegate doorgeven om bepaalde of alle opties inline in te stellen, bijvoorbeeld om gezondheidscontroles uit de code uit te schakelen.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

U kunt direct een consument configureren vanuit de code.

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue> en ConsumerBuilder<TKey, TValue> configureren

Om Confluent.Kafka builder functies te configureren, gebruikt u een Action<ProducerBuilder<TKey, TValue>> (of Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Wanneer u producenten en consumenten registreert en u toegang wilt krijgen tot een service die is geregistreerd in de DI-container, kunt u respectievelijk een Action<IServiceProvider, ProducerBuilder<TKey, TValue>> of Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> doorgeven:

Bekijk het volgende voorbeeld van producentregistratie:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Zie ProducerBuilder<TKey, TValue>- en ConsumerBuilder<TKey, TValue> API-documentatie voor meer informatie.

statuscontroles voor Client integratie

Standaard kunnen .NET.NET Aspire integraties statuscontroles voor alle services inschakelen. Zie .NET.NET Aspire overzicht van integratiesvoor meer informatie.

De .NET AspireApache Kafka-integratie verwerkt de volgende statuscontrolescenario's:

  • Voegt de Aspire.Confluent.Kafka.Producer gezondheidscontrole toe wanneer KafkaProducerSettings.DisableHealthChecksfalseis.
  • Voegt de Aspire.Confluent.Kafka.Consumer gezondheidscontrole toe wanneer KafkaConsumerSettings.DisableHealthChecksfalseis.
  • Kan worden geïntegreerd met het /health HTTP-eindpunt, waarmee alle geregistreerde statuscontroles moeten worden doorgegeven zodat de app als gereed wordt beschouwd voor het accepteren van verkeer.

Waarneembaarheid en telemetrie

.NET .NET Aspire integraties automatisch configuraties voor logging, tracering en metriek instellen, die ook wel bekend staan als de pijlers van observeerbaarheid. Zie .NET.NET Aspire overzicht van integratieintegratiesvoor meer informatie over de waarneembaarheid en telemetrie van integraties. Afhankelijk van de back-upservice ondersteunen sommige integraties mogelijk slechts enkele van deze functies. Sommige integraties ondersteunen bijvoorbeeld logboekregistratie en tracering, maar geen metrische gegevens. Telemetriefuncties kunnen ook worden uitgeschakeld met behulp van de technieken die worden weergegeven in de sectie Configuratie.

Logboekregistratie

De .NET AspireApache Kafka-integratie maakt gebruik van de volgende logboekcategorieën:

  • Aspire.Confluent.Kafka

Tracering

De .NET Aspire-Apache Kafka integratie produceert geen gedistribueerde traces.

Statistieken

De .NET AspireApache Kafka-integratie verzendt de volgende metrische gegevens met behulp van OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Zie ook