Azure Service Bus キューとの間でメッセージを送受信する (Go)
このチュートリアルでは、Go プログラミング言語を使用して Azure Service Bus キューとの間でメッセージを送受信する方法について説明します。
Azure Service Bus は、メッセージ キューと、パブリッシュとサブスクライブの機能を備えたフル マネージド エンタープライズ統合メッセージ ブローカーです。 Service Bus は、アプリケーションとサービスを相互に分離し、メッセージ転送の分散性、信頼性、パフォーマンスを高めるために使用されます。
Azure SDK for Go の azservicebus パッケージを使用すると、Go プログラミング言語を使用して Azure Service Bus からメッセージを送受信できます。
このチュートリアルを終了すると、1 つのメッセージまたはメッセージのバッチをキューに送信し、メッセージを受信し、処理されないメッセージを配信不能キュー送信できるようになります。
前提条件
- Azure サブスクリプション。 Visual Studio または MSDN のサブスクライバー特典を有効にするか、無料アカウントにサインアップしてください。
- 使用するキューがない場合は、「Azure portal を使用して Service Bus キューを作成する」の記事にある手順に従って、キューを作成します。
- Go バージョン 1.18 以降
サンプル アプリを作成する
まず、新しい Go モジュールを作成します。
このモジュール用に、
service-bus-go-how-to-use-queues
という名前の新しいディレクトリを作成します。azservicebus
ディレクトリでモジュールを初期化し、必要なパッケージをインストールします。go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
main.go
という名前で新しいファイルを作成します。
クライアントの認証と作成
main.go
ファイルで、GetClient
という名前の新しい関数を作成し、次のコードを追加します。
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
GetClient
関数は、Azure Service Bus 名前空間と資格情報を使用して作成された新しい azservicebus.Client
オブジェクトを返します。 名前空間は AZURE_SERVICEBUS_HOSTNAME
環境変数によって指定されます。 また、資格情報は azidentity.NewDefaultAzureCredential
関数を使用して作成されます。
ローカル開発では、DefaultAzureCredential
で Azure CLI からのアクセス トークンが使用されます。これは、az login
コマンドを実行して Azure に対する認証を行うことで作成できます。
ヒント
接続文字列で認証するには、NewClientFromConnectionString 関数を使用します。
メッセージをキューに送信する
main.go
ファイルで、SendMessage
という名前の新しい関数を作成し、次のコードを追加します。
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
は 2 つのパラメーターを受け取ります。メッセージ文字列と azservicebus.Client
オブジェクトです。 その後、新しい azservicebus.Sender
オブジェクトを作成し、メッセージをキューに送信します。 一括メッセージを送信するには、SendMessageBatch
関数を main.go
ファイルに追加します。
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
は 2 つのパラメーターを受け取ります。メッセージのスライスと azservicebus.Client
オブジェクトです。 その後、新しい azservicebus.Sender
オブジェクトを作成し、メッセージをキューに送信します。
キューからメッセージを受信する
キューにメッセージを送信したら、azservicebus.Receiver
型を使ってメッセージを受信できます。 キューからメッセージを受信するには、GetMessage
関数を main.go
ファイルに追加します。
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
は azservicebus.Client
オブジェクトを受け取り、新しい azservicebus.Receiver
オブジェクトを作成します。 その後、キューからメッセージを受信します。
Receiver.ReceiveMessages
関数は 2 つのパラメーターを受け取ります。コンテキストと、受信するメッセージの数です。
Receiver.ReceiveMessages
関数は、azservicebus.ReceivedMessage
オブジェクトのスライスを返します。
次に、for
ループによってメッセージが反復処理され、メッセージ本文が出力されます。 その後、メッセージを完了するために CompleteMessage
関数が呼び出され、キューからメッセージが削除されます。
長さの制限を超えたメッセージ、無効なキューに送信されたメッセージ、または正常に処理されないメッセージは、配信不能キューに送信できます。 配信不能キューにメッセージを送信するには、SendDeadLetterMessage
関数を main.go
ファイルに追加します。
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
は azservicebus.Client
オブジェクトと azservicebus.ReceivedMessage
オブジェクトを受け取ります。 その後、配信不能キューにメッセージを送信します。 この関数は 2 つのパラメーターを受け取ります。コンテキストと azservicebus.DeadLetterOptions
オブジェクトです。 メッセージを配信不能キューに送信できなかった場合、Receiver.DeadLetterMessage
関数はエラーを返します。
配信不能キューからメッセージを受信するには、ReceiveDeadLetterMessage
関数を main.go
ファイルに追加します。
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
は azservicebus.Client
オブジェクトを受け取り、配信不能キューのオプションを含んだ新しい azservicebus.Receiver
オブジェクトを作成します。 その後、配信不能キューからメッセージを受信します。 その後、この関数は配信不能キューから 1 つのメッセージを受信します。 その後、そのメッセージの配信不能の理由と説明を出力します。
サンプル コード
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
コードの実行
コードを実行する前に、AZURE_SERVICEBUS_HOSTNAME
という名前の環境変数を作成します。 環境変数の値を Service Bus 名前空間に設定します。
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
次に、次の go run
コマンドを実行してアプリを実行します。
go run main.go
次のステップ
詳細については、以下のリンクを参照してください。