Event Hubs .NET SDK(AMQP)를 사용하여 이벤트를 스트리밍할 때 Avro 스키마를 사용하여 유효성 검사
이 빠른 시작에서는 Azure.Messaging.EventHubs .NET 라이브러리를 사용하여 스키마 유효성 검사를 통해 이벤트 허브와 이벤트를 주고 받는 방법을 알아봅니다.
참고 항목
Azure 스키마 레지스트리는 스키마에 대한 중앙 리포지토리를 이벤트 구동 및 메시징 중심 애플리케이션에 제공하는 Event Hubs의 기능입니다. 생산자 및 소비자 애플리케이션이 스키마를 관리 및 공유하지 않고도 데이터를 교환할 수 있는 유연성을 제공합니다. 또한 재사용 가능한 스키마에 대한 간단한 거버넌스 프레임워크를 제공하고, 그룹화 구성(스키마 그룹)을 통해 스키마 간의 관계를 정의합니다. 자세한 내용은 Event Hubs의 Azure 스키마 레지스트리를 참조하세요.
필수 조건
Azure Event Hubs를 처음 사용하는 경우 이 빠른 시작을 수행하기 전에 Event Hubs 개요를 참조하세요.
이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.
- Azure 구독이 아직 없는 경우 시작하기 전에 체험 계정을 만듭니다.
- Microsoft Visual Studio 2022.
Azure Event Hubs 클라이언트 라이브러리는 C# 8.0에 도입된 새 기능을 사용합니다. 이전 C# 언어 버전으로 라이브러리를 계속 사용할 수 있지만 새 구문은 사용할 수 없습니다. 전체 구문을 사용하려면 .NET Core SDK 3.0 이상 및
latest
(으)로 설정된 언어 버전으로 컴파일하는 것이 좋습니다. Visual Studio를 사용하는 경우 Visual Studio 2019 이전 버전은 C# 8.0 프로젝트를 빌드하는 데 필요한 도구와 호환되지 않습니다. 체험 Community 버전을 비롯한 Visual Studio 2019는 여기서 다운로드할 수 있습니다.
이벤트 허브 만들기
빠른 시작: Event Hubs 네임스페이스 및 이벤트 허브 만들기의 지침에 따라 Event Hubs 네임스페이스와 이벤트 허브를 만듭니다. 그런 다음, 연결 문자열 가져오기의 지침에 따라 Event Hubs 네임스페이스에 대한 연결 문자열을 가져옵니다.
현재 빠른 시작에서 사용할 다음 설정을 적어둡니다.
- Event Hubs 네임스페이스에 대한 연결 문자열
- 이벤트 허브의 이름입니다.
스키마 만들기
스키마 레지스트리를 사용하여 스키마 만들기의 지침에 따라 스키마 그룹 및 스키마를 만듭니다.
스키마 레지스트리 포털을 사용하여 contoso-sg라는 스키마 그룹을 만듭니다. Avro는 직렬화 형식으로 사용하고 호환성 모드에서는 사용하지 않습니다.
해당 스키마 그룹에서 다음 스키마 콘텐츠를 사용하여 스키마 이름:
Microsoft.Azure.Data.SchemaRegistry.example.Order
으로 새 Avro 스키마를 만듭니다.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
스키마 레지스트리 읽기 권한자 역할에 사용자 추가
네임스페이스 수준에서 스키마 레지스트리 읽기 권한자 역할에 사용자 계정을 추가합니다. 스키마 레지스트리 기여자 역할을 사용할 수도 있지만 이 빠른 시작에서는 이것이 필요하지 않습니다.
- Event Hubs 네임스페이스 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.
- 액세스 제어(IAM) 페이지의 메뉴에서 + 추가 ->역할 할당 추가를 선택합니다.
- 할당 유형 페이지에서 다음을 선택합니다.
- 역할 페이지에서 스키마 레지스트리 읽기 권한자(미리 보기)를 선택한 다음, 페이지 아래쪽에서 다음을 선택합니다.
- + 구성원 선택 링크를 사용하여 사용자 계정을 역할에 추가하고 다음을 선택합니다.
- 검토 + 할당 페이지에서 검토 + 할당을 선택합니다.
스키마 유효성 검사를 사용하여 이벤트 허브에 이벤트 생성
이벤트 생산자용 콘솔 애플리케이션 만들기
- Visual Studio 2019를 시작합니다.
- 새 프로젝트 만들기를 선택합니다.
- 새 프로젝트 만들기 대화 상자에서 다음 단계를 수행합니다. 이 대화 상자가 표시되지 않으면 메뉴에서 파일을 선택하고 새로 만들기를 선택한 다음, 프로젝트를 선택합니다.
프로그래밍 언어로 C#을 선택합니다.
애플리케이션 유형으로 콘솔을 선택합니다.
결과 목록에서 콘솔 애플리케이션을 선택합니다.
그런 후에 다음을 선택합니다.
- 프로젝트 이름으로 OrderProducer, 솔루션 이름으로 SRQuickStart를 입력한 다음, 확인을 선택하여 프로젝트를 만듭니다.
Event Hubs NuGet 패키지 추가
메뉴에서 도구>NuGet 패키지 관리자>패키지 관리자 콘솔을 선택합니다.
다음 명령을 실행하여 Azure.Messaging.EventHubs 및 기타 NuGet 패키지를 설치합니다. ENTER를 눌러 마지막 명령을 실행합니다.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
생산자 애플리케이션을 인증하여 여기에 표시된 대로 Visual Studio를 통해 Azure에 연결합니다.
네임스페이스 수준에서
Schema Registry Reader
역할의 구성원인 사용자 계정을 사용하여 Azure에 로그인합니다. 스키마 레지스트리 역할에 대한 자세한 내용은 Event Hubs의 Azure 스키마 레지스트리를 참조하세요.
Avro 스키마를 사용하여 코드 생성
- 스키마를 만드는 데 사용한 것과 동일한 콘텐츠를 사용하여
Order.avsc
라는 파일을 만듭니다. 프로젝트 또는 솔루션 폴더에 파일을 저장합니다. - 그런 다음 이 스키마 파일을 사용하여 .NET에 대한 코드를 생성할 수 있습니다. 코드 생성을 위해 avrogen과 같은 외부 코드 생성 도구를 사용할 수 있습니다. 예를 들어
avrogen -s .\Order.avsc .
를 실행하여 코드를 생성할 수 있습니다. - 코드를 생성하면
\Microsoft\Azure\Data\SchemaRegistry\example
폴더에Order.cs
이라는 파일이 표시됩니다. 위의 Avro 스키마의 경우Microsoft.Azure.Data.SchemaRegistry.example
네임스페이스에 C# 형식을 생성 합니다. Order.cs
파일을OrderProducer
프로젝트에 추가합니다.
이벤트를 직렬화하고 이벤트 허브로 전송하는 코드를 작성합니다.
Program.cs
파일에 다음 코드를 추가합니다. 자세한 내용은 코드 주석을 참조하세요. 코드의 개략적인 단계는 다음과 같습니다.- 이벤트 허브에 이벤트를 보내는 데 사용할 수 있는 생산자 클라이언트를 만듭니다.
Order
개체의 데이터를 직렬화하고 유효성을 검사하는 데 사용할 수 있는 스키마 레지스트리 클라이언트를 만듭니다.- 생성된
Order
형식을 사용하여 새Order
개체를 만듭니다. - 스키마 레지스트리 클라이언트를 사용하여
Order
개체를EventData
로 직렬화합니다. - 이벤트 일괄 처리를 만듭니다.
- 이벤트 일괄 처리에 이벤트 데이터를 추가합니다.
- 생산자 클라이언트를 사용하여 이벤트 일괄 처리를 이벤트 허브로 보냅니다.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
다음 자리 표시자 값을 실제 값으로 바꿉니다.
EVENTHUBSNAMESPACECONNECTIONSTRING
- Event Hubs 네임스페이스에 대한 연결 문자열EVENTHUBNAME
- 이벤트 허브의 이름EVENTHUBSNAMESPACENAME
- Event Hubs 네임스페이스의 이름SCHEMAGROUPNAME
- 스키마 그룹의 이름
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
프로그램을 빌드하고 오류가 없는지 확인합니다.
프로그램을 실행하고 확인 메시지가 나타날 때까지 기다립니다.
A batch of 1 order has been published.
Azure Portal에서 이벤트 허브가 이벤트를 받았는지 확인할 수 있습니다. 메트릭 섹션에서 메시지 보기로 전환합니다. 페이지를 새로 고쳐 차트를 업데이트합니다. 메시지가 수신되었다는 내용이 표시될 때까지 몇 초 정도 걸릴 수 있습니다.
스키마 유효성 검사를 사용하여 이벤트 허브의 이벤트 사용
이 섹션에서는 이벤트 허브에서 이벤트를 수신하고 스키마 레지스트리를 사용하여 이벤트 데이터를 역직렬화하는 .NET Core 콘솔 애플리케이션을 작성하는 방법을 보여줍니다.
추가 필수 조건
- 이벤트 프로세서를 사용할 스토리지 계정을 만듭니다.
소비자 애플리케이션 만들기
- 솔루션 탐색기 창에서 SRQuickStart 솔루션을 마우스 오른쪽 단추로 클릭하고, 추가를 가리킨 다음, 새 프로젝트를 선택합니다.
- 콘솔 애플리케이션을 선택하고 다음을 선택합니다.
- 프로젝트 이름에 OrderConsumer를 입력하고 만들기를 선택합니다.
- 솔루션 탐색기 창에서 OrderConsumer를 마우스 오른쪽 단추로 클릭하고 시작 프로젝트로 설정을 선택합니다.
Event Hubs NuGet 패키지 추가
메뉴에서 도구>NuGet 패키지 관리자>패키지 관리자 콘솔을 선택합니다.
패키지 관리자 콘솔 창에서 기본 프로젝트로 OrderConsumer가 선택되어 있는지 확인합니다. 그렇지 않은 경우 드롭다운 목록을 사용하여 OrderConsumer를 선택합니다.
다음 명령을 실행하여 필요한 NuGet 패키지를 설치합니다. ENTER를 눌러 마지막 명령을 실행합니다.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
생산자 애플리케이션을 인증하여 여기에 표시된 대로 Visual Studio를 통해 Azure에 연결합니다.
네임스페이스 수준에서
Schema Registry Reader
역할의 구성원인 사용자 계정을 사용하여 Azure에 로그인합니다. 스키마 레지스트리 역할에 대한 자세한 내용은 Event Hubs의 Azure 스키마 레지스트리를 참조하세요.생산자 앱을 만드는 과정의 일부로 생성한
Order.cs
파일을 OrderConsumer 프로젝트에 추가합니다.OrderConsumer 프로젝트를 마우스 오른쪽 단추로 클릭하고 시작 프로젝트로 설정을 선택합니다.
스키마 레지스트리를 사용하여 이벤트를 수신하고 역직렬화하는 코드 작성
Program.cs
파일에 다음 코드를 추가합니다. 자세한 내용은 코드 주석을 참조하세요. 코드의 개략적인 단계는 다음과 같습니다.- 이벤트 허브에 이벤트를 보내는 데 사용할 수 있는 소비자 클라이언트를 만듭니다.
- Azure Blob Storage의 Blob 컨테이너에 대한 Blob 컨테이너 클라이언트를 만듭니다.
- 이벤트 프로세서 클라이언트를 만들고 이벤트 및 오류 처리기를 등록합니다.
- 이벤트 처리기에서 이벤트 데이터를
Order
개체로 역직렬화하는 데 사용할 수 있는 스키마 레지스트리 클라이언트를 만듭니다. - 직렬 변환기를 사용하여 이벤트 데이터를
Order
개체로 역직렬화합니다. - 받은 주문에 대한 정보를 인쇄합니다.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be used as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
다음 자리 표시자 값을 실제 값으로 바꿉니다.
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- Event Hubs 네임스페이스에 대한 연결 문자열EVENTHUBNAME
- 이벤트 허브의 이름EVENTHUBSNAMESPACENAME
- Event Hubs 네임스페이스의 이름SCHEMAGROUPNAME
- 스키마 그룹의 이름AZURESTORAGECONNECTIONSTRING
- Azure 스토리지 계정에 대한 연결 문자열BLOBCONTAINERNAME
- Blob 컨테이너의 이름
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
프로그램을 빌드하고 오류가 없는지 확인합니다.
수신기 애플리케이션을 실행합니다.
이벤트가 수신되었다는 메시지가 표시됩니다.
Received order with ID: 1234, amount: 45.29, description: First sample order.
이러한 이벤트는 앞에서 송신기 프로그램을 실행하여 이벤트 허브로 보낸 세 개 이벤트입니다.
샘플
GitHub 리포지토리의 추가 정보 문서를 참조하세요.
리소스 정리
Event Hubs 네임스페이스를 삭제하거나 네임스페이스가 포함된 리소스 그룹을 삭제합니다.