Enviar e receber mensagens de filas de Azure Service Bus (Go)
Neste tutorial, irá aprender a enviar mensagens e a receber mensagens de filas Azure Service Bus com a linguagem de programação Go.
Azure Service Bus é um mediador de mensagens empresariais totalmente gerido com filas de mensagens e capacidades de publicação/subscrição. O Service Bus é utilizado para desassociar aplicações e serviços entre si, fornecendo um transporte de mensagens distribuído, fiável e de elevado desempenho.
O pacote azservicebus do SDK do Azure para Go permite-lhe enviar e receber mensagens de Azure Service Bus e utilizar a linguagem de programação Go.
No final deste tutorial, poderá: enviar uma única mensagem ou lote de mensagens para uma fila, receber mensagens e mensagens não entregues que não são processadas.
Pré-requisitos
- Uma subscrição do Azure. Pode ativar os benefícios de subscritor do Visual Studio ou MSDN ou inscrever-se numa conta gratuita.
- Se não tiver uma fila para trabalhar, siga os passos no artigo Utilizar portal do Azure para criar uma fila do Service Bus para criar uma fila.
- Go versão 1.18 ou superior
Criar a aplicação de exemplo
Para começar, crie um novo módulo Go.
Crie um novo diretório para o módulo com o nome
service-bus-go-how-to-use-queues
.azservicebus
No diretório, inicialize o módulo e instale os pacotes necessários.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Crie um novo ficheiro com o nome
main.go
.
Autenticar e criar um cliente
No ficheiro, crie uma nova função com o main.go
nome GetClient
e adicione o seguinte código:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
A GetClient
função devolve um novo azservicebus.Client
objeto criado com um espaço de nomes Azure Service Bus e uma credencial. O espaço de nomes é fornecido pela variável de AZURE_SERVICEBUS_HOSTNAME
ambiente. E a credencial é criada com a azidentity.NewDefaultAzureCredential
função .
Para o desenvolvimento local, utilizou o DefaultAzureCredential
token de acesso da CLI do Azure, que pode ser criado ao executar o az login
comando para autenticar no Azure.
Dica
Para autenticar com uma cadeia de ligação, utilize a função NewClientFromConnectionString .
Enviar mensagens para uma fila
No ficheiro, crie uma nova função com o main.go
nome SendMessage
e adicione o seguinte código:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
utiliza dois parâmetros: uma cadeia de mensagem e um azservicebus.Client
objeto. Em seguida, cria um novo azservicebus.Sender
objeto e envia a mensagem para a fila. Para enviar mensagens em massa, adicione a SendMessageBatch
função ao seu main.go
ficheiro.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
utiliza dois parâmetros: um setor de mensagens e um azservicebus.Client
objeto. Em seguida, cria um novo azservicebus.Sender
objeto e envia as mensagens para a fila.
Receber mensagens de uma fila
Depois de enviar mensagens para a fila, pode recebê-las com o azservicebus.Receiver
tipo . Para receber mensagens de uma fila, adicione a GetMessage
função ao seu main.go
ficheiro.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
utiliza um azservicebus.Client
objeto e cria um novo azservicebus.Receiver
objeto. Em seguida, recebe as mensagens da fila. A Receiver.ReceiveMessages
função utiliza dois parâmetros: um contexto e o número de mensagens a receber. A Receiver.ReceiveMessages
função devolve um setor de azservicebus.ReceivedMessage
objetos.
Em seguida, um for
ciclo itera através das mensagens e imprime o corpo da mensagem. Em seguida, a CompleteMessage
função é chamada para concluir a mensagem, removendo-a da fila.
As mensagens que excedem os limites de comprimento, são enviadas para uma fila inválida ou não são processadas com êxito podem ser enviadas para a fila de mensagens não entregues. Para enviar mensagens para a fila de mensagens não entregues, adicione a SendDeadLetterMessage
função ao seu main.go
ficheiro.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
utiliza um azservicebus.Client
objeto e um azservicebus.ReceivedMessage
objeto. Em seguida, envia a mensagem para a fila de mensagens não entregues. A função utiliza dois parâmetros: um contexto e um azservicebus.DeadLetterOptions
objeto. A Receiver.DeadLetterMessage
função devolve um erro se a mensagem não for enviada para a fila de mensagens não entregues.
Para receber mensagens da fila de mensagens não entregues, adicione a ReceiveDeadLetterMessage
função ao seu main.go
ficheiro.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
utiliza um azservicebus.Client
objeto e cria um novo azservicebus.Receiver
objeto com opções para a fila de mensagens não entregues. Em seguida, recebe as mensagens da fila de mensagens não entregues. Em seguida, a função recebe uma mensagem da fila de mensagens não entregues. Em seguida, imprime o motivo da carta não entregue e a descrição dessa mensagem.
Código de exemplo
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Executar o código
Antes de executar o código, crie uma variável de ambiente com o nome AZURE_SERVICEBUS_HOSTNAME
. Defina o valor da variável de ambiente como o espaço de nomes do Service Bus.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Em seguida, execute o seguinte go run
comando para executar a aplicação:
go run main.go
Passos seguintes
Para obter mais informações, consulte as seguintes ligações: