Skicka meddelanden till och ta emot meddelanden från Azure Service Bus köer (Go)
I den här självstudien får du lära dig hur du skickar meddelanden till och tar emot meddelanden från Azure Service Bus köer med programmeringsspråket Go.
Azure Service Bus är en fullständigt hanterad meddelandekö för företag med meddelandeköer och funktioner för publicering/prenumeration. Service Bus används för att frikoppla program och tjänster från varandra, vilket ger en distribuerad, tillförlitlig och högpresterande meddelandetransport.
Med Azure SDK för Go:s azservicebus-paket kan du skicka och ta emot meddelanden från Azure Service Bus och använda programmeringsspråket Go.
I slutet av den här självstudien kan du: skicka ett enda meddelande eller en batch med meddelanden till en kö, ta emot meddelanden och meddelanden med obeställbara meddelanden som inte bearbetas.
Förutsättningar
- En Azure-prenumeration. Du kan aktivera dina Visual Studio- eller MSDN-prenumerantförmåner eller registrera dig för ett kostnadsfritt konto.
- Om du inte har någon kö att arbeta med följer du stegen i artikeln Använd Azure Portal för att skapa en Service Bus-kö för att skapa en kö.
- Go version 1.18 eller senare
Skapa exempelappen
Börja med att skapa en ny Go-modul.
Skapa en ny katalog för modulen med namnet
service-bus-go-how-to-use-queues
.Initiera modulen
azservicebus
i katalogen och installera de paket som krävs.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Skapa en ny fil med namnet
main.go
.
Autentisera och skapa en klient
main.go
I filen skapar du en ny funktion med namnet GetClient
och lägger till följande kod:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Funktionen GetClient
returnerar ett nytt azservicebus.Client
objekt som skapas med hjälp av ett Azure Service Bus namnområde och en autentiseringsuppgift. Namnområdet tillhandahålls av AZURE_SERVICEBUS_HOSTNAME
miljövariabeln. Och autentiseringsuppgifterna skapas med hjälp azidentity.NewDefaultAzureCredential
av funktionen .
För lokal utveckling DefaultAzureCredential
används åtkomsttoken från Azure CLI, som kan skapas genom att köra kommandot för att autentisera az login
till Azure.
Tips
Om du vill autentisera med en anslutningssträng använder du funktionen NewClientFromConnectionString .
Skicka meddelanden till en kö
main.go
I filen skapar du en ny funktion med namnet SendMessage
och lägger till följande kod:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
tar två parametrar: en meddelandesträng och ett azservicebus.Client
objekt. Sedan skapas ett nytt azservicebus.Sender
objekt och meddelandet skickas till kön. Om du vill skicka massmeddelanden lägger du till funktionen main.go
i SendMessageBatch
filen.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
tar två parametrar: en del av meddelanden och ett azservicebus.Client
objekt. Sedan skapas ett nytt azservicebus.Sender
objekt och meddelanden skickas till kön.
Ta emot meddelanden från en kö
När du har skickat meddelanden till kön kan du ta emot dem med typen azservicebus.Receiver
. Om du vill ta emot meddelanden från en kö lägger du till funktionen main.go
i GetMessage
filen.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
tar ett azservicebus.Client
objekt och skapar ett nytt azservicebus.Receiver
objekt. Den tar sedan emot meddelandena från kön. Funktionen Receiver.ReceiveMessages
tar två parametrar: en kontext och antalet meddelanden som ska ta emot. Funktionen Receiver.ReceiveMessages
returnerar en sektor med azservicebus.ReceivedMessage
objekt.
Därefter itererar en for
loop genom meddelandena och skriver ut meddelandetexten. Sedan anropas CompleteMessage
funktionen för att slutföra meddelandet och ta bort det från kön.
Meddelanden som överskrider längdgränserna, skickas till en ogiltig kö eller inte bearbetas kan skickas till kön med obeställbara meddelanden. Om du vill skicka meddelanden till kön med obeställbara meddelanden lägger du till funktionen main.go
i SendDeadLetterMessage
filen.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
tar ett azservicebus.Client
objekt och ett azservicebus.ReceivedMessage
objekt. Meddelandet skickas sedan till kön med obeställbara meddelanden. Funktionen tar två parametrar: en kontext och ett azservicebus.DeadLetterOptions
objekt. Funktionen Receiver.DeadLetterMessage
returnerar ett fel om meddelandet inte kan skickas till kön med obeställbara meddelanden.
Om du vill ta emot meddelanden från kön med obeställbara meddelanden lägger du till funktionen main.go
i ReceiveDeadLetterMessage
filen.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
tar ett azservicebus.Client
objekt och skapar ett nytt azservicebus.Receiver
objekt med alternativ för kön med obeställbara meddelanden. Den tar sedan emot meddelandena från kön med obeställbara meddelanden. Funktionen tar sedan emot ett meddelande från kön med obeställbara meddelanden. Sedan skriver den ut orsaken till och beskrivningen av det meddelandet.
Exempelkod
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Kör koden
Innan du kör koden skapar du en miljövariabel med namnet AZURE_SERVICEBUS_HOSTNAME
. Ange miljövariabelns värde till Service Bus-namnområdet.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Kör sedan följande go run
kommando för att köra appen:
go run main.go
Nästa steg
Mer information finns på följande länkar: