интеграция .NET AspireApache Kafka
Включает:интеграция хостинга и Client интеграции
Apache Kafka — это платформа потоковой передачи распределенных событий с открытым кодом. Это полезно для создания конвейеров данных в режиме реального времени и приложений потоковой передачи. Интеграция .NET AspireApache Kafka позволяет подключаться к существующим экземплярам Kafka или создавать новые экземпляры из .NET с помощью образа контейнера docker.io/confluentinc/confluent-local
.
Интеграция хостинга
Модели интеграции хостинга Apache Kafka моделируют Kafka server как тип KafkaServerResource. Чтобы получить доступ к этому типу, установите 📦Aspire. Host.Kafka пакет NuGet в узле приложения проекта, а затем добавьте его с помощью построителя.
dotnet add package Aspire.Hosting.Kafka
Для получения дополнительной информации см. dotnet add package или Управление зависимостями пакетов в приложениях .NET.
Добавить ресурс Kafka server
В проекте хоста приложения вызовите AddKafka на экземпляре builder
, чтобы добавить ресурс Kafka server.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Когда .NET.NET Aspire добавляет образ контейнера в узел приложения, как показано в предыдущем примере с изображением docker.io/confluentinc/confluent-local
, он создает новый экземпляр Kafka server на локальном компьютере. Ссылка на Kafka server (переменная kafka
) добавляется в ExampleProject
. Ресурс Kafka server включает порты по умолчанию
Метод WithReference настраивает подключение в ExampleProject
с именем "kafka"
. Для получения дополнительной информации см. жизненный цикл ресурса контейнера .
Совет
Если вы предпочитаете подключиться к существующему Kafka server, вызовите AddConnectionString вместо этого. Дополнительные сведения см. в статье Справочник по существующим ресурсам.
Добавление пользовательского интерфейса Kafka
Чтобы добавить пользовательский интерфейс Kafka в ресурс Kafka server, вызовите метод WithKafkaUI:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Пользовательский интерфейс Kafka — это бесплатный веб-интерфейс с открытым исходным кодом для мониторинга кластеров Apache Kafka и управления ими.
.NET
.NET Aspire добавляет еще один образ контейнера docker.io/provectuslabs/kafka-ui
в хост приложения, на котором выполняется пользовательский интерфейс Kafka.
Изменение порта узла пользовательского интерфейса Kafka
Чтобы изменить порт узла пользовательского интерфейса Kafka, выполните цепочку вызова метода WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Интерфейс пользователя Kafka доступен по адресу http://localhost:9100
в приведенном выше примере.
Добавить ресурс Kafka server с объемом данных
Чтобы добавить том данных в ресурс Kafka server, вызовите метод WithDataVolume в ресурсе Kafka server:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Объем данных используется для сохранения данных Kafka server за пределами жизненного цикла контейнера. Том данных подключается по пути /var/lib/kafka/data
в контейнере Kafka server, и когда параметр name
не указан, имя случайно генерируется. Дополнительные сведения о томах данных и сведения о том, почему они предпочтительнее привязки, см. в Docker документации по томам.
Добавление ресурса Kafka server с подключением привязки данных
Чтобы добавить подключение привязки данных к ресурсу Kafka server, вызовите метод WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Важный
Подключения привязки данных имеют ограниченные функциональные возможности по сравнению с томами, которые обеспечивают более высокую производительность, переносимость и безопасность, что делает их более подходящими для рабочих сред. Однако привязываемые подключения позволяют напрямую получать доступ и изменять файлы в хост-системе, идеально подходит для разработки и тестирования, в которых требуются изменения в режиме реального времени.
Привязки данных зависят от файловой системы хост-компьютера, чтобы сохранять данные Kafka server при перезапусках контейнера. Точка монтирования данных закреплена по пути C:\Kafka\Data
в Windows (или /Kafka/Data
на Unix) на хост-компьютере в контейнере Kafka server. Дополнительные сведения о монтировании данных см. в документации по Docker: монтаж привязок.
Проверки состояния интеграции хостинга
Интеграция хостинга Kafka автоматически добавляет проверку работоспособности ресурса Kafka server. Проверка работоспособности проверяет, что производитель Kafka с указанным именем подключения может подключаться и сохранять тему в serverKafka.
Интеграция хостинга зависит от пакета NuGet 📦 AspNetCore.HealthChecks.Kafka.
интеграция Client
Чтобы приступить к работе с интеграцией .NET AspireApache Kafka, установите пакет NuGet 📦AspireConfluent.Kafka в проект, потребляющий client, то есть проект приложения, использующего Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Добавьте производителя Kafka
В файле Program.cs проекта, использующего client, вызовите метод расширения AddKafkaProducer, чтобы зарегистрировать IProducer<TKey, TValue>
для работы через контейнер зависимостей. Метод принимает два универсальных параметра, соответствующие типу ключа и типу сообщения для отправки брокеру. Эти общие параметры используются AddKafkaProducer
для создания экземпляра ProducerBuilder<TKey, TValue>
. Этот метод также принимает параметр имени подключения.
builder.AddKafkaProducer<string, string>("messaging");
Затем можно получить экземпляр IProducer<TKey, TValue>
с помощью внедрения зависимостей. Например, чтобы получить производителя из IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Для получения дополнительной информации о рабочих службах см. в .NET.
Добавьте потребителя Kafka
Чтобы зарегистрировать IConsumer<TKey, TValue>
для использования через контейнер внедрения зависимостей, вызовите метод расширения AddKafkaConsumer в файле Program.cs проекта, использующего client. Метод принимает два универсальных параметра, соответствующие типу ключа и типу сообщения, полученному от брокера. Эти общие параметры используются AddKafkaConsumer
для создания экземпляра ConsumerBuilder<TKey, TValue>
. Этот метод также принимает параметр имени подключения.
builder.AddKafkaConsumer<string, string>("messaging");
Затем можно получить экземпляр IConsumer<TKey, TValue>
с помощью внедрения зависимостей. Например, чтобы извлечь потребителя из IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Добавление ключевых производителей или потребителей Kafka
Могут возникнуть ситуации, когда требуется зарегистрировать несколько инстанций производителя или потребителя с различными именами подключений. Чтобы зарегистрировать производителей или потребителей Kafka с ключами, вызовите соответствующий API:
- AddKeyedKafkaProducer: регистрирует ключевого производителя Kafka.
- AddKeyedKafkaConsumer: регистрирует потребитель Kafka с ключом.
Дополнительные сведения о службах с ключами см. в разделе .NET внедрение зависимостей: службы с ключами.
Конфигурация
Интеграция .NET AspireApache Kafka предоставляет несколько вариантов настройки подключения на основе требований и соглашений проекта.
Используйте строку подключения
При использовании строки подключения из раздела конфигурации ConnectionStrings
можно указать имя строки подключения при вызове builder.AddKafkaProducer()
или builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Затем строка подключения извлекается из раздела конфигурации ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Значение строки подключения устанавливается в свойство BootstrapServers
создаваемого экземпляра IProducer<TKey, TValue>
или IConsumer<TKey, TValue>
. Дополнительные сведения см. в разделе BootstrapServers.
Использование поставщиков конфигураций
Интеграция .NET AspireApache Kafka поддерживает Microsoft.Extensions.Configuration. Он загружает KafkaProducerSettings или KafkaConsumerSettings из конфигурации соответственно с помощью ключей Aspire:Confluent:Kafka:Producer
и Aspire.Confluent:Kafka:Consumer
. Следующий фрагмент кода является примером файла appsettings.json, который настраивает некоторые параметры:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Свойства Config
разделов конфигурации Aspire:Confluent:Kafka:Producer
и Aspire.Confluent:Kafka:Consumer
соответственно привязываются к экземплярам ProducerConfig
и ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
требует, чтобы свойство ClientId
было задано, чтобы разрешить брокеру отслеживать смещения потребляемых сообщений.
Полный
Использование встроенных делегатов
Существует несколько встроенных делегатов, доступных для настройки различных параметров.
НастройкаKafkaProducerSettings
и KafkaConsumerSettings
Можно передать делегат Action<KafkaProducerSettings> configureSettings
, чтобы настроить некоторые или все параметры напрямую, например, отключить проверки состояния из кода.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Вы можете настроить встроенный потребитель из кода:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Настройка ProducerBuilder<TKey, TValue>
и ConsumerBuilder<TKey, TValue>
Чтобы настроить Confluent.Kafka
строителей, передайте Action<ProducerBuilder<TKey, TValue>>
(или Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
При регистрации производителей и потребителей, если необходимо получить доступ к службе, зарегистрированной в контейнере DI, можно передать Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
или Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
соответственно:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Рассмотрим следующий пример регистрации производителя:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Дополнительные сведения см. в документации по API ProducerBuilder<TKey, TValue>
и ConsumerBuilder<TKey, TValue>
.
Client проверки работоспособности интеграции
По умолчанию .NET.NET Aspire интеграции включают проверки работоспособности для всех сервисов. Дополнительные сведения см. в обзоре интеграции .NET.NET Aspire.
Интеграция .NET AspireApache Kafka обрабатывает следующие сценарии проверки работоспособности:
- Добавляет проверку работоспособности
Aspire.Confluent.Kafka.Producer
, когда KafkaProducerSettings.DisableHealthChecksfalse
. - Добавляет проверку работоспособности
Aspire.Confluent.Kafka.Consumer
, когда KafkaConsumerSettings.DisableHealthChecksfalse
. - Интегрируется с конечной точкой HTTP
/health
для указания, что все зарегистрированные проверки состояния должны быть успешными, чтобы приложение считалось готовым принять трафик.
Наблюдаемость и телеметрия
.NET
.NET Aspire интеграции автоматически настраивают конфигурации журналов, трассировки и метрик, которые иногда называются основами наблюдаемости. Дополнительные сведения об наблюдаемости интеграции и телеметрии см. в .NET.NET Aspire обзоре интеграции. В зависимости от резервной службы некоторые интеграции могут поддерживать только некоторые из этих функций. Например, некоторые интеграции поддерживают ведение журнала и трассировку, но не метрики. Функции телеметрии также можно отключить с помощью методов, представленных в разделе конфигурации
Лесозаготовка
Интеграция .NET AspireApache Kafka использует следующие категории журналов:
Aspire.Confluent.Kafka
Отслеживание
Интеграция .NET AspireApache Kafka не излучает распределенные трассировки.
Метрика
Интеграция .NET AspireApache Kafka выдает следующие метрики с помощью OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
См. также
.NET Aspire