Sdílet prostřednictvím


integrace .NET AspireApache Kafka

zahrnuje:integraci hostování a integraci Client

Apache Kafka je open source distribuovaná platforma streamování událostí. Je užitečné pro vytváření datových kanálů v reálném čase a streamovacích aplikací. Integrace .NET AspireApache Kafka umožňuje připojit se k existujícím instancím Kafka nebo vytvářet nové instance z .NET pomocí image kontejneru docker.io/confluentinc/confluent-local.

Integrace hostování

Apache Kafka integrace hostování modeluje server Kafka jako typ KafkaServerResource. Chcete-li získat přístup k tomuto typu, nainstalujte balíček NuGet 📦Aspire.Hosting.Kafka v projektu hostitele aplikace a poté jej přidejte pomocí buildera.

dotnet add package Aspire.Hosting.Kafka

Další informace najdete v tématu dotnet add package nebo Správa závislostí balíčků v aplikacích .NET.

Přidejte prostředek Kafka server

V projektu hostitele aplikace zavolejte AddKafka na instanci builder a přidejte prostředek server Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Když .NET.NET Aspire přidá obraz kontejneru do hostitele aplikace, jak je znázorněno v předchozím příkladu s obrazem docker.io/confluentinc/confluent-local, vytvoří na místním počítači novou instanci Kafka server. Do serverje přidán odkaz na Kafka kafka (proměnná ExampleProject). Prostředek server Kafka obsahuje výchozí porty.

Metoda WithReference nakonfiguruje připojení v ExampleProject s názvem "kafka". Další informace najdete v tématu životní cyklus prostředků kontejneru.

Spropitné

Pokud se raději připojíte k existujícímu Kafka server, volejte spíše AddConnectionString. Pro více informací viz Odkaz na existující prostředky.

Přidání uživatelského rozhraní Kafka

Pokud chcete k prostředku Kafka přidat uživatelské rozhraní server, použijte metodu WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Uživatelské rozhraní Kafka je bezplatné opensourcové webové uživatelské rozhraní pro monitorování a správu Apache Kafka clusterů. .NET .NET Aspire přidá do hostitele aplikace, na kterém běží uživatelské rozhraní Kafka, další image kontejneru docker.io/provectuslabs/kafka-ui.

Změna portu hostitele uživatelského rozhraní Kafka

Pokud chcete změnit port hostitele uživatelského rozhraní Kafka, zřetězte volání metody WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Uživatelské rozhraní Kafka je přístupné v http://localhost:9100 v předchozím příkladu.

Přidání prostředku Kafka server s datovým svazkem

Pokud chcete přidat datový svazek do prostředku Kafka server, zavolejte metodu WithDataVolume pro prostředek Kafka server:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Datový svazek slouží k zachování dat Kafka server mimo životní cyklus kontejneru. Datový svazek se připojí k cestě /var/lib/kafka/data v kontejneru server Kafka a pokud není zadaný parametr name, název se náhodně vygeneruje. Další informace o datových objemech a podrobnosti o tom, proč se upřednostňují před připojeními bind, najdete v dokumentaci Docker s názvem: Svazky.

Přidejte prostředek Kafka server s připojením dat pomocí bind mount

Pokud chcete přidat datový bind mount k prostředku Kafka server, zavolejte metodu WithDataBindMount.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Důležitý

Připojení typu bind dat mají omezenou funkčnost ve srovnání se svazky, které nabízejí lepší výkon, přenositelnost a zabezpečení, čímž jsou vhodnější pro produkční prostředí. Připojení pomocí vazby však umožňují přímý přístup a úpravy souborů na hostitelském systému, což je ideální pro vývoj a testování, kde jsou potřebné změny v reálném čase.

Připojení vazebného úložiště spoléhají na systém souborů hostitelského počítače, aby uchovala data Kafka server během restartů kontejneru. Připojení úložiště dat je připojeno na cestu C:\Kafka\Data ve Windows (nebo /Kafka/Data na Unix) na hostitelském počítači v kontejneru Kafka server. Další informace o připojeních datových vazeb najdete v dokumentaci Docker: Připojení vazby.

Hostování kontrol stavu integrace

Integrace hostování Kafka automaticky přidá kontrolu stavu prostředku server Kafka. Kontrola stavu ověřuje, že Kafka producent se zadaným názvem připojení se může připojit a uložit téma na Kafka server.

Integrace hostování spoléhá na balíček NuGet 📦 AspNetCore.HealthChecks.Kafka.

integrace Client

Pokud chcete začít s integrací .NET AspireApache Kafka, nainstalujte 📦Aspire.Confluent.Kafka balíček NuGet v projektu client, tedy projektu aplikace, která používá Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Přidání producenta Kafka

V souboru Program.cs projektu, který využívá client, zavolejte metodu rozšíření AddKafkaProducer, která zaregistruje IProducer<TKey, TValue> pro použití prostřednictvím kontejneru injektáže závislostí. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy k odeslání brokerovi. Tyto obecné parametry používají AddKafkaProducer k vytvoření instance ProducerBuilder<TKey, TValue>. Tato metoda také přebírá parametr názvu připojení.

builder.AddKafkaProducer<string, string>("messaging");

Potom můžete načíst instanci IProducer<TKey, TValue> pomocí injektáže závislostí. Pokud chcete například načíst producenta z kódu IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Další informace o pracovnících naleznete v sekci Služby pracovníků v .NET.

Přidání příjemce Kafka

Pokud chcete zaregistrovat IConsumer<TKey, TValue> pro použití prostřednictvím kontejneru pro injektáž závislostí, zavolejte metodu rozšíření AddKafkaConsumer v souboru Program.cs projektu, který clientvyužívá. Metoda přebírá dva obecné parametry odpovídající typu klíče a typu zprávy přijaté od zprostředkovatele. Tyto obecné parametry používají AddKafkaConsumer k vytvoření instance ConsumerBuilder<TKey, TValue>. Tato metoda také přebírá parametr názvu připojení.

builder.AddKafkaConsumer<string, string>("messaging");

Potom můžete načíst instanci IConsumer<TKey, TValue> pomocí injektáže závislostí. Pokud chcete například načíst příjemce z IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Přidání klíčových výrobců nebo příjemců Kafka

Můžou se zde vyskytovat situace, kdy chcete zaregistrovat více instancí producenta nebo příjemce s různými názvy připojení. Pokud chcete zaregistrovat klíčové producenty nebo uživatele Kafka, zavolejte příslušné rozhraní API:

Další informace o službách s klíčem najdete v části .NET injektáž závislostí: Služby s klíčem.

Konfigurace

Integrace .NET AspireApache Kafka poskytuje několik možností konfigurace připojení na základě požadavků a konvencí projektu.

Použijte připojovací řetězec

Při použití připojovacího řetězce z oddílu konfigurace ConnectionStrings můžete zadat název připojovacího řetězce při volání builder.AddKafkaProducer() nebo builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Pak se načte připojovací řetězec z oddílu konfigurace ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Hodnota připojovacího řetězce je nastavena na BootstrapServers vlastnost vytvořené IProducer<TKey, TValue> nebo IConsumer<TKey, TValue> instance. Další informace naleznete v tématu BootstrapServers.

Použití zprostředkovatelů konfigurace

Integrace .NET AspireApache Kafka podporuje Microsoft.Extensions.Configuration. Načte KafkaProducerSettings nebo KafkaConsumerSettings z konfigurace pomocí klíčů Aspire:Confluent:Kafka:Producer a Aspire.Confluent:Kafka:Consumer. Následující fragment kódu je příkladem souboru appsettings.json, který konfiguruje některé z možností:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Vlastnosti Config částí konfigurace Aspire:Confluent:Kafka:Producer a Aspire.Confluent:Kafka:Consumer se vážou k instancím ProducerConfig a ConsumerConfigrespektive.

Confluent.Kafka.Consumer<TKey, TValue> vyžaduje, aby byla vlastnost ClientId nastavená, aby broker mohl sledovat offsety spotřebovaných zpráv.

Kompletní schéma integrace client Kafka JSON najdete v tématu Aspire. Confluent.Kafka/ConfigurationSchema.json.

Použití vložených delegátů

Pro nastavení různých možností je k dispozici několik vložených delegátů.

KonfigurujteKafkaProducerSettings a KafkaConsumerSettings

Delegáta Action<KafkaProducerSettings> configureSettings můžete předat pro nastavení některých nebo všech možností přímo ve kódu, například zakázat kontroly stavu přímo v kódu.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Můžete nakonfigurovat vložený příjemce z kódu:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Nakonfigurujte ProducerBuilder<TKey, TValue> a ConsumerBuilder<TKey, TValue>

Pokud chcete konfigurovat tvůrce Confluent.Kafka, předejte Action<ProducerBuilder<TKey, TValue>> (nebo Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Pokud potřebujete získat přístup ke službě zaregistrované v kontejneru DI, můžete při registraci výrobců a spotřebitelů předat Action<IServiceProvider, ProducerBuilder<TKey, TValue>> nebo Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>:

Představte si následující příklad registrace producenta:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Další informace najdete v dokumentaci k rozhraní API ProducerBuilder<TKey, TValue> a ConsumerBuilder<TKey, TValue>.

Client kontroly stavu integrace

Integrace .NET.NET Aspire ve výchozím nastavení umožňují kontroly stavu pro všechny služby. Další informace viz .NET.NET Aspire přehled integrací.

Integrace .NET AspireApache Kafka zpracovává následující scénáře kontroly stavu:

  • Přidá kontrolu stavu Aspire.Confluent.Kafka.Producer, když je KafkaProducerSettings.DisableHealthChecksfalse.
  • Přidá kontrolu stavu Aspire.Confluent.Kafka.Consumer, když je KafkaConsumerSettings.DisableHealthChecksfalse.
  • Integruje se s HTTP koncovým bodem /health, který určuje, že všechny registrované kontroly stavu musí projít, aby byla aplikace považována za připravenou k přijetí provozu.

Pozorovatelnost a telemetrie

.NET .NET Aspire integrace automaticky nastaví konfigurace protokolování, trasování a metrik, které se někdy označují jako piliře pozorovatelnosti. Další informace o pozorovatelnosti a telemetrii integrace najdete v přehledu integrace .NET.NET Aspire. V závislosti na zálohovací službě můžou některé integrace podporovat pouze některé z těchto funkcí. Například některé integrace podporují protokolování a trasování, ale ne metriky. Funkce telemetrie je také možné zakázat pomocí technik uvedených v části Konfigurace.

Protokolování

Integrace .NET AspireApache Kafka používá následující kategorie protokolů:

  • Aspire.Confluent.Kafka

Trasování

Integrace .NET AspireApache Kafka negeneruje distribuované trasování.

Metriky

Integrace .NET AspireApache Kafka generuje následující metriky pomocí OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Viz také