Partager via


intégration de .NET AspireApache Kafka

inclut :intégration d’hébergement et intégration Client

Apache Kafka est une plateforme de diffusion en continu d’événements distribués open source. Il est utile de créer des pipelines de données en temps réel et des applications de diffusion en continu. L’intégration .NET AspireApache Kafka vous permet de vous connecter à des instances Kafka existantes ou de créer de nouvelles instances à partir de .NET avec l’image conteneur docker.io/confluentinc/confluent-local.

Intégration de l’hébergement

Le Apache Kafka hébergeant des modèles d’intégration un server Kafka en tant que type de KafkaServerResource. Pour accéder à ce type, installez le package NuGet 📦Aspire.Hosting.Kafka dans le projet hôte de l'application , puis ajoutez-le avec le générateur.

dotnet add package Aspire.Hosting.Kafka

Pour plus d’informations, consultez dotnet add package ou Gérer les dépendances de packages dans les applications .NET.

Ajouter une ressource Kafka server

Dans votre projet hôte d’application, appelez AddKafka sur l’instance de builder pour ajouter une ressource kafka server :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Lorsque .NET.NET Aspire ajoute une image conteneur à l’hôte d’application, comme illustré dans l’exemple précédent avec l’image docker.io/confluentinc/confluent-local, il crée une instance de server Kafka sur votre ordinateur local. Une référence à votre server Kafka (la variable kafka) est ajoutée à la ExampleProject. La ressource kafka server inclut les ports par défaut

La méthode WithReference configure une connexion dans le ExampleProject nommé "kafka". Pour plus d’informations, consultez Cycle de vie des ressources conteneur.

Pourboire

Si vous préférez vous connecter à un Kafka existant server, appelez plutôt AddConnectionString. Pour plus d’informations, consultez Référencer les ressources existantes.

Ajouter l’interface utilisateur Kafka

Pour ajouter l’interface utilisateur Kafka à la ressource Kafka server, appelez la méthode WithKafkaUI :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

L’interface utilisateur Kafka est une interface utilisateur web open source gratuite pour surveiller et gérer les clusters Apache Kafka. .NET .NET Aspire ajoute une autre image conteneur docker.io/provectuslabs/kafka-ui à l’hôte d’application qui exécute l’interface utilisateur Kafka.

Modifier le port hôte de l’interface utilisateur Kafka

Pour modifier le port hôte de l’interface utilisateur Kafka, chaînez un appel à la méthode WithHostPort :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

L’interface utilisateur Kafka est accessible à http://localhost:9100 dans l’exemple précédent.

Ajouter la ressource Kafka server avec un volume de stockage de données

Pour ajouter un volume de données à la ressource server Kafka, appelez la méthode WithDataVolume sur la ressource server Kafka :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Le volume de données est utilisé pour conserver les données Kafka server en dehors du cycle de vie de son conteneur. Le volume de données est monté sur le chemin /var/lib/kafka/data dans le conteneur Kafka server, et si un paramètre name n'est pas fourni, le nom est généré au hasard. Pour plus d’informations sur les volumes de données et sur la raison pour laquelle ils sont préférés par rapport aux montages de liaison , voir la documentation Docker : Volumes.

Ajouter une ressource Kafka server avec montage de données liées

Pour ajouter un montage de liaison de données à la ressource server Kafka, appelez la méthode WithDataBindMount :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Important

Les montages de liaison de données ont des fonctionnalités limitées par rapport aux volumes , qui offrent de meilleures performances, la portabilité et la sécurité, les rendant ainsi plus adaptés aux environnements de production. Toutefois, les montages de liaison autorisent un accès direct et la modification des fichiers sur le système hôte, ce qui est idéal pour le développement et les tests nécessitant des modifications en temps réel.

Les montages de liaison de données s’appuient sur le système de fichiers de l’ordinateur hôte pour conserver les données Kafka server entre les redémarrages des conteneurs. Le montage de liaison de données est effectué sur le chemin d’accès C:\Kafka\Data sous Windows (ou /Kafka/Data sur Unix) sur l’ordinateur hôte, dans le conteneur Kafka server. Pour plus d’informations sur les montages de liaison de données, consultez Docker docs : montages de liaison.

Vérifications d’intégrité de l’intégration d’hébergement

L’intégration de l’hébergement Kafka ajoute automatiquement une vérification de l'état pour la ressource Kafka server. La vérification d’intégrité vérifie qu’un producteur Kafka avec le nom de connexion spécifié est en mesure de se connecter et d'enregistrer un sujet dans le Kafka server.

L’intégration de l’hébergement s’appuie sur le package NuGet 📦 AspNetCore.HealthChecks.Kafka.

intégration de Client

Pour commencer à utiliser l’intégration .NET AspireApache Kafka, installez le package NuGet 📦Aspire.Confluent.Kafka dans le projet consommateur client, c’est-à-dire le projet de l’application qui utilise le Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Ajouter un producteur Kafka

Dans le fichier Program.cs de votre projet clientqui consomme, appelez la méthode d’extension AddKafkaProducer pour enregistrer un IProducer<TKey, TValue> à utiliser via le conteneur d’injection de dépendances. La méthode prend deux paramètres génériques correspondant au type de la clé et au type du message à envoyer au répartiteur. Ces paramètres génériques sont utilisés par AddKafkaProducer pour créer une instance de ProducerBuilder<TKey, TValue>. Cette méthode prend également le paramètre de nom de connexion.

builder.AddKafkaProducer<string, string>("messaging");

Vous pouvez ensuite récupérer l’instance IProducer<TKey, TValue> à l’aide de l’injection de dépendances. Par exemple, pour récupérer le producteur depuis un IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Pour plus d’informations sur les travailleurs, consultez services Worker dans .NET.

Ajouter un consommateur Kafka

Pour inscrire un IConsumer<TKey, TValue> à utiliser via le conteneur d’injection de dépendances, appelez la méthode d’extension AddKafkaConsumer dans le fichier Program.cs de votre projet consommant client. La méthode accepte deux paramètres génériques correspondant au type de la clé et au type du message à recevoir du répartiteur. Ces paramètres génériques sont utilisés par AddKafkaConsumer pour créer une instance de ConsumerBuilder<TKey, TValue>. Cette méthode prend également le paramètre de nom de connexion.

builder.AddKafkaConsumer<string, string>("messaging");

Vous pouvez ensuite récupérer l’instance IConsumer<TKey, TValue> à l’aide de l’injection de dépendances. Par exemple, pour récupérer le consommateur depuis un IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Ajouter des producteurs ou des consommateurs Kafka avec clé

Il peut arriver que vous souhaitiez inscrire plusieurs instances de producteur ou de consommateur avec différents noms de connexion. Pour enregistrer les producteurs ou consommateurs Kafka avec clés, appelez l'API appropriée :

Pour plus d’informations sur les services à clé, consultez .NET injection de dépendances : services à clé.

Configuration

L’intégration .NET AspireApache Kafka fournit plusieurs options pour configurer la connexion en fonction des exigences et des conventions de votre projet.

Utiliser une chaîne de connexion

Lorsque vous utilisez une chaîne de connexion à partir de la section de configuration ConnectionStrings, vous pouvez fournir le nom de la chaîne de connexion lors de l’appel de builder.AddKafkaProducer() ou de builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

Ensuite, la chaîne de connexion est récupérée à partir de la section de configuration ConnectionStrings :

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

La valeur de la chaîne de connexion est définie sur la propriété BootstrapServers de l’instance IProducer<TKey, TValue> ou IConsumer<TKey, TValue> produite. Pour plus d’informations, consultez bootstrapServers.

Utiliser des fournisseurs de configuration

L’intégration .NET AspireApache Kafka prend en charge Microsoft.Extensions.Configuration. Il charge les KafkaProducerSettings ou les KafkaConsumerSettings à partir de la configuration en utilisant respectivement les clés Aspire:Confluent:Kafka:Producer et Aspire.Confluent:Kafka:Consumer. L’extrait de code suivant est un exemple de fichier appsettings.json qui configure certaines des options :

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Les propriétés Config des sections de configuration Aspire:Confluent:Kafka:Producer et Aspire.Confluent:Kafka:Consumer se lient respectivement aux instances de ProducerConfig et de ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> nécessite que la propriété ClientId soit définie pour permettre au répartiteur de suivre les décalages de messages consommés.

Pour obtenir le schéma complet de client d’intégration JSON Kafka, consultez Aspire. Confluent.Kafka/ConfigurationSchema.json.

Utiliser des délégués en ligne

Plusieurs délégués inline sont disponibles pour configurer différentes options.

ConfigurerKafkaProducerSettings et KafkaConsumerSettings

Vous pouvez passer le délégué Action<KafkaProducerSettings> configureSettings pour configurer certaines ou toutes les options en ligne, par exemple pour désactiver les vérifications de santé à partir du code :

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Vous pouvez configurer un consommateur en ligne à partir du code :

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Configurer ProducerBuilder<TKey, TValue> et ConsumerBuilder<TKey, TValue>

Pour configurer Confluent.Kafka générateurs, passez un Action<ProducerBuilder<TKey, TValue>> (ou Action<ConsumerBuilder<TKey, TValue>>) :

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Lorsque vous inscrivez des producteurs et des consommateurs, si vous avez besoin d'accéder à un service enregistré dans le conteneur DI, vous pouvez passer un Action<IServiceProvider, ProducerBuilder<TKey, TValue>> ou un Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> respectivement :

Prenons l’exemple d’inscription de producteur suivant :

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Pour plus d’informations, consultez la documentation de l’API ProducerBuilder<TKey, TValue> et ConsumerBuilder<TKey, TValue>.

Client vérifications d’intégrité de l’intégration

Par défaut, les intégrations .NET.NET Aspire activent les contrôles de santé pour tous les services. Pour plus d’informations, consultez .NET.NET Aspire vue d’ensemble des intégrations.

L'intégration .NET AspireApache Kafka gère les scénarios de vérification de l'état de santé suivants :

  • Ajoute le contrôle d’intégrité Aspire.Confluent.Kafka.Producer lorsque KafkaProducerSettings.DisableHealthChecks est false.
  • Ajoute le contrôle d’intégrité Aspire.Confluent.Kafka.Consumer lorsque KafkaConsumerSettings.DisableHealthChecks est false.
  • S'intègre au point de terminaison HTTP /health, qui stipule que toutes les vérifications d'intégrité enregistrées doivent être validées pour que l'application soit considérée prête à accepter le trafic réseau.

Observabilité et télémétrie

.NET .NET Aspire intégrations configurent automatiquement les configurations de journalisation, de suivi et de métriques, parfois appelées les piliers de l’observabilité. Pour plus d’informations sur l’observabilité de l’intégration et la télémétrie, consultez .NET.NET Aspire vue d’ensemble des intégrations. Selon le service de stockage, certaines intégrations peuvent uniquement prendre en charge certaines de ces fonctionnalités. Par exemple, certaines intégrations prennent en charge la journalisation et le suivi, mais pas les métriques. Les fonctionnalités de télémétrie peuvent également être désactivées à l’aide des techniques présentées dans la section Configuration.

Exploitation forestière

L'intégration .NET AspireApache Kafka utilise les catégories de journal suivantes :

  • Aspire.Confluent.Kafka

Traçage

L’intégration .NET AspireApache Kafka n’émet pas de traces distribuées.

Métriques

L’intégration .NET AspireApache Kafka émet les métriques suivantes à l’aide de OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Voir aussi