intégration de .NET AspireApache Kafka
inclut :intégration d’hébergement et intégration Client
Apache Kafka est une plateforme de diffusion en continu d’événements distribués open source. Il est utile de créer des pipelines de données en temps réel et des applications de diffusion en continu. L’intégration .NET AspireApache Kafka vous permet de vous connecter à des instances Kafka existantes ou de créer de nouvelles instances à partir de .NET avec l’image conteneur docker.io/confluentinc/confluent-local
.
Intégration de l’hébergement
Le Apache Kafka hébergeant des modèles d’intégration un server Kafka en tant que type de KafkaServerResource. Pour accéder à ce type, installez le package NuGet 📦Aspire.Hosting.Kafka dans le projet hôte de l'application , puis ajoutez-le avec le générateur.
dotnet add package Aspire.Hosting.Kafka
Pour plus d’informations, consultez dotnet add package ou Gérer les dépendances de packages dans les applications .NET.
Ajouter une ressource Kafka server
Dans votre projet hôte d’application, appelez AddKafka sur l’instance de builder
pour ajouter une ressource kafka server :
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Lorsque .NET.NET Aspire ajoute une image conteneur à l’hôte d’application, comme illustré dans l’exemple précédent avec l’image docker.io/confluentinc/confluent-local
, il crée une instance de server Kafka sur votre ordinateur local. Une référence à votre server Kafka (la variable kafka
) est ajoutée à la ExampleProject
. La ressource kafka server inclut les ports par défaut
La méthode WithReference configure une connexion dans le ExampleProject
nommé "kafka"
. Pour plus d’informations, consultez Cycle de vie des ressources conteneur.
Pourboire
Si vous préférez vous connecter à un Kafka existant server, appelez plutôt AddConnectionString. Pour plus d’informations, consultez Référencer les ressources existantes.
Ajouter l’interface utilisateur Kafka
Pour ajouter l’interface utilisateur Kafka à la ressource Kafka server, appelez la méthode WithKafkaUI :
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L’interface utilisateur Kafka est une interface utilisateur web open source gratuite pour surveiller et gérer les clusters Apache Kafka.
.NET
.NET Aspire ajoute une autre image conteneur docker.io/provectuslabs/kafka-ui
à l’hôte d’application qui exécute l’interface utilisateur Kafka.
Modifier le port hôte de l’interface utilisateur Kafka
Pour modifier le port hôte de l’interface utilisateur Kafka, chaînez un appel à la méthode WithHostPort :
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L’interface utilisateur Kafka est accessible à http://localhost:9100
dans l’exemple précédent.
Ajouter la ressource Kafka server avec un volume de stockage de données
Pour ajouter un volume de données à la ressource server Kafka, appelez la méthode WithDataVolume sur la ressource server Kafka :
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Le volume de données est utilisé pour conserver les données Kafka server en dehors du cycle de vie de son conteneur. Le volume de données est monté sur le chemin /var/lib/kafka/data
dans le conteneur Kafka server, et si un paramètre name
n'est pas fourni, le nom est généré au hasard. Pour plus d’informations sur les volumes de données et sur la raison pour laquelle ils sont préférés par rapport aux montages de liaison , voir la documentation Docker : Volumes.
Ajouter une ressource Kafka server avec montage de données liées
Pour ajouter un montage de liaison de données à la ressource server Kafka, appelez la méthode WithDataBindMount :
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Important
Les montages de liaison de données ont des fonctionnalités limitées par rapport aux volumes , qui offrent de meilleures performances, la portabilité et la sécurité, les rendant ainsi plus adaptés aux environnements de production. Toutefois, les montages de liaison autorisent un accès direct et la modification des fichiers sur le système hôte, ce qui est idéal pour le développement et les tests nécessitant des modifications en temps réel.
Les montages de liaison de données s’appuient sur le système de fichiers de l’ordinateur hôte pour conserver les données Kafka server entre les redémarrages des conteneurs. Le montage de liaison de données est effectué sur le chemin d’accès C:\Kafka\Data
sous Windows (ou /Kafka/Data
sur Unix) sur l’ordinateur hôte, dans le conteneur Kafka server. Pour plus d’informations sur les montages de liaison de données, consultez Docker docs : montages de liaison.
Vérifications d’intégrité de l’intégration d’hébergement
L’intégration de l’hébergement Kafka ajoute automatiquement une vérification de l'état pour la ressource Kafka server. La vérification d’intégrité vérifie qu’un producteur Kafka avec le nom de connexion spécifié est en mesure de se connecter et d'enregistrer un sujet dans le Kafka server.
L’intégration de l’hébergement s’appuie sur le package NuGet 📦 AspNetCore.HealthChecks.Kafka.
intégration de Client
Pour commencer à utiliser l’intégration .NET AspireApache Kafka, installez le package NuGet 📦Aspire.Confluent.Kafka dans le projet consommateur client, c’est-à-dire le projet de l’application qui utilise le Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Ajouter un producteur Kafka
Dans le fichier Program.cs de votre projet clientqui consomme, appelez la méthode d’extension AddKafkaProducer pour enregistrer un IProducer<TKey, TValue>
à utiliser via le conteneur d’injection de dépendances. La méthode prend deux paramètres génériques correspondant au type de la clé et au type du message à envoyer au répartiteur. Ces paramètres génériques sont utilisés par AddKafkaProducer
pour créer une instance de ProducerBuilder<TKey, TValue>
. Cette méthode prend également le paramètre de nom de connexion.
builder.AddKafkaProducer<string, string>("messaging");
Vous pouvez ensuite récupérer l’instance IProducer<TKey, TValue>
à l’aide de l’injection de dépendances. Par exemple, pour récupérer le producteur depuis un IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Pour plus d’informations sur les travailleurs, consultez services Worker dans .NET.
Ajouter un consommateur Kafka
Pour inscrire un IConsumer<TKey, TValue>
à utiliser via le conteneur d’injection de dépendances, appelez la méthode d’extension AddKafkaConsumer dans le fichier Program.cs de votre projet consommant client. La méthode accepte deux paramètres génériques correspondant au type de la clé et au type du message à recevoir du répartiteur. Ces paramètres génériques sont utilisés par AddKafkaConsumer
pour créer une instance de ConsumerBuilder<TKey, TValue>
. Cette méthode prend également le paramètre de nom de connexion.
builder.AddKafkaConsumer<string, string>("messaging");
Vous pouvez ensuite récupérer l’instance IConsumer<TKey, TValue>
à l’aide de l’injection de dépendances. Par exemple, pour récupérer le consommateur depuis un IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Ajouter des producteurs ou des consommateurs Kafka avec clé
Il peut arriver que vous souhaitiez inscrire plusieurs instances de producteur ou de consommateur avec différents noms de connexion. Pour enregistrer les producteurs ou consommateurs Kafka avec clés, appelez l'API appropriée :
- AddKeyedKafkaProducer: inscrit un producteur Kafka clé.
- AddKeyedKafkaConsumer: Enregistre un consommateur Kafka avec clé.
Pour plus d’informations sur les services à clé, consultez .NET injection de dépendances : services à clé.
Configuration
L’intégration .NET AspireApache Kafka fournit plusieurs options pour configurer la connexion en fonction des exigences et des conventions de votre projet.
Utiliser une chaîne de connexion
Lorsque vous utilisez une chaîne de connexion à partir de la section de configuration ConnectionStrings
, vous pouvez fournir le nom de la chaîne de connexion lors de l’appel de builder.AddKafkaProducer()
ou de builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
Ensuite, la chaîne de connexion est récupérée à partir de la section de configuration ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
La valeur de la chaîne de connexion est définie sur la propriété BootstrapServers
de l’instance IProducer<TKey, TValue>
ou IConsumer<TKey, TValue>
produite. Pour plus d’informations, consultez bootstrapServers.
Utiliser des fournisseurs de configuration
L’intégration .NET AspireApache Kafka prend en charge Microsoft.Extensions.Configuration. Il charge les KafkaProducerSettings ou les KafkaConsumerSettings à partir de la configuration en utilisant respectivement les clés Aspire:Confluent:Kafka:Producer
et Aspire.Confluent:Kafka:Consumer
. L’extrait de code suivant est un exemple de fichier appsettings.json qui configure certaines des options :
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Les propriétés Config
des sections de configuration Aspire:Confluent:Kafka:Producer
et Aspire.Confluent:Kafka:Consumer
se lient respectivement aux instances de ProducerConfig
et de ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
nécessite que la propriété ClientId
soit définie pour permettre au répartiteur de suivre les décalages de messages consommés.
Pour obtenir le schéma complet de client d’intégration JSON Kafka, consultez Aspire. Confluent.Kafka/ConfigurationSchema.json.
Utiliser des délégués en ligne
Plusieurs délégués inline sont disponibles pour configurer différentes options.
ConfigurerKafkaProducerSettings
et KafkaConsumerSettings
Vous pouvez passer le délégué Action<KafkaProducerSettings> configureSettings
pour configurer certaines ou toutes les options en ligne, par exemple pour désactiver les vérifications de santé à partir du code :
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Vous pouvez configurer un consommateur en ligne à partir du code :
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configurer ProducerBuilder<TKey, TValue>
et ConsumerBuilder<TKey, TValue>
Pour configurer Confluent.Kafka
générateurs, passez un Action<ProducerBuilder<TKey, TValue>>
(ou Action<ConsumerBuilder<TKey, TValue>>
) :
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Lorsque vous inscrivez des producteurs et des consommateurs, si vous avez besoin d'accéder à un service enregistré dans le conteneur DI, vous pouvez passer un Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
ou un Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
respectivement :
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Prenons l’exemple d’inscription de producteur suivant :
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Pour plus d’informations, consultez la documentation de l’API ProducerBuilder<TKey, TValue>
et ConsumerBuilder<TKey, TValue>
.
Client vérifications d’intégrité de l’intégration
Par défaut, les intégrations .NET.NET Aspire activent les contrôles de santé pour tous les services. Pour plus d’informations, consultez .NET.NET Aspire vue d’ensemble des intégrations.
L'intégration .NET AspireApache Kafka gère les scénarios de vérification de l'état de santé suivants :
- Ajoute le contrôle d’intégrité
Aspire.Confluent.Kafka.Producer
lorsque KafkaProducerSettings.DisableHealthChecks estfalse
. - Ajoute le contrôle d’intégrité
Aspire.Confluent.Kafka.Consumer
lorsque KafkaConsumerSettings.DisableHealthChecks estfalse
. - S'intègre au point de terminaison HTTP
/health
, qui stipule que toutes les vérifications d'intégrité enregistrées doivent être validées pour que l'application soit considérée prête à accepter le trafic réseau.
Observabilité et télémétrie
.NET .NET Aspire intégrations configurent automatiquement les configurations de journalisation, de suivi et de métriques, parfois appelées les piliers de l’observabilité. Pour plus d’informations sur l’observabilité de l’intégration et la télémétrie, consultez .NET.NET Aspire vue d’ensemble des intégrations. Selon le service de stockage, certaines intégrations peuvent uniquement prendre en charge certaines de ces fonctionnalités. Par exemple, certaines intégrations prennent en charge la journalisation et le suivi, mais pas les métriques. Les fonctionnalités de télémétrie peuvent également être désactivées à l’aide des techniques présentées dans la section Configuration.
Exploitation forestière
L'intégration .NET AspireApache Kafka utilise les catégories de journal suivantes :
Aspire.Confluent.Kafka
Traçage
L’intégration .NET AspireApache Kafka n’émet pas de traces distribuées.
Métriques
L’intégration .NET AspireApache Kafka émet les métriques suivantes à l’aide de OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received