.NET Aspire Apache Kafka integracja
obejmuje: integracjahostingu i integracja Client
Apache Kafka to rozproszona platforma przesyłania strumieniowego zdarzeń typu open source. Jest to przydatne w przypadku tworzenia potoków danych w czasie rzeczywistym i aplikacji przesyłania strumieniowego. Integracja .NET AspireApache Kafka umożliwia łączenie się z istniejącymi instancjami Kafka lub tworzenie nowych instancji z .NET za pomocą obrazu kontenera docker.io/confluentinc/confluent-local
.
Integracja hostingu
Model integracji hostowania Apache Kafka modeluje platformę Kafka server jako typ KafkaServerResource. Aby uzyskać dostęp do tego typu, zainstaluj 📦Aspire.Hosting.Kafka pakiet NuGet w projekcie hosta aplikacji , a następnie dodaj go za pomocą konstruktora.
dotnet add package Aspire.Hosting.Kafka
Aby uzyskać więcej informacji, zobacz dotnet add package lub Zarządzanie zależnościami pakietów w .NET aplikacjach.
Dodawanie zasobu platformy Kafka server
W projekcie hosta aplikacji wywołaj AddKafka w wystąpieniu builder
, aby dodać zasób Kafka server.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Gdy .NET.NET Aspire dodaje obraz kontenera do hosta aplikacji, jak pokazano w poprzednim przykładzie z obrazem docker.io/confluentinc/confluent-local
, tworzy instancję Kafka server na komputerze lokalnym. Odniesienie do serwera Kafka server (zmiennej kafka
) jest dodawane do ExampleProject
. Zasób platformy Kafka server zawiera porty domyślne
Metoda WithReference konfiguruje połączenie w ExampleProject
o nazwie "kafka"
. Aby uzyskać więcej informacji, zobacz Cykl życia zasobów kontenera.
Wskazówka
Jeśli wolisz nawiązać połączenie z istniejącą platformą Kafka server, zamiast tego wywołaj AddConnectionString. Aby uzyskać więcej informacji, odnieś się do istniejących zasobów .
Dodawanie interfejsu użytkownika platformy Kafka
Aby dodać interfejs użytkownika platformy Kafka do zasobu platformy Kafka server, wywołaj metodę WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Interfejs użytkownika platformy Kafka to bezpłatny internetowy interfejs użytkownika typu open source do monitorowania klastrów Apache Kafka i zarządzania nimi.
.NET
.NET Aspire dodaje kolejny obraz kontenera docker.io/provectuslabs/kafka-ui
do hosta aplikacji z uruchomionym interfejsem użytkownika platformy Kafka.
Zmienianie portu hosta interfejsu użytkownika platformy Kafka
Aby zmienić port hosta UI platformy Kafka, połącz wywołanie metody WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Interfejs użytkownika platformy Kafka jest dostępny pod adresem http://localhost:9100
w poprzednim przykładzie.
Dodawanie zasobu platformy Kafka server z woluminem danych
Aby dodać wolumin danych do zasobu platformy Kafka server, wywołaj metodę WithDataVolume w zasobie platformy Kafka server:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Wolumin danych służy do zapisywania danych Kafka server niezależnie od cyklu życia kontenera. Wolumin danych jest instalowany w ścieżce /var/lib/kafka/data
w kontenerze platformy Kafka server, a gdy nie podano parametru name
, nazwa jest generowana losowo. Aby uzyskać więcej informacji na temat wolumenów danych i szczegółów, dlaczego są preferowane zamiast wiązań montowań , zobacz dokumentację Docker: Woluminy.
Dodaj zasób Kafka server z punktem podmontowania danych
Aby dodać instalację powiązania danych do zasobu platformy Kafka server, wywołaj metodę WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Ważny
Instalacje powiązane danych mają ograniczoną funkcjonalność w porównaniu z woluminami , co zapewnia lepszą wydajność, przenośność i bezpieczeństwo, co czyni je bardziej odpowiednimi dla środowisk produkcyjnych. Jednak wiązane montowanie umożliwia bezpośredni dostęp i modyfikację plików w systemie hosta, co jest idealne do tworzenia oprogramowania i testowania, gdzie potrzebne są zmiany w czasie rzeczywistym.
Instalacje powiązania danych polegają na systemie plików maszyny hosta w celu utrwalania danych platformy Kafka server między ponownymi uruchomieniami kontenera. Montowanie powiązania danych jest zamontowane na ścieżce C:\Kafka\Data
w systemie Windows (lub /Kafka/Data
w Unix) na maszynie hosta w kontenerze Kafka server. Aby uzyskać więcej informacji na temat instalacji powiązań danych, zobacz Docker docs: Bind mounts.
Hostowanie testów zdrowia integracji
Integracja hostingu platformy Kafka automatycznie dodaje kontrolę kondycji zasobu platformy Kafka server. Kontrola kondycji sprawdza, czy producent Kafka o określonej nazwie połączenia może nawiązać połączenie i zapisać temat do Kafka server.
Integracja hostingu opiera się na pakiecie NuGet 📦 AspNetCore.HealthChecks.Kafka.
integracja Client
Aby rozpocząć pracę z integracją .NET AspireApache Kafka, zainstaluj pakiet NuGet 📦Aspire.Confluent.Kafka w projekcie używającym client, czyli w projekcie aplikacji korzystającej z Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Dodawanie producenta platformy Kafka
W pliku Program.cs projektu clientwywołaj metodę rozszerzenia AddKafkaProducer, aby zarejestrować IProducer<TKey, TValue>
do użycia przez kontener iniekcji zależności. Metoda przyjmuje dwa ogólne parametry odpowiadające typowi klucza i typowi komunikatu do wysłania do brokera. Te parametry ogólne są używane przez AddKafkaProducer
do utworzenia wystąpienia ProducerBuilder<TKey, TValue>
. Ta metoda przyjmuje również parametr nazwy połączenia.
builder.AddKafkaProducer<string, string>("messaging");
Następnie można pobrać instancję IProducer<TKey, TValue>
przy użyciu wstrzykiwania zależności. Aby pobrać na przykład producenta z IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Aby uzyskać więcej informacji na temat pracowników, zobacz usługi Worker w .NET.
Dodawanie użytkownika platformy Kafka
Aby zarejestrować IConsumer<TKey, TValue>
do użycia za pośrednictwem kontenera iniekcji zależności, wywołaj metodę rozszerzenia AddKafkaConsumer w pliku Program.cs projektu korzystającego z client. Metoda przyjmuje dwa ogólne parametry odpowiadające typowi klucza i typowi komunikatu odbieranego z brokera. Te parametry ogólne są używane przez AddKafkaConsumer
do utworzenia wystąpienia ConsumerBuilder<TKey, TValue>
. Ta metoda przyjmuje również parametr nazwy połączenia.
builder.AddKafkaConsumer<string, string>("messaging");
Następnie można pobrać instancję IConsumer<TKey, TValue>
przy użyciu wstrzykiwania zależności. Aby na przykład uzyskać dostęp do odbiorcy z IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Dodawanie kluczowych producentów lub użytkowników platformy Kafka
Mogą wystąpić sytuacje, w których chcesz zarejestrować wiele instancji producenta lub konsumenta z różnymi nazwami połączeń. Aby zarejestrować kluczowych producentów lub konsumentów platformy Kafka, wywołaj odpowiedni interfejs API:
- AddKeyedKafkaProducer: rejestruje klucza producenta platformy Kafka.
- AddKeyedKafkaConsumer: rejestruje kluczowego konsumenta platformy Kafka.
Aby uzyskać więcej informacji na temat usług kluczowanych, zobacz .NET wstrzykiwanie zależności: usługi kluczowane.
Konfiguracja
Integracja .NET AspireApache Kafka udostępnia wiele opcji konfigurowania połączenia na podstawie wymagań i konwencji projektu.
Używanie parametrów połączenia
W przypadku używania parametrów połączenia z sekcji konfiguracji ConnectionStrings
można podać nazwę parametrów połączenia podczas wywoływania builder.AddKafkaProducer()
lub builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Następnie parametry połączenia są pobierane z sekcji konfiguracji ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Wartość parametru połączenia jest ustawiona na właściwość BootstrapServers
utworzonego wystąpienia IProducer<TKey, TValue>
lub IConsumer<TKey, TValue>
. Aby uzyskać więcej informacji, zobacz BootstrapServers.
Korzystanie z dostawców konfiguracji
Integracja .NET AspireApache Kafka obsługuje Microsoft.Extensions.Configuration. Ładuje najpierw KafkaProducerSettings, a następnie KafkaConsumerSettings z konfiguracji, przy użyciu kluczy Aspire:Confluent:Kafka:Producer
i Aspire.Confluent:Kafka:Consumer
. Poniższy fragment kodu to przykład pliku appsettings.json, który konfiguruje niektóre opcje:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Właściwości Config
sekcji konfiguracji Aspire:Confluent:Kafka:Producer
i Aspire.Confluent:Kafka:Consumer
są powiązane odpowiednio z wystąpieniami ProducerConfig
i ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
wymaga ustawienia właściwości ClientId
, aby broker mógł śledzić przesunięcia konsumowanych komunikatów.
Aby uzyskać pełny schemat integracji platformy Kafka clientJSON, zobacz Aspire. Confluent.Kafka/ConfigurationSchema.json.
Używanie delegatów wbudowanych
Istnieje kilka wewnętrznych delegatów dostępnych do konfigurowania różnych opcji.
KonfigurowanieKafkaProducerSettings
i KafkaConsumerSettings
Możesz przekazać delegata Action<KafkaProducerSettings> configureSettings
, aby skonfigurować niektóre lub wszystkie opcje wbudowane, na przykład w celu wyłączenia kontroli kondycji z kodu:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Możesz skonfigurować wbudowany odbiorcę z poziomu kodu:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Konfigurowanie ProducerBuilder<TKey, TValue>
i ConsumerBuilder<TKey, TValue>
Aby konfigurować konstruktorów Confluent.Kafka
, przekaż Action<ProducerBuilder<TKey, TValue>>
(lub Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Rejestrując producentów i konsumentów, jeśli musisz uzyskać dostęp do usługi zarejestrowanej w kontenerze DI, możesz przekazać odpowiednio Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
lub Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Rozważmy następujący przykład rejestracji producenta:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Aby uzyskać więcej informacji, zobacz dokumentację interfejsu API ProducerBuilder<TKey, TValue>
i ConsumerBuilder<TKey, TValue>
.
Client kontrole stanu integracji
Domyślnie .NET.NET Aspire integracje umożliwiają sprawdzanie kondycji dla wszystkich usług. Aby uzyskać więcej informacji, zobacz omówienie integracji .NET.NET Aspire.
Integracja .NET AspireApache Kafka obsługuje następujące scenariusze sprawdzania kondycji:
- Dodaje sprawdzanie kondycji
Aspire.Confluent.Kafka.Producer
, gdy KafkaProducerSettings.DisableHealthChecks jestfalse
. - Dodaje sprawdzanie kondycji
Aspire.Confluent.Kafka.Consumer
, gdy KafkaConsumerSettings.DisableHealthChecks jestfalse
. - Integruje się z punktem końcowym HTTP
/health
, który wymaga, aby wszystkie zarejestrowane kontrole kondycji zostały pomyślnie zakończone, aby aplikacja została uznana za gotową do akceptowania ruchu.
Obserwowanie i telemetria
.NET
.NET Aspire integracje automatycznie konfigurują rejestrowanie, śledzenie i metryki, które są czasami nazywane filarami obserwowalności. Aby uzyskać więcej informacji na temat możliwości obserwacji integracji i telemetrii, zobacz omówienie integracji .NET.NET Aspire. W zależności od usługi pomocniczej niektóre integracje mogą obsługiwać tylko niektóre z tych funkcji. Na przykład niektóre integracje obsługują rejestrowanie i śledzenie, ale nie metryki. Funkcje telemetrii można również wyłączyć przy użyciu technik przedstawionych w sekcji konfiguracji
Logowanie
Integracja .NET AspireApache Kafka używa następujących kategorii dzienników:
Aspire.Confluent.Kafka
Śledzenie
Integracja .NET AspireApache Kafka nie emituje rozproszonych śladów.
Metryki
Integracja .NET AspireApache Kafka emituje następujące metryki przy użyciu OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received