빠른 시작: Go를 사용하여 Event Hubs에서 이벤트 보내기 또는 받기
Azure Event Hubs는 초당 수백만 개의 이벤트를 수신하여 처리할 수 있는 빅 데이터 스트리밍 플랫폼이자 이벤트 수집 서비스입니다. Event Hubs는 분산된 소프트웨어와 디바이스에서 생성된 이벤트, 데이터 또는 원격 분석을 처리하고 저장할 수 있습니다. Event Hub로 전송된 데이터는 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 변환하고 저장할 수 있습니다. Event Hubs에 대한 자세한 개요는 Event Hubs 개요 및 Event Hubs 기능을 참조하세요.
이 빠른 시작에서는 이벤트 허브와 이벤트를 주고 받는 Go 애플리케이션을 작성하는 방법을 설명합니다.
참고 항목
이 빠른 시작은 GitHub(https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs)의 샘플을 기준으로 합니다. 전송 이벤트 섹션은 example_producing_events_test.go 샘플을 기반으로 하고 수신 이벤트 섹션은 example_processor_test.go 샘플을 기반으로 합니다. 빠른 시작을 위해 코드가 간소화되고 모든 자세한 주석이 제거되므로 자세한 내용과 설명은 샘플을 참조하세요.
필수 조건
이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.
- 로컬로 설치된 Go. 필요한 경우 다음 지침을 따릅니다.
- 활성 Azure 계정. Azure 구독이 없는 경우 시작하기 전에 체험 계정을 만듭니다.
- Event Hubs 네임스페이스 및 이벤트 허브 만들기 Azure Portal을 사용하여 Event Hubs 형식의 네임스페이스를 만들고, 애플리케이션에서 이벤트 허브와 통신하는 데 필요한 관리 자격 증명을 얻습니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다.
이벤트 보내기
이 섹션에서는 이벤트 허브로 이벤트를 전송하는 Go 애플리케이션을 만드는 방법을 보여줍니다.
Go 패키지 설치
다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
이벤트 허브로 이벤트를 보내는 코드
다음은 이벤트 허브에 이벤트를 보내는 코드입니다. 코드의 주요 단계는 다음과 같습니다.
- Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 생산자 클라이언트를 만듭니다.
- 일괄 처리 개체를 만들고 일괄 처리에 샘플 이벤트를 추가합니다.
- 이벤트로 이벤트 일괄 처리를 보냅니다.
Important
NAMESPACE CONNECTION STRING
을 Event Hubs 네임스페이스에 대한 연결 문자열로 바꾸고 EVENT HUB NAME
을 샘플 코드의 이벤트 허브 이름으로 바꿉니다.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
애플리케이션을 아직 실행하지는 마세요. 먼저 수신자 앱을 실행한 다음, 발신자 앱을 실행해야 합니다.
이벤트 수신
Storage 계정 및 컨테이너 만들기
이벤트의 파티션 및 검사점 임대와 같은 상태는 Azure Storage 컨테이너를 사용하여 수신기 간에 공유됩니다. Go SDK를 사용하여 스토리지 계정 및 컨테이너를 만들 수 있지만, Azure Storage 계정 정보의 지침에 따라 새로 만들 수도 있습니다.
Azure Blob Storage를 검사점 저장소로 사용할 때 다음 권장 사항을 따릅니다.
- 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
- 컨테이너를 다른 용도로 사용하지 말고 스토리지 계정을 다른 용도로 사용하지 마세요.
- 스토리지 계정은 배포된 애플리케이션이 있는 지역과 동일한 지역에 있어야 합니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.
Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.
- 계층 구조 네임스페이스
- Blob 일시 삭제
- 버전 관리
Go 패키지
메시지를 받으려면 다음 예제와 같이 Event Hubs에 대한 Go 패키지를 가져옵니다.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
이벤트 허브에서 이벤트를 받는 코드
다음은 이벤트 허브에서 이벤트를 받는 코드입니다. 코드의 주요 단계는 다음과 같습니다.
- 검사점 지정을 위해 이벤트 허브에서 사용하는 Azure Blob Storage를 나타내는 검사점 저장소 개체를 확인합니다.
- Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 소비자 클라이언트를 만듭니다.
- 클라이언트 개체와 검사점 저장소 개체를 사용하여 이벤트 프로세서를 만듭니다. 이 프로세서는 이벤트를 수신하고 처리합니다.
- 이벤트 허브의 각 파티션에 대해 processEvents를 이벤트를 처리하는 함수로 사용하여 파티션 클라이언트를 만듭니다.
- 모든 파티션 클라이언트를 실행하여 이벤트를 수신하고 처리합니다.
Important
다음 자리 표시자 값을 실제 값으로 바꿉니다.
AZURE STORAGE CONNECTION STRING
을 Azure 스토리지 계정의 연결 문자열로 바꿉니다.BLOB CONTAINER NAME
을 스토리지 계정에서 만든 Blob 컨테이너의 이름으로 바꿉니다.NAMESPACE CONNECTION STRING
을 Event Hubs 네임스페이스의 연결 문자열로 바꿉니다.EVENT HUB NAME
을 샘플 코드의 이벤트 허브 이름으로 바꿉니다.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
수신자 및 발신자 앱 실행
먼저 수신자 앱을 실행합니다.
발신자 앱을 실행합니다.
수신자 창에서 다음 출력이 표시될 때까지 1분 정도 기다립니다.
Processing 2 event(s) Event received with body hello Event received with body world
다음 단계
GitHub(https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs)의 샘플을 참조하세요.