Freigeben über


.NET Aspire Apache Kafka-Integration

umfasst Folgendes:Hostingintegration und Client Integration

Apache Kafka ist eine Open-Source-Plattform für verteiltes Ereignisstreaming. Es ist nützlich zum Erstellen von Echtzeit-Datenpipelines und Streaminganwendungen. Mit der .NET AspireApache Kafka-Integration können Sie eine Verbindung mit vorhandenen Kafka-Instanzen herstellen oder neue Instanzen aus .NET mit dem docker.io/confluentinc/confluent-local Containerimageerstellen.

Hosting-Integration

Das Apache Kafka Hosting-Integrationsmodell modelliert ein Kafka server als KafkaServerResource Typ. Um auf diesen Typ zuzugreifen, installieren Sie das 📦Aspire.Hosting.Kafka NuGet Paket im App-Host Projekt und fügen Sie es dann mit dem Builder hinzu.

dotnet add package Aspire.Hosting.Kafka

Weitere Informationen finden Sie unter dotnet add package oder Verwalten von Paketabhängigkeiten in .NET-Anwendungen.

Kafka-server-Ressource hinzufügen

Rufen Sie in Ihrem App-Hostprojekt AddKafka in der builder Instanz auf, um eine Kafka-server-Ressource hinzuzufügen:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Wenn .NET.NET Aspire dem App-Host ein Containerimage hinzufügt, wie im vorherigen Beispiel mit dem docker.io/confluentinc/confluent-local-Image gezeigt, wird eine neue Kafka-server Instanz auf Ihrem lokalen Computer erstellt. Dem serverwird ein Verweis auf Kafka-kafka (die ExampleProject-Variable) hinzugefügt. Die Kafka-server-Ressource enthält Standardports

Die WithReference-Methode konfiguriert eine Verbindung im ExampleProject namens "kafka". Weitere Informationen finden Sie im Abschnitt Containerressourcenlebenszyklus.

Tipp

Wenn Sie lieber eine Verbindung mit einem bestehenden Kafka-serverherstellen möchten, rufen Sie AddConnectionString stattdessen auf. Weitere Informationen finden Sie unter Referenzieren vorhandener Ressourcen.

Kafka-Benutzeroberfläche hinzufügen

Rufen Sie die Methode auf, um die server zur Kafka-WithKafkaUI-Ressource hinzuzufügen:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Die Kafka-Benutzeroberfläche ist eine kostenlose Open-Source-Webbenutzeroberfläche zum Überwachen und Verwalten von Apache Kafka Clustern. .NET .NET Aspire fügt dem Host der App, die die Kafka UI ausführt, ein weiteres Container-Image docker.io/provectuslabs/kafka-ui hinzu.

Kafka-UI-Hostport ändern

Um den Kafka-UI-Hostport zu ändern, verketten Sie einen Aufruf der WithHostPort-Methode:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Die Kafka-Benutzeroberfläche ist im vorherigen Beispiel unter http://localhost:9100 erreichbar.

Hinzufügen der Kafka server Ressource mit Datenvolumen

Rufen Sie zum Hinzufügen eines Datenvolumes zur Kafka-server-Ressource die WithDataVolume-Methode für die Ressource Kafka server auf:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Das Datenvolume wird verwendet, um die Kafka-server Daten außerhalb des Lebenszyklus des Containers zu speichern. Das Datenvolumen wird am /var/lib/kafka/data-Pfad im Kafka-server-Container bereitgestellt und wenn kein name-Parameter angegeben wird, wird ein Name zufällig generiert. Weitere Informationen zu Datenvolumes und Details dazu, warum sie gegenüber Bind-Mountsbevorzugt werden, finden Sie in der Docker-Dokumentation: Volumes.

Hinzufügen einer Kafka-server-Ressource mit Datenbindungs-Bereitstellung

Rufen Sie die server-Methode auf, um eine Datenbindung zur Kafka-WithDataBindMount-Ressource hinzuzufügen.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Wichtig

Daten -Bind-Mounts haben im Vergleich zu Volumeseine eingeschränkte Funktionalität, wobei Volumes eine bessere Leistung, Portabilität und Sicherheit bieten, was sie für Produktionsumgebungen besser geeignet macht. Bind-Mounts ermöglichen jedoch direkten Zugriff auf und Änderung von Dateien auf dem Hostsystem, ideal für Entwicklungen und Tests, bei denen es auf Echtzeitänderungen ankommt.

Datenbindungs-Bereitstellungen basieren auf dem Dateisystem des Hostcomputers, um die Kafka-server Daten über Containerneustarts hinweg beizubehalten. Der Datenbind-Mount wird im Pfad C:\Kafka\Data unter Windows (oder im Pfad /Kafka/Data auf Unix) auf dem Hostrechner im Kafka-server-Container bereitgestellt. Weitere Informationen zu Daten-Bind-Mounts finden Sie in der Docker Dokumentation: Bind-Mounts.

Hosten von Integritätsprüfungen für Integration

Die Kafka-Hostingintegration fügt automatisch eine Integritätsprüfung für die Kafka-server-Ressource hinzu. Die Gesundheitsprüfung überprüft, ob ein Kafka-Produzent mit dem angegebenen Verbindungsnamen eine Verbindung herstellen und ein Topic in Kafka serverspeichern kann.

Die Hostingintegration basiert auf dem 📦 AspNetCore.HealthChecks.Kafka NuGet-Paket.

Client Integration

Um mit der .NET AspireApache Kafka-Integration zu beginnen, installieren Sie das 📦Aspire.Confluent.Kafka NuGet-Paket in dem Projekt, das clientkonsumiert, das heißt, das Projekt für die Anwendung, die die Apache Kafkaclientverwendet.

dotnet add package Aspire.Confluent.Kafka

Kafka-Produzent hinzufügen

Rufen Sie in der Datei Program.cs des client-verbrauchenden Projekts die AddKafkaProducer-Erweiterungsmethode auf, um eine IProducer<TKey, TValue> für die Verwendung über den Container zum Einfügen von Abhängigkeiten zu registrieren. Die Methode verwendet zwei generische Parameter, die dem Typ des Schlüssels entsprechen, und den Typ der Nachricht, die an den Broker gesendet werden soll. Diese generischen Parameter werden von AddKafkaProducer verwendet, um eine Instanz von ProducerBuilder<TKey, TValue>zu erstellen. Diese Methode verwendet auch den Verbindungsnamenparameter.

builder.AddKafkaProducer<string, string>("messaging");

Anschließend können Sie die IProducer<TKey, TValue> Instanz mithilfe der Abhängigkeitseinfügung abrufen. Um beispielsweise den Produzenten aus einer IHostedServiceabzurufen:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Weitere Informationen zu Arbeitnehmern finden Sie unter Worker Services in .NET.

Hinzufügen von Kafka Consumer

Um eine IConsumer<TKey, TValue> zur Verwendung über den Dependency Injection Container zu registrieren, rufen Sie die AddKafkaConsumer-Erweiterungsmethode in der Program.cs-Datei Ihres client-verwendenden Projekts auf. Die Methode verwendet zwei generische Parameter, die dem Typ des Schlüssels und dem Typ der Nachricht entsprechen, die vom Broker empfangen werden soll. Diese generischen Parameter werden von AddKafkaConsumer verwendet, um eine Instanz von ConsumerBuilder<TKey, TValue>zu erstellen. Diese Methode verwendet auch den Verbindungsnamenparameter.

builder.AddKafkaConsumer<string, string>("messaging");

Anschließend können Sie die IConsumer<TKey, TValue> Instanz mithilfe der Abhängigkeitseinfügung abrufen. Um beispielsweise den Verbraucher aus einer IHostedServiceabzurufen:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Hinzufügen von wichtigen Kafka-Produzenten oder Verbrauchern

Es kann Situationen geben, in denen Sie mehrere Produzenten- oder Consumerinstanzen mit unterschiedlichen Verbindungsnamen registrieren möchten. Rufen Sie die entsprechende API auf, um wichtige Kafka-Produzenten oder -Verbraucher zu registrieren:

Weitere Informationen zu schlüsselbasierten Diensten finden Sie unter .NET Abhängigkeitsinjektion: schlüsselbasierte Dienste.

Konfiguration

Die .NET AspireApache Kafka-Integration bietet mehrere Optionen zum Konfigurieren der Verbindung basierend auf den Anforderungen und Konventionen Ihres Projekts.

Verwenden Sie eine Verbindungszeichenfolge

Wenn Sie eine Verbindungszeichenfolge aus dem Konfigurationsabschnitt ConnectionStrings verwenden, können Sie beim Aufrufen von builder.AddKafkaProducer() oder builder.AddKafkaProducer()den Namen der Verbindungszeichenfolge angeben:

builder.AddKafkaProducer<string, string>("kafka-producer");

Anschließend wird die Verbindungszeichenfolge aus dem Konfigurationsabschnitt ConnectionStrings abgerufen:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Der Wert der Verbindungszeichenfolge wird auf die Eigenschaft BootstrapServers der erzeugten IProducer<TKey, TValue>- oder IConsumer<TKey, TValue>-Instanz gesetzt. Weitere Informationen finden Sie unter BootstrapServers.

Verwenden von Konfigurationsanbietern

Die Integration von .NET AspireApache Kafka unterstützt Microsoft.Extensions.Configuration. Es lädt die KafkaProducerSettings oder KafkaConsumerSettings aus der Konfiguration, indem es jeweils die Schlüssel Aspire:Confluent:Kafka:Producer und Aspire.Confluent:Kafka:Consumer verwendet. Der folgende Codeausschnitt ist ein Beispiel für eine appsettings.json Datei, die einige der Optionen konfiguriert:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Die Config-Eigenschaften der Aspire:Confluent:Kafka:Producer- und Aspire.Confluent:Kafka:Consumer-Konfigurationsabschnitte binden jeweils an Instanzen von ProducerConfig und ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> erfordert, dass die ClientId-Eigenschaft festgelegt ist, damit der Broker die verbrauchten Nachrichtenoffsets nachverfolgen kann.

Die vollständige Kafka client Integration JSON Schema finden Sie unter Aspire. Confluent.Kafka/ConfigurationSchema.json.

Verwenden von Inlinedelegatn

Es stehen mehrere Inline-Delegates zur Verfügung, um verschiedene Optionen zu konfigurieren.

Konfigurieren SieKafkaProducerSettings und KafkaConsumerSettings

Sie können den Action<KafkaProducerSettings> configureSettings Delegat übergeben, um einige oder alle Optionen inline einzurichten, z. B. zum Deaktivieren von Integritätsprüfungen aus Code:

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Sie können einen Consumer direkt im Code konfigurieren.

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Konfigurieren Sie ProducerBuilder<TKey, TValue> und ConsumerBuilder<TKey, TValue>

Um Confluent.Kafka Builder zu konfigurieren, übergeben Sie eine Action<ProducerBuilder<TKey, TValue>> (oder Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Wenn Sie bei der Registrierung von Produzenten und Verbrauchern auf einen im DI-Container registrierten Dienst zugreifen müssen, können Sie eine Action<IServiceProvider, ProducerBuilder<TKey, TValue>> bzw. Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> übergeben:

Betrachten Sie das folgende Beispiel für die Registrierung des Herstellers:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Weitere Informationen finden Sie in ProducerBuilder<TKey, TValue>- und ConsumerBuilder<TKey, TValue>-API-Dokumentation.

Client Integritätsprüfungen der Integration

Standardmäßig aktivieren Integrationen .NET.NET Aspire Integritätsprüfungen für alle Dienste. Weitere Informationen finden Sie unter .NET.NET Aspire Integrationsübersicht.

Die .NET AspireApache Kafka-Integration behandelt die folgenden Gesundheitsprüfungsszenarien:

  • Fügt die Aspire.Confluent.Kafka.Producer Gesundheitsüberprüfung hinzu, wenn KafkaProducerSettings.DisableHealthChecksfalseist.
  • Fügt die Aspire.Confluent.Kafka.Consumer Gesundheitsüberprüfung hinzu, wenn KafkaConsumerSettings.DisableHealthChecksfalseist.
  • Integriert sich in den /health HTTP-Endpunkt, der vorgibt, dass alle registrierten Gesundheitschecks bestanden werden müssen, damit die App als bereit zum Empfang von Datenverkehr gilt.

Beobachtbarkeit und Telemetrie

.NET .NET Aspire Integrationen richten automatisch Protokollierungs-, Ablaufverfolgungs- und Metrikkonfigurationen ein, die manchmal als die Säulen der Beobachtbarkeitbezeichnet werden. Weitere Informationen zur Integrationsobservability und Telemetrie finden Sie unter .NET.NET Aspire Übersicht der Integrationen. Abhängig vom unterstützenden Dienst unterstützen einige Integrationen möglicherweise nur einige dieser Funktionen. Beispielsweise unterstützen einige Integrationen Protokollierung und Ablaufverfolgung, aber keine Metriken. Telemetrie-Features können auch mithilfe der Techniken deaktiviert werden, die im Abschnitt Configuration dargestellt werden.

Protokollierung

Die .NET AspireApache Kafka-Integration verwendet die folgenden Protokollkategorien:

  • Aspire.Confluent.Kafka

Nachverfolgung

Die .NET AspireApache Kafka-Integration gibt keine verteilten Traces aus.

Metriken

Die .NET AspireApache Kafka-Integration gibt die folgenden Metriken mithilfe von OpenTelemetryaus:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Siehe auch