.NET Aspire Apache Kafka integracja
obejmuje: integracjahostingu i integracja Client
Apache Kafka to rozproszona platforma przesyłania strumieniowego zdarzeń typu open source. Jest to przydatne w przypadku tworzenia potoków danych w czasie rzeczywistym i aplikacji przesyłania strumieniowego. Integracja .NET AspireApache Kafka umożliwia łączenie się z istniejącymi instancjami Kafka lub tworzenie nowych instancji z .NET za pomocą obrazu kontenera docker.io/confluentinc/confluent-local
.
Integracja hostingu
Apache Kafka hostowanie modeluje serwer Kafka jako typ KafkaServerResource. Aby uzyskać dostęp do tego typu, zainstaluj 📦Aspire.Hosting.Kafka pakiet NuGet w projekcie hosta aplikacji , a następnie dodaj go za pomocą konstruktora.
dotnet add package Aspire.Hosting.Kafka
Aby uzyskać więcej informacji, zobacz dotnet add package lub Zarządzanie zależnościami pakietów w .NET aplikacjach.
Dodawanie zasobu serwera Platformy Kafka
W projekcie hosta aplikacji wywołaj AddKafka w wystąpieniu builder
, aby dodać zasób serwera Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Gdy .NET.NET Aspire dodaje obraz kontenera do hosta aplikacji, jak pokazano w poprzednim przykładzie z obrazem docker.io/confluentinc/confluent-local
, tworzy nową instancję serwera Kafka na komputerze lokalnym. Odwołanie do twojego serwera Kafka (zmiennej kafka
) jest dodawane do ExampleProject
. Zasób serwera Kafka zawiera porty domyślne
Metoda WithReference konfiguruje połączenie w ExampleProject
o nazwie "kafka"
. Aby uzyskać więcej informacji, zobacz Cykl życia zasobów kontenera.
Wskazówka
Jeśli wolisz nawiązać połączenie z istniejącym serwerem Kafka, wywołaj AddConnectionString zamiast tego. Aby uzyskać więcej informacji, odnieś się do istniejących zasobów .
Dodawanie interfejsu użytkownika platformy Kafka
Aby dodać interfejs użytkownika Kafka do zasobu serwera Kafka, wywołaj metodę WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Interfejs użytkownika platformy Kafka to bezpłatny internetowy interfejs użytkownika typu open source do monitorowania klastrów Apache Kafka i zarządzania nimi.
.NET
.NET Aspire dodaje kolejny obraz kontenera docker.io/provectuslabs/kafka-ui
do hosta aplikacji z uruchomionym interfejsem użytkownika platformy Kafka.
Zmienianie portu hosta interfejsu użytkownika platformy Kafka
Aby zmienić port hosta UI platformy Kafka, połącz wywołanie metody WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Interfejs użytkownika platformy Kafka jest dostępny pod adresem http://localhost:9100
w poprzednim przykładzie.
Dodawanie zasobu serwera Kafka z woluminem danych
Aby dodać wolumin danych do zasobu serwera Platformy Kafka, wywołaj metodę WithDataVolume na zasobie serwera Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Wolumin danych jest używany do utrwalania danych serwera Kafka poza cyklem życia kontenera. Wolumin danych jest instalowany w ścieżce /var/lib/kafka/data
w kontenerze serwera Kafka, a gdy nie podano parametru name
, nazwa jest generowana losowo. Aby uzyskać więcej informacji na temat wolumenów danych i szczegółów, dlaczego są preferowane zamiast wiązań montowań , zobacz dokumentację Docker: Woluminy.
Dodawanie zasobu serwera Kafka z instalacją powiązania danych
Aby dodać powiązanie danych do zasobu serwera Kafka, wywołaj metodę WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Ważny
Instalacje powiązane danych mają ograniczoną funkcjonalność w porównaniu z woluminami , co zapewnia lepszą wydajność, przenośność i bezpieczeństwo, co czyni je bardziej odpowiednimi dla środowisk produkcyjnych. Jednak wiązane montowanie umożliwia bezpośredni dostęp i modyfikację plików w systemie hosta, co jest idealne do tworzenia oprogramowania i testowania, gdzie potrzebne są zmiany w czasie rzeczywistym.
Montowanie danych opiera się na systemie plików maszyny hosta, aby utrwalać dane serwera Kafka podczas ponownego uruchamiania kontenera. Powiązanie danych jest zamontowane na ścieżce C:\Kafka\Data
w systemie Windows (lub /Kafka/Data
na Unix) na maszynie hosta w kontenerze serwera Kafka. Aby uzyskać więcej informacji na temat instalacji powiązań danych, zobacz Docker docs: Bind mounts.
Hostowanie testów zdrowia integracji
Integracja hostingu platformy Kafka automatycznie dodaje kontrolę kondycji zasobu serwera Kafka. Sprawdzenie kondycji sprawdza, czy producent Kafka z określoną nazwą połączenia może nawiązać połączenie i utrwalić temat na serwerze Kafka.
Integracja hostingu opiera się na pakiecie NuGet 📦 AspNetCore.HealthChecks.Kafka.
integracja Client
Aby rozpocząć pracę z integracją .NET AspireApache Kafka, zainstaluj 📦Aspire.Confluent.Kafka pakiet NuGet w projekcie korzystającym z klienta, czyli w projekcie aplikacji używającej klienta Apache Kafka.
dotnet add package Aspire.Confluent.Kafka
Dodawanie producenta platformy Kafka
W pliku Program.cs projektu korzystającego z klienta wywołaj metodę rozszerzenia AddKafkaProducer, aby zarejestrować IProducer<TKey, TValue>
do użycia za pośrednictwem kontenera wstrzykiwania zależności. Metoda przyjmuje dwa ogólne parametry odpowiadające typowi klucza i typowi komunikatu do wysłania do brokera. Te parametry ogólne są używane przez AddKafkaProducer
do utworzenia wystąpienia ProducerBuilder<TKey, TValue>
. Ta metoda przyjmuje również parametr nazwy połączenia.
builder.AddKafkaProducer<string, string>("messaging");
Następnie można pobrać instancję IProducer<TKey, TValue>
przy użyciu wstrzykiwania zależności. Aby pobrać na przykład producenta z IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Aby uzyskać więcej informacji na temat pracowników, zobacz usługi Worker w .NET.
Dodawanie użytkownika platformy Kafka
Aby zarejestrować IConsumer<TKey, TValue>
do użycia przez kontener wstrzykiwania zależności, wywołaj metodę rozszerzenia AddKafkaConsumer w pliku Program.cs projektu używanego przez klienta. Metoda przyjmuje dwa ogólne parametry odpowiadające typowi klucza i typowi komunikatu odbieranego z brokera. Te parametry ogólne są używane przez AddKafkaConsumer
do utworzenia wystąpienia ConsumerBuilder<TKey, TValue>
. Ta metoda przyjmuje również parametr nazwy połączenia.
builder.AddKafkaConsumer<string, string>("messaging");
Następnie można pobrać instancję IConsumer<TKey, TValue>
przy użyciu wstrzykiwania zależności. Aby na przykład uzyskać dostęp do odbiorcy z IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Dodawanie kluczowych producentów lub użytkowników platformy Kafka
Mogą wystąpić sytuacje, w których chcesz zarejestrować wiele instancji producenta lub konsumenta z różnymi nazwami połączeń. Aby zarejestrować kluczowych producentów lub konsumentów platformy Kafka, wywołaj odpowiedni interfejs API:
- AddKeyedKafkaProducer: rejestruje klucza producenta platformy Kafka.
- AddKeyedKafkaConsumer: rejestruje kluczowego konsumenta platformy Kafka.
Aby uzyskać więcej informacji na temat usług kluczowanych, zobacz .NET wstrzykiwanie zależności: usługi kluczowane.
Konfiguracja
Integracja .NET AspireApache Kafka udostępnia wiele opcji konfigurowania połączenia na podstawie wymagań i konwencji projektu.
Używanie parametrów połączenia
W przypadku używania parametrów połączenia z sekcji konfiguracji ConnectionStrings
można podać nazwę parametrów połączenia podczas wywoływania builder.AddKafkaProducer()
lub builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Następnie parametry połączenia są pobierane z sekcji konfiguracji ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Wartość parametru połączenia jest ustawiona na właściwość BootstrapServers
utworzonego wystąpienia IProducer<TKey, TValue>
lub IConsumer<TKey, TValue>
. Aby uzyskać więcej informacji, zobacz BootstrapServers.
Korzystanie z dostawców konfiguracji
Integracja .NET AspireApache Kafka obsługuje Microsoft.Extensions.Configuration. Ładuje najpierw KafkaProducerSettings, a następnie KafkaConsumerSettings z konfiguracji, przy użyciu kluczy Aspire:Confluent:Kafka:Producer
i Aspire.Confluent:Kafka:Consumer
. Poniższy fragment kodu to przykład pliku appsettings.json, który konfiguruje niektóre opcje:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Właściwości Config
sekcji konfiguracji Aspire:Confluent:Kafka:Producer
i Aspire.Confluent:Kafka:Consumer
są powiązane odpowiednio z wystąpieniami ProducerConfig
i ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
wymaga ustawienia właściwości ClientId
, aby broker mógł śledzić przesunięcia konsumowanych komunikatów.
Aby uzyskać pełny schemat integracji klienta platformy Kafka JSON, zobacz Aspire. Confluent.Kafka/ConfigurationSchema.json.
Używanie delegatów wbudowanych
Istnieje kilka wewnętrznych delegatów dostępnych do konfigurowania różnych opcji.
KonfigurowanieKafkaProducerSettings
i KafkaConsumerSettings
Możesz przekazać delegata Action<KafkaProducerSettings> configureSettings
, aby skonfigurować niektóre lub wszystkie opcje wbudowane, na przykład w celu wyłączenia kontroli kondycji z kodu:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Możesz skonfigurować wbudowany odbiorcę z poziomu kodu:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Konfigurowanie ProducerBuilder<TKey, TValue>
i ConsumerBuilder<TKey, TValue>
Aby konfigurować konstruktorów Confluent.Kafka
, przekaż Action<ProducerBuilder<TKey, TValue>>
(lub Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Rejestrując producentów i konsumentów, jeśli musisz uzyskać dostęp do usługi zarejestrowanej w kontenerze DI, możesz przekazać odpowiednio Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
lub Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Rozważmy następujący przykład rejestracji producenta:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Aby uzyskać więcej informacji, zobacz dokumentację interfejsu API ProducerBuilder<TKey, TValue>
i ConsumerBuilder<TKey, TValue>
.
Client kontrole stanu integracji
Domyślnie .NET.NET Aspire integracje umożliwiają sprawdzanie kondycji dla wszystkich usług. Aby uzyskać więcej informacji, zobacz omówienie integracji .NET.NET Aspire.
Integracja .NET AspireApache Kafka obsługuje następujące scenariusze sprawdzania kondycji:
- Dodaje sprawdzanie kondycji
Aspire.Confluent.Kafka.Producer
, gdy KafkaProducerSettings.DisableHealthChecks jestfalse
. - Dodaje sprawdzanie kondycji
Aspire.Confluent.Kafka.Consumer
, gdy KafkaConsumerSettings.DisableHealthChecks jestfalse
. - Integruje się z punktem końcowym HTTP
/health
, który wymaga, aby wszystkie zarejestrowane kontrole kondycji zostały pomyślnie zakończone, aby aplikacja została uznana za gotową do akceptowania ruchu.
Obserwowanie i telemetria
.NET
.NET Aspire integracje automatycznie konfigurują rejestrowanie, śledzenie i metryki, które są czasami nazywane filarami obserwowalności. Aby uzyskać więcej informacji na temat możliwości obserwacji integracji i telemetrii, zobacz omówienie integracji .NET.NET Aspire. W zależności od usługi pomocniczej niektóre integracje mogą obsługiwać tylko niektóre z tych funkcji. Na przykład niektóre integracje obsługują rejestrowanie i śledzenie, ale nie metryki. Funkcje telemetrii można również wyłączyć przy użyciu technik przedstawionych w sekcji konfiguracji
Logowanie
Integracja .NET AspireApache Kafka używa następujących kategorii dzienników:
Aspire.Confluent.Kafka
Śledzenie
Integracja .NET AspireApache Kafka nie emituje rozproszonych śladów.
Metryki
Integracja .NET AspireApache Kafka emituje następujące metryki przy użyciu OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received