Sdílet prostřednictvím


integrace .NET AspireApache Kafka

zahrnuje:integraci hostování a integraci Client

Apache Kafka je open source distribuovaná platforma streamování událostí. Je užitečné pro vytváření datových kanálů v reálném čase a streamovacích aplikací. Integrace .NET AspireApache Kafka umožňuje připojit se k existujícím instancím Kafka nebo vytvářet nové instance z .NET pomocí image kontejneru docker.io/confluentinc/confluent-local.

Integrace hostování

Model integračního Apache Kafka serveru představuje server Kafka jako typ KafkaServerResource. Chcete-li získat přístup k tomuto typu, nainstalujte balíček NuGet 📦Aspire.Hosting.Kafka v projektu hostitele aplikace a poté jej přidejte pomocí buildera.

dotnet add package Aspire.Hosting.Kafka

Další informace najdete v tématu dotnet add package nebo Správa závislostí balíčků v aplikacích .NET.

Přidání prostředku serveru Kafka

V projektu hostitele aplikace zavolejte AddKafka na instanci builder pro přidání zdroje serveru Kafka.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Když .NET.NET Aspire přidá do hostitele aplikace image kontejneru, jak je znázorněno v předchozím příkladu s imagí docker.io/confluentinc/confluent-local, vytvoří na místním počítači novou instanci serveru Kafka. Do ExampleProjectse přidá odkaz na váš server Kafka (proměnná kafka). Prostředek serveru Kafka obsahuje výchozí porty.

Metoda WithReference nakonfiguruje připojení v ExampleProject s názvem "kafka". Další informace najdete v tématu životní cyklus prostředků kontejneru.

Spropitné

Pokud byste se raději připojili k existujícímu serveru Kafka, volejte místo toho AddConnectionString. Pro více informací viz Odkaz na existující prostředky.

Přidání uživatelského rozhraní Kafka

Pokud chcete do prostředku serveru Kafka přidat uživatelské rozhraní Kafka, zavolejte metodu WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Uživatelské rozhraní Kafka je bezplatné opensourcové webové uživatelské rozhraní pro monitorování a správu Apache Kafka clusterů. .NET .NET Aspire přidá do hostitele aplikace, na kterém běží uživatelské rozhraní Kafka, další image kontejneru docker.io/provectuslabs/kafka-ui.

Změna portu hostitele uživatelského rozhraní Kafka

Pokud chcete změnit port hostitele uživatelského rozhraní Kafka, zřetězte volání metody WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Uživatelské rozhraní Kafka je přístupné v http://localhost:9100 v předchozím příkladu.

Přidání resursu serveru Kafka s datovým svazkem

Pokud chcete přidat datový svazek do prostředku serveru Kafka, zavolejte metodu WithDataVolume na prostředku serveru Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Datový svazek se používá k zachování dat serveru Kafka mimo životní cyklus kontejneru. Datový svazek se připojí k cestě /var/lib/kafka/data v kontejneru serveru Kafka a pokud není zadaný parametr name, název se náhodně vygeneruje. Další informace o datových objemech a podrobnosti o tom, proč se upřednostňují před připojeními bind, najdete v dokumentaci Docker s názvem: Svazky.

Přidání prostředku serveru Kafka s připojením vazby dat

Pokud chcete přidat datovou vazbu k prostředku serveru Kafka, zavolejte metodu WithDataBindMount.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Důležitý

Připojení typu bind dat mají omezenou funkčnost ve srovnání se svazky, které nabízejí lepší výkon, přenositelnost a zabezpečení, čímž jsou vhodnější pro produkční prostředí. Připojení pomocí vazby však umožňují přímý přístup a úpravy souborů na hostitelském systému, což je ideální pro vývoj a testování, kde jsou potřebné změny v reálném čase.

Datové připojení typu mount spoléhají na systém souborů hostitelského počítače k uchování dat serveru Kafka při restartu kontejneru. Datový připojený svazek je připojen k C:\Kafka\Data na Windows (nebo /Kafka/Data na Unix) na hostitelském počítači v kontejneru serveru Kafka. Další informace o připojeních datových vazeb najdete v dokumentaci Docker: Připojení vazby.

Hostování kontrol stavu integrace

Integrace hostování Kafka automaticky přidá kontrolu stavu pro prostředek serveru Kafka. Kontrola funkčnosti ověřuje, že Kafka producent se zadaným názvem připojení je schopný se připojit a uložit téma na serveru Kafka.

Integrace hostování spoléhá na balíček NuGet 📦 AspNetCore.HealthChecks.Kafka.

integrace Client

Pokud chcete začít s integrací .NET AspireApache Kafka, nainstalujte 📦Aspire. Confluent.Kafka balíček NuGet v projektu využívajícím klienta, tj. projekt aplikace, která používá klienta Apache Kafka.

dotnet add package Aspire.Confluent.Kafka

Přidání producenta Kafka

V souboru Program.cs ve vašem projektu využívajícím klienta zavolejte metodu rozšíření AddKafkaProducer pro registraci IProducer<TKey, TValue> pro použití prostřednictvím kontejneru pro vkládání závislostí. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy k odeslání brokerovi. Tyto obecné parametry používají AddKafkaProducer k vytvoření instance ProducerBuilder<TKey, TValue>. Tato metoda také přebírá parametr názvu připojení.

builder.AddKafkaProducer<string, string>("messaging");

Potom můžete načíst instanci IProducer<TKey, TValue> pomocí injektáže závislostí. Pokud chcete například načíst producenta z kódu IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Další informace o pracovnících naleznete v sekci Služby pracovníků v .NET.

Přidání příjemce Kafka

Pokud chcete zaregistrovat IConsumer<TKey, TValue> pro použití prostřednictvím kontejneru pro vkládání závislostí, zavolejte metodu rozšíření AddKafkaConsumer v souboru Program.cs vašeho projektu, který používá klienta. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy přijaté od zprostředkovatele. Tyto obecné parametry používají AddKafkaConsumer k vytvoření instance ConsumerBuilder<TKey, TValue>. Tato metoda také přebírá parametr názvu připojení.

builder.AddKafkaConsumer<string, string>("messaging");

Potom můžete načíst instanci IConsumer<TKey, TValue> pomocí injektáže závislostí. Pokud chcete například načíst příjemce z IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Přidání klíčových výrobců nebo příjemců Kafka

Můžou se zde vyskytovat situace, kdy chcete zaregistrovat více instancí producenta nebo příjemce s různými názvy připojení. Pokud chcete zaregistrovat klíčové producenty nebo uživatele Kafka, zavolejte příslušné rozhraní API:

Další informace o službách s klíčem najdete v části .NET injektáž závislostí: Služby s klíčem.

Konfigurace

Integrace .NET AspireApache Kafka poskytuje několik možností konfigurace připojení na základě požadavků a konvencí projektu.

Použijte připojovací řetězec

Při použití připojovacího řetězce z oddílu konfigurace ConnectionStrings můžete zadat název připojovacího řetězce při volání builder.AddKafkaProducer() nebo builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Pak se načte připojovací řetězec z oddílu konfigurace ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Hodnota připojovacího řetězce je nastavena na BootstrapServers vlastnost vytvořené IProducer<TKey, TValue> nebo IConsumer<TKey, TValue> instance. Další informace naleznete v tématu BootstrapServers.

Použití zprostředkovatelů konfigurace

Integrace .NET AspireApache Kafka podporuje Microsoft.Extensions.Configuration. Načte KafkaProducerSettings nebo KafkaConsumerSettings z konfigurace pomocí klíčů Aspire:Confluent:Kafka:Producer a Aspire.Confluent:Kafka:Consumer. Následující fragment kódu je příkladem souboru appsettings.json, který konfiguruje některé z možností:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Vlastnosti Config částí konfigurace Aspire:Confluent:Kafka:Producer a Aspire.Confluent:Kafka:Consumer se vážou k instancím ProducerConfig a ConsumerConfigrespektive.

Confluent.Kafka.Consumer<TKey, TValue> vyžaduje, aby byla vlastnost ClientId nastavená, aby broker mohl sledovat offsety spotřebovaných zpráv.

Kompletní schéma integrace klienta Kafka JSON najdete v Aspire. Confluent.Kafka/ConfigurationSchema.json.

Použití vložených delegátů

Pro nastavení různých možností je k dispozici několik vložených delegátů.

KonfigurujteKafkaProducerSettings a KafkaConsumerSettings

Delegáta Action<KafkaProducerSettings> configureSettings můžete předat pro nastavení některých nebo všech možností přímo ve kódu, například zakázat kontroly stavu přímo v kódu.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Můžete nakonfigurovat vložený příjemce z kódu:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Nakonfigurujte ProducerBuilder<TKey, TValue> a ConsumerBuilder<TKey, TValue>

Pokud chcete konfigurovat tvůrce Confluent.Kafka, předejte Action<ProducerBuilder<TKey, TValue>> (nebo Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Pokud potřebujete získat přístup ke službě zaregistrované v kontejneru DI, můžete při registraci výrobců a spotřebitelů předat Action<IServiceProvider, ProducerBuilder<TKey, TValue>> nebo Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>:

Představte si následující příklad registrace producenta:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Další informace najdete v dokumentaci k rozhraní API ProducerBuilder<TKey, TValue> a ConsumerBuilder<TKey, TValue>.

Client kontroly stavu integrace

Integrace .NET.NET Aspire ve výchozím nastavení umožňují kontroly stavu pro všechny služby. Další informace viz .NET.NET Aspire přehled integrací.

Integrace .NET AspireApache Kafka zpracovává následující scénáře kontroly stavu:

  • Přidá kontrolu stavu Aspire.Confluent.Kafka.Producer, když je KafkaProducerSettings.DisableHealthChecksfalse.
  • Přidá kontrolu stavu Aspire.Confluent.Kafka.Consumer, když je KafkaConsumerSettings.DisableHealthChecksfalse.
  • Integruje se s HTTP koncovým bodem /health, který určuje, že všechny registrované kontroly stavu musí projít, aby byla aplikace považována za připravenou k přijetí provozu.

Pozorovatelnost a telemetrie

.NET .NET Aspire integrace automaticky nastaví konfigurace protokolování, trasování a metrik, které se někdy označují jako piliře pozorovatelnosti. Další informace o pozorovatelnosti a telemetrii integrace najdete v přehledu integrace .NET.NET Aspire. V závislosti na zálohovací službě můžou některé integrace podporovat pouze některé z těchto funkcí. Například některé integrace podporují protokolování a trasování, ale ne metriky. Funkce telemetrie je také možné zakázat pomocí technik uvedených v části Konfigurace.

Protokolování

Integrace .NET AspireApache Kafka používá následující kategorie protokolů:

  • Aspire.Confluent.Kafka

Trasování

Integrace .NET AspireApache Kafka negeneruje distribuované trasování.

Metriky

Integrace .NET AspireApache Kafka generuje následující metriky pomocí OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Viz také