.NET Aspire Apache Kafka-Integration
umfasst Folgendes:Hostingintegration und Client Integration
Apache Kafka ist eine Open-Source-Plattform für verteiltes Ereignisstreaming. Es ist nützlich zum Erstellen von Echtzeit-Datenpipelines und Streaminganwendungen. Mit der .NET AspireApache Kafka-Integration können Sie eine Verbindung mit vorhandenen Kafka-Instanzen herstellen oder neue Instanzen aus .NET mit dem docker.io/confluentinc/confluent-local
Containerimageerstellen.
Hosting-Integration
Das Apache Kafka Hosting-Integrationsmodell modelliert ein Kafka server als KafkaServerResource Typ. Um auf diesen Typ zuzugreifen, installieren Sie das 📦Aspire.Hosting.Kafka NuGet Paket im App-Host Projekt und fügen Sie es dann mit dem Builder hinzu.
dotnet add package Aspire.Hosting.Kafka
Weitere Informationen finden Sie unter dotnet add package oder Verwalten von Paketabhängigkeiten in .NET-Anwendungen.
Kafka-server-Ressource hinzufügen
Rufen Sie in Ihrem App-Hostprojekt AddKafka in der builder
Instanz auf, um eine Kafka-server-Ressource hinzuzufügen:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Wenn .NET.NET Aspire dem App-Host ein Containerimage hinzufügt, wie im vorherigen Beispiel mit dem docker.io/confluentinc/confluent-local
-Image gezeigt, wird eine neue Kafka-server Instanz auf Ihrem lokalen Computer erstellt. Dem serverwird ein Verweis auf Kafka-kafka
(die ExampleProject
-Variable) hinzugefügt. Die Kafka-server-Ressource enthält Standardports
Die WithReference-Methode konfiguriert eine Verbindung im ExampleProject
namens "kafka"
. Weitere Informationen finden Sie im Abschnitt Containerressourcenlebenszyklus.
Tipp
Wenn Sie lieber eine Verbindung mit einem bestehenden Kafka-serverherstellen möchten, rufen Sie AddConnectionString stattdessen auf. Weitere Informationen finden Sie unter Referenzieren vorhandener Ressourcen.
Kafka-Benutzeroberfläche hinzufügen
Rufen Sie die Methode auf, um die server zur Kafka-WithKafkaUI-Ressource hinzuzufügen:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Die Kafka-Benutzeroberfläche ist eine kostenlose Open-Source-Webbenutzeroberfläche zum Überwachen und Verwalten von Apache Kafka Clustern.
.NET
.NET Aspire fügt dem Host der App, die die Kafka UI ausführt, ein weiteres Container-Image docker.io/provectuslabs/kafka-ui
hinzu.
Kafka-UI-Hostport ändern
Um den Kafka-UI-Hostport zu ändern, verketten Sie einen Aufruf der WithHostPort-Methode:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Die Kafka-Benutzeroberfläche ist im vorherigen Beispiel unter http://localhost:9100
erreichbar.
Hinzufügen der Kafka server Ressource mit Datenvolumen
Rufen Sie zum Hinzufügen eines Datenvolumes zur Kafka-server-Ressource die WithDataVolume-Methode für die Ressource Kafka server auf:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Das Datenvolume wird verwendet, um die Kafka-server Daten außerhalb des Lebenszyklus des Containers zu speichern. Das Datenvolumen wird am /var/lib/kafka/data
-Pfad im Kafka-server-Container bereitgestellt und wenn kein name
-Parameter angegeben wird, wird ein Name zufällig generiert. Weitere Informationen zu Datenvolumes und Details dazu, warum sie gegenüber Bind-Mountsbevorzugt werden, finden Sie in der Docker-Dokumentation: Volumes.
Hinzufügen einer Kafka-server-Ressource mit Datenbindungs-Bereitstellung
Rufen Sie die server-Methode auf, um eine Datenbindung zur Kafka-WithDataBindMount-Ressource hinzuzufügen.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Wichtig
Daten -Bind-Mounts haben im Vergleich zu Volumeseine eingeschränkte Funktionalität, wobei Volumes eine bessere Leistung, Portabilität und Sicherheit bieten, was sie für Produktionsumgebungen besser geeignet macht. Bind-Mounts ermöglichen jedoch direkten Zugriff auf und Änderung von Dateien auf dem Hostsystem, ideal für Entwicklungen und Tests, bei denen es auf Echtzeitänderungen ankommt.
Datenbindungs-Bereitstellungen basieren auf dem Dateisystem des Hostcomputers, um die Kafka-server Daten über Containerneustarts hinweg beizubehalten. Der Datenbind-Mount wird im Pfad C:\Kafka\Data
unter Windows (oder im Pfad /Kafka/Data
auf Unix) auf dem Hostrechner im Kafka-server-Container bereitgestellt. Weitere Informationen zu Daten-Bind-Mounts finden Sie in der Docker Dokumentation: Bind-Mounts.
Hosten von Integritätsprüfungen für Integration
Die Kafka-Hostingintegration fügt automatisch eine Integritätsprüfung für die Kafka-server-Ressource hinzu. Die Gesundheitsprüfung überprüft, ob ein Kafka-Produzent mit dem angegebenen Verbindungsnamen eine Verbindung herstellen und ein Topic in Kafka serverspeichern kann.
Die Hostingintegration basiert auf dem 📦 AspNetCore.HealthChecks.Kafka NuGet-Paket.
Client Integration
Um mit der .NET AspireApache Kafka-Integration zu beginnen, installieren Sie das 📦Aspire.Confluent.Kafka NuGet-Paket in dem Projekt, das clientkonsumiert, das heißt, das Projekt für die Anwendung, die die Apache Kafkaclientverwendet.
dotnet add package Aspire.Confluent.Kafka
Kafka-Produzent hinzufügen
Rufen Sie in der Datei Program.cs des client-verbrauchenden Projekts die AddKafkaProducer-Erweiterungsmethode auf, um eine IProducer<TKey, TValue>
für die Verwendung über den Container zum Einfügen von Abhängigkeiten zu registrieren. Die Methode verwendet zwei generische Parameter, die dem Typ des Schlüssels entsprechen, und den Typ der Nachricht, die an den Broker gesendet werden soll. Diese generischen Parameter werden von AddKafkaProducer
verwendet, um eine Instanz von ProducerBuilder<TKey, TValue>
zu erstellen. Diese Methode verwendet auch den Verbindungsnamenparameter.
builder.AddKafkaProducer<string, string>("messaging");
Anschließend können Sie die IProducer<TKey, TValue>
Instanz mithilfe der Abhängigkeitseinfügung abrufen. Um beispielsweise den Produzenten aus einer IHostedService
abzurufen:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Weitere Informationen zu Arbeitnehmern finden Sie unter Worker Services in .NET.
Hinzufügen von Kafka Consumer
Um eine IConsumer<TKey, TValue>
zur Verwendung über den Dependency Injection Container zu registrieren, rufen Sie die AddKafkaConsumer-Erweiterungsmethode in der Program.cs-Datei Ihres client-verwendenden Projekts auf. Die Methode verwendet zwei generische Parameter, die dem Typ des Schlüssels und dem Typ der Nachricht entsprechen, die vom Broker empfangen werden soll. Diese generischen Parameter werden von AddKafkaConsumer
verwendet, um eine Instanz von ConsumerBuilder<TKey, TValue>
zu erstellen. Diese Methode verwendet auch den Verbindungsnamenparameter.
builder.AddKafkaConsumer<string, string>("messaging");
Anschließend können Sie die IConsumer<TKey, TValue>
Instanz mithilfe der Abhängigkeitseinfügung abrufen. Um beispielsweise den Verbraucher aus einer IHostedService
abzurufen:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Hinzufügen von wichtigen Kafka-Produzenten oder Verbrauchern
Es kann Situationen geben, in denen Sie mehrere Produzenten- oder Consumerinstanzen mit unterschiedlichen Verbindungsnamen registrieren möchten. Rufen Sie die entsprechende API auf, um wichtige Kafka-Produzenten oder -Verbraucher zu registrieren:
- AddKeyedKafkaProducer: Registriert einen Kafka-Produzenten mit Schlüssel.
- AddKeyedKafkaConsumer: Registriert einen Kafka-Consumer mit Schlüsseln.
Weitere Informationen zu schlüsselbasierten Diensten finden Sie unter .NET Abhängigkeitsinjektion: schlüsselbasierte Dienste.
Konfiguration
Die .NET AspireApache Kafka-Integration bietet mehrere Optionen zum Konfigurieren der Verbindung basierend auf den Anforderungen und Konventionen Ihres Projekts.
Verwenden Sie eine Verbindungszeichenfolge
Wenn Sie eine Verbindungszeichenfolge aus dem Konfigurationsabschnitt ConnectionStrings
verwenden, können Sie beim Aufrufen von builder.AddKafkaProducer()
oder builder.AddKafkaProducer()
den Namen der Verbindungszeichenfolge angeben:
builder.AddKafkaProducer<string, string>("kafka-producer");
Anschließend wird die Verbindungszeichenfolge aus dem Konfigurationsabschnitt ConnectionStrings
abgerufen:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Der Wert der Verbindungszeichenfolge wird auf die Eigenschaft BootstrapServers
der erzeugten IProducer<TKey, TValue>
- oder IConsumer<TKey, TValue>
-Instanz gesetzt. Weitere Informationen finden Sie unter BootstrapServers.
Verwenden von Konfigurationsanbietern
Die Integration von .NET AspireApache Kafka unterstützt Microsoft.Extensions.Configuration. Es lädt die KafkaProducerSettings oder KafkaConsumerSettings aus der Konfiguration, indem es jeweils die Schlüssel Aspire:Confluent:Kafka:Producer
und Aspire.Confluent:Kafka:Consumer
verwendet. Der folgende Codeausschnitt ist ein Beispiel für eine appsettings.json Datei, die einige der Optionen konfiguriert:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Die Config
-Eigenschaften der Aspire:Confluent:Kafka:Producer
- und Aspire.Confluent:Kafka:Consumer
-Konfigurationsabschnitte binden jeweils an Instanzen von ProducerConfig
und ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
erfordert, dass die ClientId
-Eigenschaft festgelegt ist, damit der Broker die verbrauchten Nachrichtenoffsets nachverfolgen kann.
Die vollständige Kafka client Integration JSON Schema finden Sie unter Aspire. Confluent.Kafka/ConfigurationSchema.json.
Verwenden von Inlinedelegatn
Es stehen mehrere Inline-Delegates zur Verfügung, um verschiedene Optionen zu konfigurieren.
Konfigurieren SieKafkaProducerSettings
und KafkaConsumerSettings
Sie können den Action<KafkaProducerSettings> configureSettings
Delegat übergeben, um einige oder alle Optionen inline einzurichten, z. B. zum Deaktivieren von Integritätsprüfungen aus Code:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Sie können einen Consumer direkt im Code konfigurieren.
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Konfigurieren Sie ProducerBuilder<TKey, TValue>
und ConsumerBuilder<TKey, TValue>
Um Confluent.Kafka
Builder zu konfigurieren, übergeben Sie eine Action<ProducerBuilder<TKey, TValue>>
(oder Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Wenn Sie bei der Registrierung von Produzenten und Verbrauchern auf einen im DI-Container registrierten Dienst zugreifen müssen, können Sie eine Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
bzw. Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
übergeben:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Betrachten Sie das folgende Beispiel für die Registrierung des Herstellers:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Weitere Informationen finden Sie in ProducerBuilder<TKey, TValue>
- und ConsumerBuilder<TKey, TValue>
-API-Dokumentation.
Client Integritätsprüfungen der Integration
Standardmäßig aktivieren Integrationen .NET.NET Aspire Integritätsprüfungen für alle Dienste. Weitere Informationen finden Sie unter .NET.NET Aspire Integrationsübersicht.
Die .NET AspireApache Kafka-Integration behandelt die folgenden Gesundheitsprüfungsszenarien:
- Fügt die
Aspire.Confluent.Kafka.Producer
Gesundheitsüberprüfung hinzu, wenn KafkaProducerSettings.DisableHealthChecksfalse
ist. - Fügt die
Aspire.Confluent.Kafka.Consumer
Gesundheitsüberprüfung hinzu, wenn KafkaConsumerSettings.DisableHealthChecksfalse
ist. - Integriert sich in den
/health
HTTP-Endpunkt, der vorgibt, dass alle registrierten Gesundheitschecks bestanden werden müssen, damit die App als bereit zum Empfang von Datenverkehr gilt.
Beobachtbarkeit und Telemetrie
.NET .NET Aspire Integrationen richten automatisch Protokollierungs-, Ablaufverfolgungs- und Metrikkonfigurationen ein, die manchmal als die Säulen der Beobachtbarkeitbezeichnet werden. Weitere Informationen zur Integrationsobservability und Telemetrie finden Sie unter .NET.NET Aspire Übersicht der Integrationen. Abhängig vom unterstützenden Dienst unterstützen einige Integrationen möglicherweise nur einige dieser Funktionen. Beispielsweise unterstützen einige Integrationen Protokollierung und Ablaufverfolgung, aber keine Metriken. Telemetrie-Features können auch mithilfe der Techniken deaktiviert werden, die im Abschnitt Configuration dargestellt werden.
Protokollierung
Die .NET AspireApache Kafka-Integration verwendet die folgenden Protokollkategorien:
Aspire.Confluent.Kafka
Nachverfolgung
Die .NET AspireApache Kafka-Integration gibt keine verteilten Traces aus.
Metriken
Die .NET AspireApache Kafka-Integration gibt die folgenden Metriken mithilfe von OpenTelemetryaus:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received