다음을 통해 공유


빠른 시작: Go를 사용하여 Event Hubs에서 이벤트 보내기 또는 받기

Azure Event Hubs는 초당 수백만 개의 이벤트를 수신하여 처리할 수 있는 빅 데이터 스트리밍 플랫폼이자 이벤트 수집 서비스입니다. Event Hubs는 분산된 소프트웨어와 디바이스에서 생성된 이벤트, 데이터 또는 원격 분석을 처리하고 저장할 수 있습니다. Event Hub로 전송된 데이터는 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 변환하고 저장할 수 있습니다. Event Hubs에 대한 자세한 개요는 Event Hubs 개요Event Hubs 기능을 참조하세요.

이 빠른 시작에서는 이벤트 허브와 이벤트를 주고 받는 Go 애플리케이션을 작성하는 방법을 설명합니다.

참고 항목

이 빠른 시작은 GitHub(https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs)의 샘플을 기준으로 합니다. 전송 이벤트 섹션은 example_producing_events_test.go 샘플을 기반으로 하고 수신 이벤트 섹션은 example_processor_test.go 샘플을 기반으로 합니다. 빠른 시작을 위해 코드가 간소화되고 모든 자세한 주석이 제거되므로 자세한 내용과 설명은 샘플을 참조하세요.

필수 조건

이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.

  • 로컬로 설치된 Go. 필요한 경우 다음 지침을 따릅니다.
  • 활성 Azure 계정. Azure 구독이 없는 경우 시작하기 전에 체험 계정을 만듭니다.
  • Event Hubs 네임스페이스 및 이벤트 허브 만들기 Azure Portal을 사용하여 Event Hubs 형식의 네임스페이스를 만들고, 애플리케이션에서 이벤트 허브와 통신하는 데 필요한 관리 자격 증명을 얻습니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다.

이벤트 보내기

이 섹션에서는 이벤트 허브로 이벤트를 전송하는 Go 애플리케이션을 만드는 방법을 보여줍니다.

Go 패키지 설치

다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

이벤트 허브로 이벤트를 보내는 코드

다음은 이벤트 허브에 이벤트를 보내는 코드입니다. 코드의 주요 단계는 다음과 같습니다.

  1. Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 생산자 클라이언트를 만듭니다.
  2. 일괄 처리 개체를 만들고 일괄 처리에 샘플 이벤트를 추가합니다.
  3. 이벤트로 이벤트 일괄 처리를 보냅니다.

Important

NAMESPACE CONNECTION STRING을 Event Hubs 네임스페이스에 대한 연결 문자열로 바꾸고 EVENT HUB NAME을 샘플 코드의 이벤트 허브 이름으로 바꿉니다.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

애플리케이션을 아직 실행하지는 마세요. 먼저 수신자 앱을 실행한 다음, 발신자 앱을 실행해야 합니다.

이벤트 수신

Storage 계정 및 컨테이너 만들기

이벤트의 파티션 및 검사점 임대와 같은 상태는 Azure Storage 컨테이너를 사용하여 수신기 간에 공유됩니다. Go SDK를 사용하여 스토리지 계정 및 컨테이너를 만들 수 있지만, Azure Storage 계정 정보의 지침에 따라 새로 만들 수도 있습니다.

Azure Blob Storage를 검사점 저장소로 사용할 때 다음 권장 사항을 따릅니다.

  • 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
  • 컨테이너를 다른 용도로 사용하지 말고 스토리지 계정을 다른 용도로 사용하지 마세요.
  • 스토리지 계정은 배포된 애플리케이션이 있는 지역과 동일한 지역에 있어야 합니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.

Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.

  • 계층 구조 네임스페이스
  • Blob 일시 삭제
  • 버전 관리

Go 패키지

메시지를 받으려면 다음 예제와 같이 Event Hubs에 대한 Go 패키지를 가져옵니다.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

이벤트 허브에서 이벤트를 받는 코드

다음은 이벤트 허브에서 이벤트를 받는 코드입니다. 코드의 주요 단계는 다음과 같습니다.

  1. 검사점 지정을 위해 이벤트 허브에서 사용하는 Azure Blob Storage를 나타내는 검사점 저장소 개체를 확인합니다.
  2. Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 소비자 클라이언트를 만듭니다.
  3. 클라이언트 개체와 검사점 저장소 개체를 사용하여 이벤트 프로세서를 만듭니다. 이 프로세서는 이벤트를 수신하고 처리합니다.
  4. 이벤트 허브의 각 파티션에 대해 processEvents를 이벤트를 처리하는 함수로 사용하여 파티션 클라이언트를 만듭니다.
  5. 모든 파티션 클라이언트를 실행하여 이벤트를 수신하고 처리합니다.

Important

다음 자리 표시자 값을 실제 값으로 바꿉니다.

  • AZURE STORAGE CONNECTION STRING을 Azure 스토리지 계정의 연결 문자열로 바꿉니다.
  • BLOB CONTAINER NAME을 스토리지 계정에서 만든 Blob 컨테이너의 이름으로 바꿉니다.
  • NAMESPACE CONNECTION STRING을 Event Hubs 네임스페이스의 연결 문자열로 바꿉니다.
  • EVENT HUB NAME을 샘플 코드의 이벤트 허브 이름으로 바꿉니다.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

수신자 및 발신자 앱 실행

  1. 먼저 수신자 앱을 실행합니다.

  2. 발신자 앱을 실행합니다.

  3. 수신자 창에서 다음 출력이 표시될 때까지 1분 정도 기다립니다.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

다음 단계

GitHub(https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs)의 샘플을 참조하세요.