integrace .NET AspireApache Kafka
zahrnuje:integraci hostování a integraci Client
Apache Kafka je open source distribuovaná platforma streamování událostí. Je užitečné pro vytváření datových kanálů v reálném čase a streamovacích aplikací. Integrace .NET AspireApache Kafka umožňuje připojit se k existujícím instancím Kafka nebo vytvářet nové instance z .NET pomocí image kontejneru docker.io/confluentinc/confluent-local
.
Integrace hostování
Apache Kafka integrace hostování modeluje server Kafka jako typ KafkaServerResource. Chcete-li získat přístup k tomuto typu, nainstalujte balíček NuGet 📦Aspire.Hosting.Kafka v projektu hostitele aplikace a poté jej přidejte pomocí buildera.
dotnet add package Aspire.Hosting.Kafka
Další informace najdete v tématu dotnet add package nebo Správa závislostí balíčků v aplikacích .NET.
Přidejte prostředek Kafka server
V projektu hostitele aplikace zavolejte AddKafka na instanci builder
a přidejte prostředek server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Když .NET.NET Aspire přidá obraz kontejneru do hostitele aplikace, jak je znázorněno v předchozím příkladu s obrazem docker.io/confluentinc/confluent-local
, vytvoří na místním počítači novou instanci Kafka server. Do serverje přidán odkaz na Kafka kafka
(proměnná ExampleProject
). Prostředek server Kafka obsahuje výchozí porty.
Metoda WithReference nakonfiguruje připojení v ExampleProject
s názvem "kafka"
. Další informace najdete v tématu životní cyklus prostředků kontejneru.
Spropitné
Pokud se raději připojíte k existujícímu Kafka server, volejte spíše AddConnectionString. Pro více informací viz Odkaz na existující prostředky.
Přidání uživatelského rozhraní Kafka
Pokud chcete k prostředku Kafka přidat uživatelské rozhraní server, použijte metodu WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Uživatelské rozhraní Kafka je bezplatné opensourcové webové uživatelské rozhraní pro monitorování a správu Apache Kafka clusterů.
.NET
.NET Aspire přidá do hostitele aplikace, na kterém běží uživatelské rozhraní Kafka, další image kontejneru docker.io/provectuslabs/kafka-ui
.
Změna portu hostitele uživatelského rozhraní Kafka
Pokud chcete změnit port hostitele uživatelského rozhraní Kafka, zřetězte volání metody WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Uživatelské rozhraní Kafka je přístupné v http://localhost:9100
v předchozím příkladu.
Přidání prostředku Kafka server s datovým svazkem
Pokud chcete přidat datový svazek do prostředku Kafka server, zavolejte metodu WithDataVolume pro prostředek Kafka server:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Datový svazek slouží k zachování dat Kafka server mimo životní cyklus kontejneru. Datový svazek se připojí k cestě /var/lib/kafka/data
v kontejneru server Kafka a pokud není zadaný parametr name
, název se náhodně vygeneruje. Další informace o datových objemech a podrobnosti o tom, proč se upřednostňují před připojeními bind, najdete v dokumentaci Docker s názvem: Svazky.
Přidejte prostředek Kafka server s připojením dat pomocí bind mount
Pokud chcete přidat datový bind mount k prostředku Kafka server, zavolejte metodu WithDataBindMount.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Důležitý
Připojení typu bind dat mají omezenou funkčnost ve srovnání se svazky, které nabízejí lepší výkon, přenositelnost a zabezpečení, čímž jsou vhodnější pro produkční prostředí. Připojení pomocí vazby však umožňují přímý přístup a úpravy souborů na hostitelském systému, což je ideální pro vývoj a testování, kde jsou potřebné změny v reálném čase.
Připojení vazebného úložiště spoléhají na systém souborů hostitelského počítače, aby uchovala data Kafka server během restartů kontejneru. Připojení úložiště dat je připojeno na cestu C:\Kafka\Data
ve Windows (nebo /Kafka/Data
na Unix) na hostitelském počítači v kontejneru Kafka server. Další informace o připojeních datových vazeb najdete v dokumentaci Docker: Připojení vazby.
Hostování kontrol stavu integrace
Integrace hostování Kafka automaticky přidá kontrolu stavu prostředku server Kafka. Kontrola stavu ověřuje, že Kafka producent se zadaným názvem připojení se může připojit a uložit téma na Kafka server.
Integrace hostování spoléhá na balíček NuGet 📦 AspNetCore.HealthChecks.Kafka.
integrace Client
Pokud chcete začít s integrací .NET AspireApache Kafka, nainstalujte 📦Aspire.Confluent.Kafka balíček NuGet v projektu client, tedy projektu aplikace, která používá Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Přidání producenta Kafka
V souboru Program.cs projektu, který využívá client, zavolejte metodu rozšíření AddKafkaProducer, která zaregistruje IProducer<TKey, TValue>
pro použití prostřednictvím kontejneru injektáže závislostí. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy k odeslání brokerovi. Tyto obecné parametry používají AddKafkaProducer
k vytvoření instance ProducerBuilder<TKey, TValue>
. Tato metoda také přebírá parametr názvu připojení.
builder.AddKafkaProducer<string, string>("messaging");
Potom můžete načíst instanci IProducer<TKey, TValue>
pomocí injektáže závislostí. Pokud chcete například načíst producenta z kódu IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Další informace o pracovnících naleznete v sekci Služby pracovníků v .NET.
Přidání příjemce Kafka
Pokud chcete zaregistrovat IConsumer<TKey, TValue>
pro použití prostřednictvím kontejneru pro injektáž závislostí, zavolejte metodu rozšíření AddKafkaConsumer v souboru Program.cs projektu, který clientvyužívá. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy přijaté od zprostředkovatele. Tyto obecné parametry používají AddKafkaConsumer
k vytvoření instance ConsumerBuilder<TKey, TValue>
. Tato metoda také přebírá parametr názvu připojení.
builder.AddKafkaConsumer<string, string>("messaging");
Potom můžete načíst instanci IConsumer<TKey, TValue>
pomocí injektáže závislostí. Pokud chcete například načíst příjemce z IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Přidání klíčových výrobců nebo příjemců Kafka
Můžou se zde vyskytovat situace, kdy chcete zaregistrovat více instancí producenta nebo příjemce s různými názvy připojení. Pokud chcete zaregistrovat klíčové producenty nebo uživatele Kafka, zavolejte příslušné rozhraní API:
- AddKeyedKafkaProducer: Zaregistruje klíčového producenta Kafka.
- AddKeyedKafkaConsumer: Zaregistruje klíčovaného uživatele Kafka.
Další informace o službách s klíčem najdete v části .NET injektáž závislostí: Služby s klíčem.
Konfigurace
Integrace .NET AspireApache Kafka poskytuje několik možností konfigurace připojení na základě požadavků a konvencí projektu.
Použijte připojovací řetězec
Při použití připojovacího řetězce z oddílu konfigurace ConnectionStrings
můžete zadat název připojovacího řetězce při volání builder.AddKafkaProducer()
nebo builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Pak se načte připojovací řetězec z oddílu konfigurace ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Hodnota připojovacího řetězce je nastavena na BootstrapServers
vlastnost vytvořené IProducer<TKey, TValue>
nebo IConsumer<TKey, TValue>
instance. Další informace naleznete v tématu BootstrapServers.
Použití zprostředkovatelů konfigurace
Integrace .NET AspireApache Kafka podporuje Microsoft.Extensions.Configuration. Načte KafkaProducerSettings nebo KafkaConsumerSettings z konfigurace pomocí klíčů Aspire:Confluent:Kafka:Producer
a Aspire.Confluent:Kafka:Consumer
. Následující fragment kódu je příkladem souboru appsettings.json, který konfiguruje některé z možností:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Vlastnosti Config
částí konfigurace Aspire:Confluent:Kafka:Producer
a Aspire.Confluent:Kafka:Consumer
se vážou k instancím ProducerConfig
a ConsumerConfig
respektive.
Confluent.Kafka.Consumer<TKey, TValue>
vyžaduje, aby byla vlastnost ClientId
nastavená, aby broker mohl sledovat offsety spotřebovaných zpráv.
Kompletní schéma integrace client Kafka JSON najdete v tématu Aspire. Confluent.Kafka/ConfigurationSchema.json.
Použití vložených delegátů
Pro nastavení různých možností je k dispozici několik vložených delegátů.
KonfigurujteKafkaProducerSettings
a KafkaConsumerSettings
Delegáta Action<KafkaProducerSettings> configureSettings
můžete předat pro nastavení některých nebo všech možností přímo ve kódu, například zakázat kontroly stavu přímo v kódu.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Můžete nakonfigurovat vložený příjemce z kódu:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Nakonfigurujte ProducerBuilder<TKey, TValue>
a ConsumerBuilder<TKey, TValue>
Pokud chcete konfigurovat tvůrce Confluent.Kafka
, předejte Action<ProducerBuilder<TKey, TValue>>
(nebo Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Pokud potřebujete získat přístup ke službě zaregistrované v kontejneru DI, můžete při registraci výrobců a spotřebitelů předat Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
nebo Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Představte si následující příklad registrace producenta:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Další informace najdete v dokumentaci k rozhraní API ProducerBuilder<TKey, TValue>
a ConsumerBuilder<TKey, TValue>
.
Client kontroly stavu integrace
Integrace .NET.NET Aspire ve výchozím nastavení umožňují kontroly stavu pro všechny služby. Další informace viz .NET.NET Aspire přehled integrací.
Integrace .NET AspireApache Kafka zpracovává následující scénáře kontroly stavu:
- Přidá kontrolu stavu
Aspire.Confluent.Kafka.Producer
, když je KafkaProducerSettings.DisableHealthChecksfalse
. - Přidá kontrolu stavu
Aspire.Confluent.Kafka.Consumer
, když je KafkaConsumerSettings.DisableHealthChecksfalse
. - Integruje se s HTTP koncovým bodem
/health
, který určuje, že všechny registrované kontroly stavu musí projít, aby byla aplikace považována za připravenou k přijetí provozu.
Pozorovatelnost a telemetrie
.NET .NET Aspire integrace automaticky nastaví konfigurace protokolování, trasování a metrik, které se někdy označují jako piliře pozorovatelnosti. Další informace o pozorovatelnosti a telemetrii integrace najdete v přehledu integrace .NET.NET Aspire. V závislosti na zálohovací službě můžou některé integrace podporovat pouze některé z těchto funkcí. Například některé integrace podporují protokolování a trasování, ale ne metriky. Funkce telemetrie je také možné zakázat pomocí technik uvedených v části Konfigurace.
Protokolování
Integrace .NET AspireApache Kafka používá následující kategorie protokolů:
Aspire.Confluent.Kafka
Trasování
Integrace .NET AspireApache Kafka negeneruje distribuované trasování.
Metriky
Integrace .NET AspireApache Kafka generuje následující metriky pomocí OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received