Event Hubs .NET SDK (AMQP) を使用してイベントのストリーミング時に Avro スキーマを使用して検証する
このクイックスタートでは、Azure.Messaging.EventHubs .NET ライブラリを使用したスキーマ検証を使用して、イベント ハブとの間でイベントを送受信する方法について説明します。
Note
Azure スキーマ レジストリは Event Hubs の機能で、イベント駆動型およびメッセージング中心のアプリケーション用のスキーマの中央リポジトリとして機能します。 プロデューサーとコンシューマー アプリケーションに対し、スキーマを管理して共有することなく、データを交換できる柔軟性を提供します。 また、再利用可能なスキーマのための単純なガバナンス フレームワークが用意されており、グループ化構成体 (スキーマ グループ) を使用してスキーマ間のリレーションシップを定義します。 詳細については、Event Hubs の Azure スキーマ レジストリ に関するページを参照してください。
前提条件
Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。
このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。
- Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
-
Microsoft Visual Studio 2022。
Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能を利用しています。 以前のバージョンの C# 言語でライブラリを使うこともできますが、新しい構文は使用できません。 完全な構文を使用するには、.NET Core SDK 3.0 以上で、言語バージョンを
latest
に設定してコンパイルすることをお勧めします。 Visual Studio を使用している場合、Visual Studio 2019 より前のバージョンには、C# 8.0 プロジェクトをビルドするために必要なツールとの互換性がありません。 無料の Community エディションを含む Visual Studio 2019 は、[ こちら](https://visualstudio.microsoft.com/vs/) からダウンロードできます。
イベント ハブの作成
「Event Hubs 名前空間とイベント ハブを作成する」のクイックスタートの手順に従って、Event Hubs 名前空間とイベント ハブを作成します。 次に、接続文字列を取得する方法に関するページの手順に従って、使用している Event Hubs 名前空間への接続文字列を取得します。
現在のクイック スタートで使用する次の設定をメモします。
- Event Hubs 名前空間の接続文字列
- イベント ハブの名前
スキーマの作成
スキーマ レジストリを使用してスキーマを作成する方法に関するページの手順に従って、スキーマ グループとスキーマを作成します。
スキーマ レジストリ ポータルを使用して、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換性モードに [なし] を使用します。
そのスキーマ グループで、次のスキーマ コンテンツを使用して、スキーマ名
Microsoft.Azure.Data.SchemaRegistry.example.Order
を使用して新しい Avro スキーマを作成します。{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
スキーマ レジストリ閲覧者ロールにユーザーを追加する
名前空間レベルでスキーマ レジストリ閲覧者ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイックスタートでは必要ありません。
- [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
- [アクセス制御 (IAM)] ページのメニューで、[+ 追加] ->[ロールの割り当てを追加] を選択します。
- [割り当ての種類] ページで、[次へ] を選択します。
- [ロール] ページで、[スキーマ レジストリ閲覧者 (プレビュー)] を選択し、ページの下部にある [次へ] を選択します。
- [+ メンバーの選択] リンクを使用してユーザー アカウントをロールに追加し、[次へ] を選択します。
- [確認と割り当て] ページで、[確認と割り当て] を選択します。
スキーマ検証を使用してイベント ハブにイベントを生成する
イベント プロデューサー用のコンソール アプリケーションを作成する
- Visual Studio 2019 を起動します。
- [新しいプロジェクトの作成] を選択します。
** [新しいプロジェクトの作成]** ダイアログ ボックスで、次の手順に従います。このダイアログ ボックスが表示されない場合は、メニューで** [ファイル]** 、** [新規]** 、** [プロジェクト]** の順に選択します。プログラミング言語として
** [C#]** を選択します。アプリケーションの種類として [コンソール] を選択します。
結果リストから
** [コンソール アプリケーション]** を選択します。次に、 [次へ] を選択します。
- プロジェクト名として「OrderProducer」、ソリューション名として「SRQuickStart」と入力し、 [OK] を選択してプロジェクトを作成します。
Event Hubs NuGet パッケージの追加
メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。
次のコマンドを実行して Azure.Messaging.EventHubs と他の NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
次に示すように、プロデューサー アプリケーションを認証して、Visual Studio Azure に接続します。
名前空間レベルで
Schema Registry Reader
ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、「Event Hubs の Azure スキーマ レジストリ」をご覧ください。
Avro スキーマを使用したコード生成
- スキーマの作成に使用したのと同じコンテンツを使用して、
Order.avsc
という名前のファイルを作成します。 プロジェクトまたはソリューション フォルダーにファイルを保存します。 - その後、このスキーマ ファイルを使用して .NET のコードを生成できます。 コード生成には、avrogen などの外部コード生成ツールを使用できます。 たとえば、
avrogen -s .\Order.avsc .
を実行してコードを生成できます。 - コードを生成すると、
\Microsoft\Azure\Data\SchemaRegistry\example
フォルダーにOrder.cs
という名前のファイルが表示されます。 上記の Avro スキーマでは、Microsoft.Azure.Data.SchemaRegistry.example
名前空間に C# 型が生成されます。 -
OrderProducer
プロジェクトにOrder.cs
ファイルを追加します。
イベント ハブにイベントをシリアル化して送信するコードを記述する
次のコードを
Program.cs
ファイルに追加します。 詳細については、コードのコメントを参照してください。 コードの大まかな手順は次のとおりです。- イベント ハブにイベントを送信するために使用できるプロデューサー クライアントを作成します。
-
Order
オブジェクト内のデータをシリアル化および検証するために使用できるスキーマ レジストリ クライアントを作成します。 - 生成された
Order
型を使用して新しいOrder
オブジェクトを作成します。 - スキーマ レジストリ クライアントを使用して、
Order
オブジェクトをEventData
にシリアル化します。 - イベントのバッチを作成します。
- イベント データをイベント バッチに追加します。
- プロデューサー クライアントを使用して、イベントのバッチをイベント ハブに送信します。
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
次のプレースホルダーの値を実際の値に置き換えます。
-
EVENTHUBSNAMESPACECONNECTIONSTRING
- Event Hubs 名前空間の接続文字列 -
EVENTHUBNAME
- イベント ハブの名前 -
EVENTHUBSNAMESPACENAME
- Event Hubs 名前空間の名前 -
SCHEMAGROUPNAME
- スキーマ グループの名前
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
プロジェクトをビルドし、エラーがないことを確認します。
プログラムを実行し、確認メッセージが表示されるまで待ちます。
A batch of 1 order has been published.
Azure portal で、イベント ハブがイベントを受信したことを確認できます。 [メトリック] セクションで [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。
スキーマ検証を使用してイベント ハブからイベントを使用する
このセクションでは、イベント ハブからイベントを受信し、スキーマ レジストリを使用してイベント データを逆シリアル化する .NET Core コンソール アプリケーションを作成する方法について説明します。
追加の前提条件
- イベント プロセッサで使用するストレージ アカウントを作成します。
コンシューマー アプリケーションを作成する
- ソリューション エクスプローラー ウィンドウで、 [SRQuickStart] ソリューションを右クリックし、 [追加] をポイントして、 [新しいプロジェクト] を選択します。
** [コンソール アプリケーション]** を選択し、** [次へ]** を選択します。- [プロジェクト名] に「OrderConsumer」と入力し、 [作成] を選択します。
- ソリューション エクスプローラー ウィンドウで、 [OrderConsumer] を右クリックし、 [スタートアップ プロジェクトとして設定] を選択します。
Event Hubs NuGet パッケージの追加
メニューから [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。
[パッケージ マネージャー コンソール] ウィンドウで、 [既定のプロジェクト] に OrderConsumer が選択されていることを確認します。 選択されていない場合は、ドロップダウン リストを使用して OrderConsumer を選択します。
次のコマンドを実行して、必要な NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
次に示すように、プロデューサー アプリケーションを認証して、Visual Studio Azure に接続します。
名前空間レベルで
Schema Registry Reader
ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、「Event Hubs の Azure スキーマ レジストリ」をご覧ください。プロデューサー アプリの作成の一環として生成した
Order.cs
ファイルを OrderConsumer プロジェクトに追加します。OrderConsumer プロジェクトを右クリックし、[スタートアップ プロジェクトに設定] を選択します。
スキーマ レジストリを使用し、イベントを受信して逆シリアル化するコードを記述する
次のコードを
Program.cs
ファイルに追加します。 詳細については、コードのコメントを参照してください。 コードの大まかな手順は次のとおりです。- イベント ハブにイベントを送信するために使用できるコンシューマー クライアントを作成します。
- Azure BLOB ストレージ内に BLOB コンテナー用の BLOB コンテナー クライアントを作成します。
- イベント プロセッサ クライアントを作成し、イベント ハンドラーとエラー ハンドラーを登録します。
- イベント ハンドラーで、イベント データを
Order
オブジェクトに逆シリアル化するために使用できるスキーマ レジストリ クライアントを作成します。 - シリアライザーを使用して、イベント データを
Order
オブジェクトに逆シリアル化します。 - 受け取った注文に関する情報を印刷します。
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
次のプレースホルダーの値を実際の値に置き換えます。
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- Event Hubs 名前空間の接続文字列 -
EVENTHUBNAME
- イベント ハブの名前 -
EVENTHUBSNAMESPACENAME
- Event Hubs 名前空間の名前 -
SCHEMAGROUPNAME
- スキーマ グループの名前 -
AZURESTORAGECONNECTIONSTRING
- Azure ストレージ アカウントの接続文字列 -
BLOBCONTAINERNAME
- BLOB コンテナーの名前
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
プロジェクトをビルドし、エラーがないことを確認します。
受信側アプリを実行します。
イベントが受信されたことを示すメッセージが表示されます。
Received order with ID: 1234, amount: 45.29, description: First sample order.
これらのイベントは、前に送信側プログラムを実行してイベント ハブに送信した 3 つのイベントです。
サンプル
GitHub リポジトリの Readme に関する記事をご覧ください。
リソースをクリーンアップする
Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。