Поделиться через


интеграция .NET AspireApache Kafka

Включает:интеграция хостинга и Client интеграции

Apache Kafka — это платформа потоковой передачи распределенных событий с открытым кодом. Это полезно для создания конвейеров данных в режиме реального времени и приложений потоковой передачи. Интеграция .NET AspireApache Kafka позволяет подключаться к существующим экземплярам Kafka или создавать новые экземпляры из .NET с помощью образа контейнера docker.io/confluentinc/confluent-local.

Интеграция хостинга

Модели интеграции хостинга Apache Kafka моделируют Kafka server как тип KafkaServerResource. Чтобы получить доступ к этому типу, установите 📦Aspire. Host.Kafka пакет NuGet в узле приложения проекта, а затем добавьте его с помощью построителя.

dotnet add package Aspire.Hosting.Kafka

Для получения дополнительной информации см. dotnet add package или Управление зависимостями пакетов в приложениях .NET.

Добавить ресурс Kafka server

В проекте хоста приложения вызовите AddKafka на экземпляре builder, чтобы добавить ресурс Kafka server.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Когда .NET.NET Aspire добавляет образ контейнера в узел приложения, как показано в предыдущем примере с изображением docker.io/confluentinc/confluent-local, он создает новый экземпляр Kafka server на локальном компьютере. Ссылка на Kafka server (переменная kafka) добавляется в ExampleProject. Ресурс Kafka server включает порты по умолчанию

Метод WithReference настраивает подключение в ExampleProject с именем "kafka". Для получения дополнительной информации см. жизненный цикл ресурса контейнера .

Совет

Если вы предпочитаете подключиться к существующему Kafka server, вызовите AddConnectionString вместо этого. Дополнительные сведения см. в статье Справочник по существующим ресурсам.

Добавление пользовательского интерфейса Kafka

Чтобы добавить пользовательский интерфейс Kafka в ресурс Kafka server, вызовите метод WithKafkaUI:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Пользовательский интерфейс Kafka — это бесплатный веб-интерфейс с открытым исходным кодом для мониторинга кластеров Apache Kafka и управления ими. .NET .NET Aspire добавляет еще один образ контейнера docker.io/provectuslabs/kafka-ui в хост приложения, на котором выполняется пользовательский интерфейс Kafka.

Изменение порта узла пользовательского интерфейса Kafka

Чтобы изменить порт узла пользовательского интерфейса Kafka, выполните цепочку вызова метода WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Интерфейс пользователя Kafka доступен по адресу http://localhost:9100 в приведенном выше примере.

Добавить ресурс Kafka server с объемом данных

Чтобы добавить том данных в ресурс Kafka server, вызовите метод WithDataVolume в ресурсе Kafka server:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Объем данных используется для сохранения данных Kafka server за пределами жизненного цикла контейнера. Том данных подключается по пути /var/lib/kafka/data в контейнере Kafka server, и когда параметр name не указан, имя случайно генерируется. Дополнительные сведения о томах данных и сведения о том, почему они предпочтительнее привязки, см. в Docker документации по томам.

Добавление ресурса Kafka server с подключением привязки данных

Чтобы добавить подключение привязки данных к ресурсу Kafka server, вызовите метод WithDataBindMount:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Важный

Подключения привязки данных имеют ограниченные функциональные возможности по сравнению с томами, которые обеспечивают более высокую производительность, переносимость и безопасность, что делает их более подходящими для рабочих сред. Однако привязываемые подключения позволяют напрямую получать доступ и изменять файлы в хост-системе, идеально подходит для разработки и тестирования, в которых требуются изменения в режиме реального времени.

Привязки данных зависят от файловой системы хост-компьютера, чтобы сохранять данные Kafka server при перезапусках контейнера. Точка монтирования данных закреплена по пути C:\Kafka\Data в Windows (или /Kafka/Data на Unix) на хост-компьютере в контейнере Kafka server. Дополнительные сведения о монтировании данных см. в документации по Docker: монтаж привязок.

Проверки состояния интеграции хостинга

Интеграция хостинга Kafka автоматически добавляет проверку работоспособности ресурса Kafka server. Проверка работоспособности проверяет, что производитель Kafka с указанным именем подключения может подключаться и сохранять тему в serverKafka.

Интеграция хостинга зависит от пакета NuGet 📦 AspNetCore.HealthChecks.Kafka.

интеграция Client

Чтобы приступить к работе с интеграцией .NET AspireApache Kafka, установите пакет NuGet 📦AspireConfluent.Kafka в проект, потребляющий client, то есть проект приложения, использующего Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Добавьте производителя Kafka

В файле Program.cs проекта, использующего client, вызовите метод расширения AddKafkaProducer, чтобы зарегистрировать IProducer<TKey, TValue> для работы через контейнер зависимостей. Метод принимает два универсальных параметра, соответствующие типу ключа и типу сообщения для отправки брокеру. Эти общие параметры используются AddKafkaProducer для создания экземпляра ProducerBuilder<TKey, TValue>. Этот метод также принимает параметр имени подключения.

builder.AddKafkaProducer<string, string>("messaging");

Затем можно получить экземпляр IProducer<TKey, TValue> с помощью внедрения зависимостей. Например, чтобы получить производителя из IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Для получения дополнительной информации о рабочих службах см. в .NET.

Добавьте потребителя Kafka

Чтобы зарегистрировать IConsumer<TKey, TValue> для использования через контейнер внедрения зависимостей, вызовите метод расширения AddKafkaConsumer в файле Program.cs проекта, использующего client. Метод принимает два универсальных параметра, соответствующие типу ключа и типу сообщения, полученному от брокера. Эти общие параметры используются AddKafkaConsumer для создания экземпляра ConsumerBuilder<TKey, TValue>. Этот метод также принимает параметр имени подключения.

builder.AddKafkaConsumer<string, string>("messaging");

Затем можно получить экземпляр IConsumer<TKey, TValue> с помощью внедрения зависимостей. Например, чтобы извлечь потребителя из IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Добавление ключевых производителей или потребителей Kafka

Могут возникнуть ситуации, когда требуется зарегистрировать несколько инстанций производителя или потребителя с различными именами подключений. Чтобы зарегистрировать производителей или потребителей Kafka с ключами, вызовите соответствующий API:

Дополнительные сведения о службах с ключами см. в разделе .NET внедрение зависимостей: службы с ключами.

Конфигурация

Интеграция .NET AspireApache Kafka предоставляет несколько вариантов настройки подключения на основе требований и соглашений проекта.

Используйте строку подключения

При использовании строки подключения из раздела конфигурации ConnectionStrings можно указать имя строки подключения при вызове builder.AddKafkaProducer() или builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Затем строка подключения извлекается из раздела конфигурации ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

Значение строки подключения устанавливается в свойство BootstrapServers создаваемого экземпляра IProducer<TKey, TValue> или IConsumer<TKey, TValue>. Дополнительные сведения см. в разделе BootstrapServers.

Использование поставщиков конфигураций

Интеграция .NET AspireApache Kafka поддерживает Microsoft.Extensions.Configuration. Он загружает KafkaProducerSettings или KafkaConsumerSettings из конфигурации соответственно с помощью ключей Aspire:Confluent:Kafka:Producer и Aspire.Confluent:Kafka:Consumer. Следующий фрагмент кода является примером файла appsettings.json, который настраивает некоторые параметры:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Свойства Config разделов конфигурации Aspire:Confluent:Kafka:Producer и Aspire.Confluent:Kafka:Consumer соответственно привязываются к экземплярам ProducerConfig и ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> требует, чтобы свойство ClientId было задано, чтобы разрешить брокеру отслеживать смещения потребляемых сообщений.

Полный схемы интеграции с Kafka см. в разделе . Confluent.Kafka/ConfigurationSchema..

Использование встроенных делегатов

Существует несколько встроенных делегатов, доступных для настройки различных параметров.

НастройкаKafkaProducerSettings и KafkaConsumerSettings

Можно передать делегат Action<KafkaProducerSettings> configureSettings, чтобы настроить некоторые или все параметры напрямую, например, отключить проверки состояния из кода.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Вы можете настроить встроенный потребитель из кода:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Настройка ProducerBuilder<TKey, TValue> и ConsumerBuilder<TKey, TValue>

Чтобы настроить Confluent.Kafka строителей, передайте Action<ProducerBuilder<TKey, TValue>> (или Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

При регистрации производителей и потребителей, если необходимо получить доступ к службе, зарегистрированной в контейнере DI, можно передать Action<IServiceProvider, ProducerBuilder<TKey, TValue>> или Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> соответственно:

Рассмотрим следующий пример регистрации производителя:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Дополнительные сведения см. в документации по API ProducerBuilder<TKey, TValue> и ConsumerBuilder<TKey, TValue>.

Client проверки работоспособности интеграции

По умолчанию .NET.NET Aspire интеграции включают проверки работоспособности для всех сервисов. Дополнительные сведения см. в обзоре интеграции .NET.NET Aspire.

Интеграция .NET AspireApache Kafka обрабатывает следующие сценарии проверки работоспособности:

  • Добавляет проверку работоспособности Aspire.Confluent.Kafka.Producer, когда KafkaProducerSettings.DisableHealthChecksfalse.
  • Добавляет проверку работоспособности Aspire.Confluent.Kafka.Consumer, когда KafkaConsumerSettings.DisableHealthChecksfalse.
  • Интегрируется с конечной точкой HTTP /health для указания, что все зарегистрированные проверки состояния должны быть успешными, чтобы приложение считалось готовым принять трафик.

Наблюдаемость и телеметрия

.NET .NET Aspire интеграции автоматически настраивают конфигурации журналов, трассировки и метрик, которые иногда называются основами наблюдаемости. Дополнительные сведения об наблюдаемости интеграции и телеметрии см. в .NET.NET Aspire обзоре интеграции. В зависимости от резервной службы некоторые интеграции могут поддерживать только некоторые из этих функций. Например, некоторые интеграции поддерживают ведение журнала и трассировку, но не метрики. Функции телеметрии также можно отключить с помощью методов, представленных в разделе конфигурации .

Лесозаготовка

Интеграция .NET AspireApache Kafka использует следующие категории журналов:

  • Aspire.Confluent.Kafka

Отслеживание

Интеграция .NET AspireApache Kafka не излучает распределенные трассировки.

Метрика

Интеграция .NET AspireApache Kafka выдает следующие метрики с помощью OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

См. также