Wysyłanie komunikatów do i odbieranie komunikatów z kolejek Azure Service Bus (Go)
Z tego samouczka dowiesz się, jak wysyłać komunikaty do kolejek Azure Service Bus i odbierać je przy użyciu języka programowania Go.
Azure Service Bus jest w pełni zarządzanym brokerem komunikatów przedsiębiorstwa z kolejkami komunikatów i możliwościami publikowania/subskrybowania. Usługa Service Bus służy do oddzielenia od siebie aplikacji i usług, zapewniając rozproszony, niezawodny i wydajny transport komunikatów.
Pakiet azservicebus zestawu Azure SDK dla języka Go umożliwia wysyłanie i odbieranie komunikatów z Azure Service Bus oraz używanie języka programowania Go.
Po ukończeniu tego samouczka będziesz mieć możliwość wysyłania pojedynczego komunikatu lub partii komunikatów do kolejki, odbierania komunikatów i komunikatów utraconych, które nie są przetwarzane.
Wymagania wstępne
- Subskrypcja platformy Azure. Możesz aktywować korzyści dla subskrybentów programu Visual Studio lub MSDN lub utworzyć konto bezpłatne.
- Jeśli nie masz kolejki do pracy, wykonaj kroki opisane w artykule Używanie Azure Portal do tworzenia kolejki usługi Service Bus w celu utworzenia kolejki.
- Przejdź do wersji 1.18 lub nowszej
Tworzenie przykładowej aplikacji
Aby rozpocząć, utwórz nowy moduł Języka Go.
Utwórz nowy katalog dla modułu o nazwie
service-bus-go-how-to-use-queues
.W katalogu zainicjuj
azservicebus
moduł i zainstaluj wymagane pakiety.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Utwórz nowy plik o nazwie
main.go
.
Uwierzytelnianie i tworzenie klienta
main.go
W pliku utwórz nową funkcję o nazwie GetClient
i dodaj następujący kod:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Funkcja GetClient
zwraca nowy azservicebus.Client
obiekt utworzony przy użyciu Azure Service Bus przestrzeni nazw i poświadczeń. Przestrzeń nazw jest dostarczana przez zmienną środowiskową AZURE_SERVICEBUS_HOSTNAME
. A poświadczenie jest tworzone przy użyciu azidentity.NewDefaultAzureCredential
funkcji .
W przypadku programowania lokalnego DefaultAzureCredential
używany token dostępu z poziomu interfejsu wiersza polecenia platformy Azure, który można utworzyć, uruchamiając az login
polecenie w celu uwierzytelnienia na platformie Azure.
Porada
Aby uwierzytelnić się za pomocą parametrów połączenia, użyj funkcji NewClientFromConnectionString .
Wysyłanie komunikatów do kolejki
main.go
W pliku utwórz nową funkcję o nazwie SendMessage
i dodaj następujący kod:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
przyjmuje dwa parametry: ciąg komunikatu azservicebus.Client
i obiekt. Następnie tworzy nowy azservicebus.Sender
obiekt i wysyła komunikat do kolejki. Aby wysyłać komunikaty zbiorcze, dodaj SendMessageBatch
funkcję do main.go
pliku.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
przyjmuje dwa parametry: fragment komunikatów i azservicebus.Client
obiekt. Następnie tworzy nowy azservicebus.Sender
obiekt i wysyła komunikaty do kolejki.
Odbieranie komunikatów z kolejki
Po wysłaniu komunikatów do kolejki można je odbierać przy użyciu azservicebus.Receiver
typu . Aby odbierać komunikaty z kolejki, dodaj GetMessage
funkcję do main.go
pliku.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
azservicebus.Client
pobiera obiekt i tworzy nowy azservicebus.Receiver
obiekt. Następnie odbiera komunikaty z kolejki. Funkcja Receiver.ReceiveMessages
przyjmuje dwa parametry: kontekst i liczbę komunikatów do odebrania. Funkcja Receiver.ReceiveMessages
zwraca fragment azservicebus.ReceivedMessage
obiektów.
Następnie pętla for
iteruje komunikaty i drukuje treść komunikatu. Następnie funkcja jest wywoływana CompleteMessage
w celu ukończenia komunikatu, usuwając ją z kolejki.
Komunikaty, które przekraczają limity długości, są wysyłane do nieprawidłowej kolejki lub nie są pomyślnie przetwarzane, można wysłać do kolejki utraconych wiadomości. Aby wysłać komunikaty do kolejki utraconych wiadomości, dodaj SendDeadLetterMessage
funkcję do main.go
pliku.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
azservicebus.Client
przyjmuje obiekt i azservicebus.ReceivedMessage
obiekt. Następnie wysyła wiadomość do kolejki utraconych wiadomości. Funkcja przyjmuje dwa parametry: kontekst i azservicebus.DeadLetterOptions
obiekt. Funkcja Receiver.DeadLetterMessage
zwraca błąd, jeśli komunikat nie zostanie wysłany do kolejki utraconych wiadomości.
Aby odbierać komunikaty z kolejki utraconych wiadomości, dodaj ReceiveDeadLetterMessage
funkcję do main.go
pliku.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
azservicebus.Client
pobiera obiekt i tworzy nowy azservicebus.Receiver
obiekt z opcjami kolejki utraconych wiadomości. Następnie odbiera komunikaty z kolejki utraconych wiadomości. Następnie funkcja odbiera jeden komunikat z kolejki utraconych wiadomości. Następnie drukuje przyczynę martwych listów i opis dla tej wiadomości.
Przykładowy kod
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Uruchamianie kodu
Przed uruchomieniem kodu utwórz zmienną środowiskową o nazwie AZURE_SERVICEBUS_HOSTNAME
. Ustaw wartość zmiennej środowiskowej na przestrzeń nazw usługi Service Bus.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Następnie uruchom następujące go run
polecenie, aby uruchomić aplikację:
go run main.go
Następne kroki
Aby uzyskać więcej informacji, zapoznaj się z następującymi linkami: