Odesílání zpráv do front Azure Service Bus a příjem zpráv z front (Go)
V tomto kurzu se dozvíte, jak odesílat zprávy do front Azure Service Bus a přijímat je z front pomocí programovacího jazyka Go.
Azure Service Bus je plně spravovaný podnikový zprostředkovatel zpráv s frontami zpráv a možnostmi publikování a odběru. Service Bus se používá k oddělení aplikací a služeb od sebe a poskytuje distribuovaný, spolehlivý a vysoce výkonný přenos zpráv.
Balíček azservicebus sady Azure SDK pro Go umožňuje odesílat a přijímat zprávy z Azure Service Bus a pomocí programovacího jazyka Go.
Na konci tohoto kurzu budete umět: odeslat jednu zprávu nebo dávku zpráv do fronty, přijímat zprávy a nedoručované zprávy, které se nezpracují.
Požadavky
- Předplatné Azure. Můžete si aktivovat výhody sady Visual Studio nebo předplatitele MSDN nebo si zaregistrovat bezplatný účet.
- Pokud nemáte frontu, se kterou byste mohli pracovat, postupujte podle kroků v článku Vytvoření fronty služby Service Bus pomocí Azure Portal.
- Go verze 1.18 nebo vyšší
Vytvoření ukázkové aplikace
Začněte vytvořením nového modulu Go.
Vytvořte nový adresář pro modul s názvem
service-bus-go-how-to-use-queues
.V adresáři
azservicebus
inicializujte modul a nainstalujte požadované balíčky.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Vytvořte nový soubor s názvem
main.go
.
Ověření a vytvoření klienta
main.go
V souboru vytvořte novou funkci s názvem GetClient
a přidejte následující kód:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Funkce GetClient
vrátí nový azservicebus.Client
objekt vytvořený pomocí oboru názvů Azure Service Bus a přihlašovacích údajů. Obor názvů je poskytován proměnnou AZURE_SERVICEBUS_HOSTNAME
prostředí. Přihlašovací údaje se vytvoří pomocí azidentity.NewDefaultAzureCredential
funkce .
Pro místní vývoj DefaultAzureCredential
použil přístupový token z Azure CLI, který je možné vytvořit spuštěním az login
příkazu pro ověření v Azure.
Tip
K ověření pomocí připojovacího řetězce použijte funkci NewClientFromConnectionString .
Zasílání zpráv do fronty
main.go
V souboru vytvořte novou funkci s názvem SendMessage
a přidejte následující kód:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
má dva parametry: řetězec zprávy a azservicebus.Client
objekt. Potom vytvoří nový azservicebus.Sender
objekt a odešle zprávu do fronty. Pokud chcete odesílat hromadné zprávy, přidejte SendMessageBatch
do main.go
souboru funkci .
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
má dva parametry: řez zpráv a azservicebus.Client
objekt. Potom vytvoří nový azservicebus.Sender
objekt a odešle zprávy do fronty.
Příjem zpráv z fronty
Po odeslání zpráv do fronty je můžete přijímat pomocí azservicebus.Receiver
typu . Pokud chcete přijímat zprávy z fronty, přidejte GetMessage
do main.go
souboru funkci .
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
azservicebus.Client
vezme objekt a vytvoří nový azservicebus.Receiver
objekt. Pak přijme zprávy z fronty. Funkce Receiver.ReceiveMessages
přijímá dva parametry: kontext a počet zpráv, které se mají přijmout. Funkce Receiver.ReceiveMessages
vrátí řez azservicebus.ReceivedMessage
objektů.
Dále smyčka for
prochází zprávy a vytiskne text zprávy. Potom se CompleteMessage
zavolá funkce, která zprávu dokončí a odebere ji z fronty.
Zprávy, které překračují limity délky, jsou odeslány do neplatné fronty nebo nejsou úspěšně zpracovány, mohou být odeslány do fronty nedoručených zpráv. Pokud chcete odesílat zprávy do fronty nedoručených zpráv, přidejte SendDeadLetterMessage
do main.go
souboru funkci .
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
azservicebus.Client
přebírá objekt a azservicebus.ReceivedMessage
objekt. Potom zprávu odešle do fronty nedoručených zpráv. Funkce přebírá dva parametry: kontext a azservicebus.DeadLetterOptions
objekt. Funkce Receiver.DeadLetterMessage
vrátí chybu, pokud se zpráva nepodaří odeslat do fronty nedoručených zpráv.
Pokud chcete přijímat zprávy z fronty nedoručených zpráv, přidejte ReceiveDeadLetterMessage
do main.go
souboru funkci .
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
azservicebus.Client
vezme objekt a vytvoří nový azservicebus.Receiver
objekt s možnostmi pro frontu nedoručených zpráv. Pak obdrží zprávy z fronty nedoručených zpráv. Funkce pak přijme jednu zprávu z fronty nedoručených zpráv. Pak vypíše důvod nedoručených zpráv a popis této zprávy.
Ukázka kódu
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Spuštění kódu
Před spuštěním kódu vytvořte proměnnou prostředí s názvem AZURE_SERVICEBUS_HOSTNAME
. Nastavte hodnotu proměnné prostředí na obor názvů služby Service Bus.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Dále spusťte aplikaci spuštěním následujícího go run
příkazu:
go run main.go
Další kroky
Další informace najdete na následujících odkazech: