.NET Aspire Apache Kafka 統合
Apache Kafka は、オープンソースの分散イベント ストリーミング プラットフォームです。 リアルタイム のデータ パイプラインとストリーミング アプリケーションを構築する場合に便利です。
.NET Aspire
Apache Kafka 統合を使用すると、既存の Kafka インスタンスに接続したり、.NETを使用して docker.io/confluentinc/confluent-local
から新しいインスタンスを作成したりできます。
ホスティング統合
統合をホストする Apache Kafka は、Kafka サーバーを KafkaServerResource の種類としてモデル化します。 この種類にアクセスするには、📦Aspireをインストールします。Hosting.Kafka NuGet パッケージを アプリ ホスト プロジェクトに追加し、ビルダーと共に追加します。
dotnet add package Aspire.Hosting.Kafka
詳細については、「dotnet パッケージ の追加」または「.NET アプリケーションでのパッケージの依存関係の管理」を参照してください。
Kafka サーバー リソースを追加する
アプリ ホスト プロジェクトで、AddKafka インスタンスの builder
を呼び出して、Kafka サーバー リソースを追加します。
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
.NET
.NET Aspire 前の例で示したように、docker.io/confluentinc/confluent-local
イメージを使用してコンテナー イメージをアプリ ホストに追加すると、ローカル コンピューターに新しい Kafka サーバー インスタンスが作成されます。 Kafka サーバー (kafka
変数) への参照が ExampleProject
に追加されます。 Kafka サーバー リソースに既定のポートが含まれている
WithReference メソッドは、ExampleProject
という名前の "kafka"
で接続を構成します。 詳細については、「コンテナー リソースのライフサイクルの」を参照してください。
アドバイス
既存の Kafka サーバーに接続する場合は、代わりに AddConnectionString を呼び出します。 詳細については、「既存のリソースを参照する」を参照してください。
Kafka UI の追加
Kafka UI を Kafka サーバー リソースに追加するには、WithKafkaUI メソッドを呼び出します。
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI は、Apache Kafka クラスターを監視および管理するための無料のオープン ソース Web UI です。
.NET
.NET Aspire、Kafka UI を実行する別のコンテナー イメージ docker.io/provectuslabs/kafka-ui
をアプリ ホストに追加します。
Kafka UI ホスト ポートを変更する
Kafka UI ホスト ポートを変更するには、WithHostPort メソッドの呼び出しをチェーンします。
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI には、前の例の http://localhost:9100
でアクセスできます。
データ ボリュームを使用して Kafka サーバー リソースを追加する
Kafka サーバー リソースにデータ ボリュームを追加するには、Kafka サーバー リソースで WithDataVolume メソッドを呼び出します。
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
データ ボリュームは、Kafka サーバー のデータをコンテナーのライフサイクル外に保持するために使用されます。 データ ボリュームは Kafka サーバー コンテナーの /var/lib/kafka/data
パスにマウントされ、name
パラメーターが指定されていない場合、名前はランダムに生成されます。 データ ボリュームの詳細と、マウントのバインド
データ バインド マウントを使用して Kafka サーバー リソースを追加する
Kafka サーバー リソースにデータ バインド マウントを追加するには、WithDataBindMount メソッドを呼び出します。
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
大事な
データ バインド マウント、パフォーマンス、移植性、およびセキュリティが向上し、運用環境に適した ボリュームと比較して機能が制限されています。 ただし、バインド マウントを使用すると、ホスト システム上のファイルに直接アクセスして変更できるため、リアルタイムの変更が必要な開発とテストに最適です。
データ バインド マウントは、ホスト マシンのファイルシステムに依存して、コンテナーの再起動の間に Kafka サーバー データを保持します。 データ バインド マウントは、Kafka サーバー コンテナー内のホスト コンピューター上の Windows 上の C:\Kafka\Data
(または /Kafka/Data
上の Unix) パスにマウントされます。 データ バインド マウントの詳細については、「ドキュメント Docker: バインド マウント」を参照してください。
ホスティング統合の正常性チェック
Kafka ホスティング統合では、Kafka サーバー リソースの正常性チェックが自動的に追加されます。 正常性チェックでは、指定された接続名を持つ Kafka プロデューサーが、Kafka サーバーにトピックを接続して永続化できることを確認します。
ホスティング統合は、📦 AspNetCore.HealthChecks.Kafka NuGet パッケージに依存します。
Client 統合
.NET Aspire Apache Kafka 統合を開始するには、📦 クライアントを使用するアプリケーションのプロジェクト、つまりクライアントを消費するプロジェクトにおいて、Apache Kafka NuGet パッケージをインストールします。
dotnet add package Aspire.Confluent.Kafka
Kafka プロデューサーを追加する
クライアントを使用するプロジェクトの Program.cs ファイルで、AddKafkaProducer 拡張メソッドを呼び出して、依存関係挿入コンテナーを介して使用する IProducer<TKey, TValue>
を登録します。 このメソッドは、キーの型とブローカーに送信するメッセージの型に対応する 2 つのジェネリック パラメーターを受け取ります。 これらのジェネリック パラメーターは、AddKafkaProducer
のインスタンスを作成するために ProducerBuilder<TKey, TValue>
によって使用されます。 このメソッドは、接続名パラメーターも受け取ります。
builder.AddKafkaProducer<string, string>("messaging");
その後、依存関係の挿入を使用して IProducer<TKey, TValue>
インスタンスを取得できます。 たとえば、IHostedService
からプロデューサーを取得するには、
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
詳細については「
Kafka コンシューマーを追加する
依存関係挿入コンテナーを使用して使用する IConsumer<TKey, TValue>
を登録するには、クライアントを使用するプロジェクトの AddKafkaConsumer ファイルで Program.cs 拡張メソッドを呼び出します。 このメソッドは、キーの型とブローカーから受信するメッセージの型に対応する 2 つのジェネリック パラメーターを受け取ります。 これらのジェネリック パラメーターは、AddKafkaConsumer
のインスタンスを作成するために ConsumerBuilder<TKey, TValue>
によって使用されます。 このメソッドは、接続名パラメーターも受け取ります。
builder.AddKafkaConsumer<string, string>("messaging");
その後、依存関係の挿入を使用して IConsumer<TKey, TValue>
インスタンスを取得できます。 たとえば、IHostedService
からコンシューマーを取得するには、
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
キー付き Kafka プロデューサーまたはコンシューマーを追加する
複数のプロデューサー インスタンスまたはコンシューマー インスタンスを異なる接続名で登録したい場合があります。 キー付き Kafka プロデューサーまたはコンシューマーを登録するには、適切な API を呼び出します。
- AddKeyedKafkaProducer: キー付き Kafka プロデューサーを登録します。
- AddKeyedKafkaConsumer: キー付き Kafka コンシューマーを登録します。
キー付きサービスの詳細については、「.NET 依存関係の挿入: キー付きサービスの」を参照してください。
構成
.NET Aspire Apache Kafka 統合には、プロジェクトの要件と規則に基づいて接続を構成するための複数のオプションが用意されています。
接続文字列を使用する
ConnectionStrings
構成セクションの接続文字列を使用する場合は、builder.AddKafkaProducer()
または builder.AddKafkaProducer()
を呼び出すときに接続文字列の名前を指定できます。
builder.AddKafkaProducer<string, string>("kafka-producer");
その後、接続文字列は ConnectionStrings
構成セクションから取得されます。
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
接続文字列の値は、生成された BootstrapServers
または IProducer<TKey, TValue>
インスタンスの IConsumer<TKey, TValue>
プロパティに設定されます。 詳細については、「BootstrapServers」を参照してください。
構成プロバイダーを使用する
.NET Aspire
Apache Kafka 統合では、Microsoft.Extensions.Configurationがサポートされます。
KafkaProducerSettings キーと KafkaConsumerSettings キーをそれぞれ使用して、構成から Aspire:Confluent:Kafka:Producer
または Aspire.Confluent:Kafka:Consumer
を読み込みます。 次のスニペットは、いくつかのオプションを構成する appsettings.json ファイルの例です。
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Config
構成セクションと Aspire:Confluent:Kafka:Producer
構成セクションの両方の Aspire.Confluent:Kafka:Consumer
プロパティは、それぞれ ProducerConfig
と ConsumerConfig
のインスタンスにバインドされます。
Confluent.Kafka.Consumer<TKey, TValue>
では、ブローカーが消費したメッセージ オフセットを追跡できるように、ClientId
プロパティを設定する必要があります。
完全な Kafka クライアント統合 JSON スキーマについては、Aspireを参照してください。Confluent.Kafka/ConfigurationSchema.json.
インライン デリゲートを使用する
さまざまなオプションを構成するために使用できるインライン デリゲートがいくつかあります。
KafkaProducerSettings
と KafkaConsumerSettings
を構成する
Action<KafkaProducerSettings> configureSettings
デリゲートを渡して、コードから正常性チェックを無効にするなど、一部またはすべてのオプションをインラインで設定できます。
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
コードからコンシューマーをインラインで構成できます。
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue>
と ConsumerBuilder<TKey, TValue>
を構成する
Confluent.Kafka
ビルダーを構成するには、Action<ProducerBuilder<TKey, TValue>>
(または Action<ConsumerBuilder<TKey, TValue>>
) を渡します。
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
プロデューサーとコンシューマーを登録するときに、DI コンテナーに登録されているサービスにアクセスする必要がある場合は、それぞれ Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
または Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
を渡すことができます。
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
次のプロデューサー登録の例を考えてみましょう。
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
詳細については、ProducerBuilder<TKey, TValue>
および ConsumerBuilder<TKey, TValue>
API のドキュメントを参照してください。
Client統合の正常性チェック
既定では、.NET.NET Aspire 統合により、すべてのサービス 正常性チェック が有効になります。 詳細については、.NET.NET Aspire 統合の概要を参照してください。
.NET Aspire Apache Kafka 統合では、次の正常性チェック シナリオが処理されます。
-
Aspire.Confluent.Kafka.Producer
が KafkaProducerSettings.DisableHealthChecksされたときに、false
正常性チェックを追加します。 -
Aspire.Confluent.Kafka.Consumer
が KafkaConsumerSettings.DisableHealthChecksされたときに、false
正常性チェックを追加します。 -
/health
HTTP エンドポイントと統合されます。このエンドポイントは、アプリがトラフィックを受け入れる準備ができていると見なされるために、登録されているすべての正常性チェックに合格する必要があります。
可観測性とテレメトリ
伐採
.NET Aspire Apache Kafka 統合では、次のログ カテゴリが使用されます。
Aspire.Confluent.Kafka
トレーシング
.NET Aspire Apache Kafka 統合では、分散トレースは出力されません。
メトリック
.NET Aspire Apache Kafka 統合では、OpenTelemetryを使用して次のメトリックが出力されます。
Aspire.Confluent.Kafka
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
参照
- Apache Kafka
- コンフルエント
- Confluent Kafka .NET クライアント ドキュメント
- .NET .NET Aspire 統合
- .NET Aspire GitHub リポジトリ
.NET Aspire