次の方法で共有


.NET Aspire Apache Kafka 統合

含まれるもの:ホスティング統合Client 統合

Apache Kafka は、オープンソースの分散イベント ストリーミング プラットフォームです。 リアルタイム のデータ パイプラインとストリーミング アプリケーションを構築する場合に便利です。 .NET Aspire Apache Kafka 統合を使用すると、既存の Kafka インスタンスに接続したり、docker.io/confluentinc/confluent-localを使用して から新しいインスタンスを作成したりできます。

ホスティング統合

Apache Kafka ホスティング統合では、Kafka server が KafkaServerResource の種類としてモデル化されます。 この種類にアクセスするには、📦Aspireをインストールします。Hosting.Kafka NuGet パッケージを アプリ ホスト プロジェクトに追加し、ビルダーと共に追加します。

dotnet add package Aspire.Hosting.Kafka

詳細については、「dotnet パッケージ の追加」または「.NET アプリケーションでのパッケージの依存関係の管理」を参照してください。

Kafka server リソースを追加する

アプリ ホスト プロジェクトで、AddKafka インスタンスの builder を呼び出して、Kafka server リソースを追加します。

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

.NET .NET Aspire 前の例に示すように、docker.io/confluentinc/confluent-local イメージでコンテナー イメージをアプリ ホストに追加すると、ローカル コンピューターに新しい Kafka server インスタンスが作成されます。 Kafka server (kafka 変数) への参照が ExampleProjectに追加されます。 Kafka server リソースには既定のポートが含まれています

WithReference メソッドは、ExampleProjectという名前の "kafka" で接続を構成します。 詳細については、「コンテナー リソースのライフサイクルの」を参照してください。

アドバイス

既存の Kafka serverに接続する場合は、代わりに AddConnectionString 呼び出します。 詳細については、「既存のリソースを参照する」を参照してください。

Kafka UI の追加

Kafka UI を Kafka server リソースに追加するには、WithKafkaUI メソッドを呼び出します。

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI は、Apache Kafka クラスターを監視および管理するための無料のオープン ソース Web UI です。 .NET .NET Aspire、Kafka UI を実行する別のコンテナー イメージ docker.io/provectuslabs/kafka-ui をアプリ ホストに追加します。

Kafka UI ホスト ポートを変更する

Kafka UI ホスト ポートを変更するには、WithHostPort メソッドの呼び出しをチェーンします。

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI には、前の例の http://localhost:9100 でアクセスできます。

データ ボリュームを使用して Kafka server リソースを追加する

Kafka server リソースにデータ ボリュームを追加するには、Kafka WithDataVolume リソースで server メソッドを呼び出します。

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

データ ボリュームは、Kafka server データをコンテナーのライフサイクル外に保持するために使用されます。 データ ボリュームは Kafka /var/lib/kafka/data コンテナーの server パスにマウントされ、name パラメーターが指定されていない場合、名前はランダムに生成されます。 データ ボリュームの詳細と、マウントのバインド 優先される理由の詳細については、「 ドキュメント: ボリューム」を参照してください。

データ バインド マウントを使用して Kafka server リソースを追加する

Kafka server リソースにデータ バインド マウントを追加するには、WithDataBindMount メソッドを呼び出します。

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

大事な

データ バインド マウント、パフォーマンス、移植性、およびセキュリティが向上し、運用環境に適した ボリュームと比較して機能が制限されています。 ただし、バインド マウントを使用すると、ホスト システム上のファイルに直接アクセスして変更できるため、リアルタイムの変更が必要な開発とテストに最適です。

データ バインド マウントは、ホスト マシンのファイルシステムに依存して、コンテナーの再起動時に Kafka server データを保持します。 データ バインド マウントは、Kafka C:\Kafka\Data コンテナー内のホスト コンピューター上の Windows 上の /Kafka/Data (または Unixで server) パスにマウントされます。 データ バインド マウントの詳細については、「ドキュメント Docker: バインド マウント」を参照してください。

ホスティング統合の正常性チェック

Kafka ホスティング統合により、Kafka server リソースの正常性チェックが自動的に追加されます。 正常性チェックでは、指定された接続名を持つ Kafka プロデューサーが、Kafka serverにトピックを接続して保持できることを確認します。

ホスティング統合は、📦 AspNetCore.HealthChecks.Kafka NuGet パッケージに依存します。

Client 統合

.NET Aspire Apache Kafka 統合を開始するには、📦Aspireをインストールします。そのためには、Confluent.Kafka NuGet パッケージを、clientを利用するプロジェクト、つまり Apache Kafkaclientを使用するアプリケーション用のプロジェクトにインストールしてください。

dotnet add package Aspire.Confluent.Kafka

Kafka プロデューサーを追加する

Program.cs-consuming プロジェクトの client ファイルで、AddKafkaProducer 拡張メソッドを呼び出して、依存関係挿入コンテナー経由で使用する IProducer<TKey, TValue> を登録します。 このメソッドは、キーの型とブローカーに送信するメッセージの型に対応する 2 つのジェネリック パラメーターを受け取ります。 これらのジェネリック パラメーターは、AddKafkaProducerのインスタンスを作成するために ProducerBuilder<TKey, TValue> によって使用されます。 このメソッドは、接続名パラメーターも受け取ります。

builder.AddKafkaProducer<string, string>("messaging");

その後、依存関係の挿入を使用して IProducer<TKey, TValue> インスタンスを取得できます。 たとえば、IHostedServiceからプロデューサーを取得するには、

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

詳細については「にある ワーカーサービス」をご覧ください。

Kafka コンシューマーを追加する

依存関係挿入コンテナーを使用して使用する IConsumer<TKey, TValue> を登録するには、AddKafkaConsumerを使用するプロジェクトの Program.cs ファイルで client 拡張メソッドを呼び出します。 このメソッドは、キーの型とブローカーから受信するメッセージの型に対応する 2 つのジェネリック パラメーターを受け取ります。 これらのジェネリック パラメーターは、AddKafkaConsumerのインスタンスを作成するために ConsumerBuilder<TKey, TValue> によって使用されます。 このメソッドは、接続名パラメーターも受け取ります。

builder.AddKafkaConsumer<string, string>("messaging");

その後、依存関係の挿入を使用して IConsumer<TKey, TValue> インスタンスを取得できます。 たとえば、IHostedServiceからコンシューマーを取得するには、

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

キー付き Kafka プロデューサーまたはコンシューマーを追加する

複数のプロデューサー インスタンスまたはコンシューマー インスタンスを異なる接続名で登録したい場合があります。 キー付き Kafka プロデューサーまたはコンシューマーを登録するには、適切な API を呼び出します。

キー付きサービスの詳細については、「.NET 依存関係の挿入: キー付きサービスの」を参照してください。

構成

.NET Aspire Apache Kafka 統合には、プロジェクトの要件と規則に基づいて接続を構成するための複数のオプションが用意されています。

接続文字列を使用する

ConnectionStrings 構成セクションの接続文字列を使用する場合は、builder.AddKafkaProducer() または builder.AddKafkaProducer()を呼び出すときに接続文字列の名前を指定できます。

builder.AddKafkaProducer<string, string>("kafka-producer");

その後、接続文字列は ConnectionStrings 構成セクションから取得されます。

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

接続文字列の値は、生成された BootstrapServers または IProducer<TKey, TValue> インスタンスの IConsumer<TKey, TValue> プロパティに設定されます。 詳細については、「BootstrapServers」を参照してください。

構成プロバイダーを使用する

.NET Aspire Apache Kafka 統合では、Microsoft.Extensions.Configurationがサポートされます。 KafkaProducerSettings キーと KafkaConsumerSettings キーをそれぞれ使用して、構成から Aspire:Confluent:Kafka:Producer または Aspire.Confluent:Kafka:Consumer を読み込みます。 次のスニペットは、いくつかのオプションを構成する appsettings.json ファイルの例です。

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Config 構成セクションと Aspire:Confluent:Kafka:Producer 構成セクションの両方の Aspire.Confluent:Kafka:Consumer プロパティは、それぞれ ProducerConfigConsumerConfigのインスタンスにバインドされます。

Confluent.Kafka.Consumer<TKey, TValue> では、ブローカーが消費したメッセージ オフセットを追跡できるように、ClientId プロパティを設定する必要があります。

完全な Kafka client 統合 JSON スキーマについては、Aspireを参照してください。Confluent.Kafka/ConfigurationSchema。json.

インライン デリゲートを使用する

さまざまなオプションを構成するために使用できるインライン デリゲートがいくつかあります。

KafkaProducerSettingsKafkaConsumerSettings を構成する

Action<KafkaProducerSettings> configureSettings デリゲートを渡して、コードから正常性チェックを無効にするなど、一部またはすべてのオプションをインラインで設定できます。

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

コードからコンシューマーをインラインで構成できます。

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue> を構成する

Confluent.Kafka ビルダーを構成するには、Action<ProducerBuilder<TKey, TValue>> (または Action<ConsumerBuilder<TKey, TValue>>) を渡します。

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

プロデューサーとコンシューマーを登録するときに、DI コンテナーに登録されているサービスにアクセスする必要がある場合は、それぞれ Action<IServiceProvider, ProducerBuilder<TKey, TValue>> または Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> を渡すことができます。

次のプロデューサー登録の例を考えてみましょう。

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

詳細については、ProducerBuilder<TKey, TValue> および ConsumerBuilder<TKey, TValue> API のドキュメントを参照してください。

Client統合の正常性チェック

既定では、.NET.NET Aspire 統合により、すべてのサービス 正常性チェック が有効になります。 詳細については、.NET.NET Aspire 統合の概要を参照してください。

.NET Aspire Apache Kafka 統合では、次の正常性チェック シナリオが処理されます。

  • Aspire.Confluent.Kafka.ProducerKafkaProducerSettings.DisableHealthChecksされたときに、false 正常性チェックを追加します。
  • Aspire.Confluent.Kafka.ConsumerKafkaConsumerSettings.DisableHealthChecksされたときに、false 正常性チェックを追加します。
  • /health HTTP エンドポイントと統合されます。このエンドポイントは、アプリがトラフィックを受け入れる準備ができていると見なされるために、登録されているすべての正常性チェックに合格する必要があります。

可観測性とテレメトリ

統合により、ログ記録、トレース、メトリックの構成が自動的に設定されます。これは、監視の柱 とも呼ばれます。 統合の可観測性とテレメトリの詳細については、統合の概要 参照してください。 バッキング サービスによっては、一部の統合でこれらの機能の一部のみがサポートされる場合があります。 たとえば、一部の統合ではログ記録とトレースがサポートされますが、メトリックはサポートされません。 テレメトリ機能は、「構成」セクションに記載されている手法を使用して無効にすることもできます。

伐採

.NET Aspire Apache Kafka 統合では、次のログ カテゴリが使用されます。

  • Aspire.Confluent.Kafka

トレーシング

.NET Aspire Apache Kafka 統合では、分散トレースは出力されません。

メトリック

.NET Aspire Apache Kafka 統合では、OpenTelemetryを使用して次のメトリックが出力されます。

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

参照