Dela via


.NET Aspire Apache Kafka integrering

omfattar:Hostingintegration och Client integration

Apache Kafka är en plattform för distribuerad händelseströmning med öppen källkod. Det är användbart för att bygga datapipelines i realtid och strömmande applikationer. Med integreringen .NET AspireApache Kafka kan du ansluta till befintliga Kafka-instanser eller skapa nya instanser från .NET med docker.io/confluentinc/confluent-local containeravbildningen.

Värdintegrering

Apache Kafka som är värd för integrering modellerar en Kafka-server som KafkaServerResource typ. Om du vill komma åt den här typen installerar du 📦Aspire.Hosting.Kafka NuGet-paketet i appvärdprojektet för , och lägg sedan till det med buildern.

dotnet add package Aspire.Hosting.Kafka

Mer information finns i dotnet add package eller Hantera paketberoenden i .NET applikationer.

Lägg till Kafka-serverresurs

I appvärdprojektet anropar du AddKafkabuilder-instansen för att lägga till en Kafka-serverresurs:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

När .NET.NET Aspire lägger till en containerbild i apputrymmet, som visas i det föregående exemplet med docker.io/confluentinc/confluent-local-bilden, skapar det en ny Kafka-serverinstans på din lokala dator. En referens till Kafka-servern (variabeln kafka) läggs till i ExampleProject. Kafka-serverresursen innehåller standardportar

Metoden WithReference konfigurerar en anslutning i ExampleProject med namnet "kafka". Mer information finns i Livscykel för containerresurser.

Tips

Om du hellre vill ansluta till en befintlig Kafka-server anropar du AddConnectionString i stället. Mer information finns i Referera till befintliga resurser.

Lägg till Kafka-användargränssnitt

Om du vill lägga till Kafka-användargränssnittet till Kafka-serverresursen anropar du metoden WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka-användargränssnittet är ett kostnadsfritt webbgränssnitt med öppen källkod för att övervaka och hantera Apache Kafka kluster. .NET .NET Aspire lägger till ytterligare en containeravbildning docker.io/provectuslabs/kafka-ui till appvärden som kör Kafka-användargränssnittet.

Ändra värdporten för Kafka-användargränssnittet

Om du vill ändra värdporten för Kafka-användargränssnittet kedjar du ett anrop till metoden WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka-användargränssnittet är tillgängligt på http://localhost:9100 i föregående exempel.

Lägga till Kafka-serverresurs med datavolym

Om du vill lägga till en datavolym i Kafka-serverresursen anropar du metoden WithDataVolume på Kafka-serverresursen:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Datavolymen används för att bevara Kafka-serverdata utanför containerns livscykel. Datavolymen monteras på sökvägen /var/lib/kafka/data i Kafka-servercontainern och när en name parameter inte anges, genereras namnet slumpvis. Mer information om datavolymer samt varför de föredras framför bindmountsfinns i dokumentet Docker Volymer.

Lägg till Kafka-serverresurs med databindningspunkt

Om du vill lägga till en databindningsmontering till Kafka-serverresursen anropar du metoden WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Viktig

Data bindningsmonteringar har begränsade funktioner jämfört med volymer, vilket ger bättre prestanda, portabilitet och säkerhet, vilket gör dem mer lämpliga för produktionsmiljöer. Bindningsmonteringar tillåter dock direkt åtkomst och ändring av filer i värdsystemet, perfekt för utveckling och testning där realtidsändringar behövs.

Databindningsmonteringar förlitar sig på värddatorns filsystem för att bevara Kafka-serverdata mellan omstarter av containrar. Databindningspunkten monteras i sökvägen C:\Kafka\Data på Windows (eller /Kafka/Data på Unix) på värddatorn i Kafka-serverns container. Mer information om bindningar av data finns i dokumentationen för Docker: Monteringar av bindningar.

Genomföra hälsokontroller för integrationer

Kafka-värdintegrering lägger automatiskt till en hälsokontroll för Kafka-serverresursen. Hälsokontrollen verifierar att en Kafka-producent med det angivna anslutningsnamnet kan ansluta och bevara ett ämne till Kafka-servern.

Värdtjänstintegration förlitar sig på 📦 AspNetCore.HealthChecks.Kafka NuGet-paket.

Client integrering

Kom igång med .NET AspireApache Kafka-integreringen genom att installera 📦Aspire. Confluent.Kafka NuGet-paket i det klientkrävande projektet, det vill säga projektet för det program som använder Apache Kafka-klienten.

dotnet add package Aspire.Confluent.Kafka

Lägg till Kafka-producent

Anropa Program.cs-tilläggsmetoden i AddKafkaProducer-filen för ditt klientprojekt för att registrera en IProducer<TKey, TValue> för användning via beroendeinjektionscontainern. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska skickas till meddelandemäklaren. Dessa allmänna parametrar används av AddKafkaProducer för att skapa en instans av ProducerBuilder<TKey, TValue>. Den här metoden använder även parametern anslutningsnamn.

builder.AddKafkaProducer<string, string>("messaging");

Du kan sedan hämta IProducer<TKey, TValue>-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta producenten från en IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Mer information om arbetare finns i Worker-tjänster i .NET.

Lägg till Kafka-konsument

Om du vill registrera en IConsumer<TKey, TValue> för användning via containern för beroendeinjektion anropar du AddKafkaConsumer-tilläggsmetoden i Program.cs-filen för ditt klientprojekt. Metoden tar två generiska parametrar som motsvarar typen av nyckel och typen av meddelande som ska tas emot från mäklaren. Dessa allmänna parametrar används av AddKafkaConsumer för att skapa en instans av ConsumerBuilder<TKey, TValue>. Den här metoden använder även parametern anslutningsnamn.

builder.AddKafkaConsumer<string, string>("messaging");

Du kan sedan hämta IConsumer<TKey, TValue>-instansen med hjälp av beroendeinjektion. Om du till exempel vill hämta en konsument från IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Lägg till nyckelade Kafka-producenter eller konsumenter

Det kan finnas situationer där du vill registrera flera producent- eller konsumentinstanser med olika anslutningsnamn. Om du vill registrera nyckelade Kafka-producenter eller konsumenter anropar du lämpligt API:

För mer information om nyckelade tjänster, se .NET beroendeinjektion: Nyckelade tjänster.

Konfiguration

Den .NET AspireApache Kafka integreringen innehåller flera alternativ för att konfigurera anslutningen baserat på kraven och konventionerna i ditt projekt.

Använda en anslutningssträng

När du använder en anslutningssträng från ConnectionStrings-konfigurationsavsnittet kan du ange namnet på anslutningssträngen när du anropar builder.AddKafkaProducer() eller builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Sedan hämtas anslutningssträngen från ConnectionStrings-konfigurationsavsnittet:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Anslutningssträngsvärdet anges till egenskapen BootstrapServers för den producerade IProducer<TKey, TValue>- eller IConsumer<TKey, TValue>-instansen. Mer information finns i BootstrapServers.

Använda konfigurationsprovidrar

.NET Aspire Apache Kafka-integreringen stöder Microsoft.Extensions.Configuration. Den läser in KafkaProducerSettings eller KafkaConsumerSettings från konfigurationen med hjälp av Aspire:Confluent:Kafka:Producer respektive Aspire.Confluent:Kafka:Consumer nycklar. Följande kodfragment är ett exempel på en appsettings.json fil som konfigurerar några av alternativen:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Egenskaperna för Config för konfigurationsavsnitten både Aspire:Confluent:Kafka:Producer och Aspire.Confluent:Kafka:Consumer binder till instanser av ProducerConfig och ConsumerConfigrespektive.

Confluent.Kafka.Consumer<TKey, TValue> kräver att egenskapen ClientId anges så att mäklaren spårar förbrukade meddelandeförskjutningar.

Det fullständiga schemat för Kafka-klientintegrering JSON finns i Aspire. Confluent.Kafka/ConfigurationSchema.json.

Använd inline-delegater

Det finns flera interna ombud tillgängliga för att konfigurera olika alternativ.

KonfigureraKafkaProducerSettings och KafkaConsumerSettings

Du kan skicka delegaten Action<KafkaProducerSettings> configureSettings för att konfigurera vissa eller alla alternativ direkt i koden, till exempel för att inaktivera hälsokontroller från koden.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Du kan konfigurera en konsument inline från koden.

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Konfigurera ProducerBuilder<TKey, TValue> och ConsumerBuilder<TKey, TValue>

För att konfigurera Confluent.Kafka byggare, skicka en Action<ProducerBuilder<TKey, TValue>> (eller Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

När du registrerar producenter och konsumenter kan du skicka en Action<IServiceProvider, ProducerBuilder<TKey, TValue>> eller Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> om du behöver komma åt en tjänst som är registrerad i DI-containern:

Tänk dig följande exempel på producentregistrering:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Mer information finns i dokumentationen om ProducerBuilder<TKey, TValue> och ConsumerBuilder<TKey, TValue> API.

Client hälsokontroller för integrering

Som standard aktiverar .NET.NET Aspire integreringar hälsokontroller för alla tjänster. Mer information finns i översikten över .NET.NET Aspire integreringar.

Integreringen .NET AspireApache Kafka hanterar följande hälsokontrollscenarier:

Observerbarhet och telemetri

.NET .NET Aspire integreringar ställer automatiskt in loggning, spårning och mätvärden, som ibland benämns grundpelarna för observerbarhet. Mer information om integreringsobservabilitet och telemetri finns i översikten över .NET.NET Aspire integreringar. Beroende på säkerhetskopieringstjänsten kanske vissa integreringar bara stöder vissa av dessa funktioner. Vissa integreringar stöder till exempel loggning och spårning, men inte mått. Telemetrifunktioner kan också inaktiveras med hjälp av de tekniker som visas i avsnittet Configuration.

Skogsavverkning

.NET Aspire Apache Kafka-integreringen använder följande loggkategorier:

  • Aspire.Confluent.Kafka

Spårning

Den .NET AspireApache Kafka integreringen genererar inte distribuerade spårningar.

Mätvärden

Integreringen .NET AspireApache Kafka genererar följande mått med hjälp av OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Se även