Compartir vía


Envío y recepción de mensajes con destino y origen en colas de Azure Service Bus (Go)

En este tutorial, se muestra cómo enviar y recibir mensajes de colas de Azure Service Bus mediante el lenguaje de programación Go.

Azure Service Bus es un agente de mensajes empresarial totalmente administrado que incluye colas de mensajes y capacidades de publicación/suscripción. Service Bus se usa para desacoplar aplicaciones y servicios entre sí, proporcionando un transporte de mensajes distribuido, confiable y de alto rendimiento.

El paquete azservicebus de Azure SDK para Go permite enviar y recibir mensajes de Azure Service Bus y usar el lenguaje de programación Go.

Al final de este tutorial, podrá enviar un único mensaje o un lote de mensajes a una cola y recibir mensajes y mensajes fallidos que no se procesan.

Requisitos previos

Crear la aplicación de ejemplo

Para empezar, cree un nuevo módulo de Go.

  1. Cree un directorio para el módulo denominado service-bus-go-how-to-use-queues.

  2. En el directorio azservicebus, inicialice el módulo e instale los paquetes necesarios.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Cree un nuevo archivo llamado main.go.

Autenticación y creación de un cliente

En el archivo main.go, cree una nueva función denominada GetClient y agregue el código siguiente:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

La función GetClient devuelve un nuevo objeto azservicebus.Client que se crea mediante un espacio de nombres y una credencial de Azure Service Bus. La variable de entorno AZURE_SERVICEBUS_HOSTNAME proporciona el espacio de nombres. Y la credencial se crea mediante la función azidentity.NewDefaultAzureCredential.

Para el desarrollo local, DefaultAzureCredential usó el token de acceso de la CLI de Azure, que se puede crear mediante la ejecución del comando az login para autenticarse en Azure.

Sugerencia

Para autenticarse con una cadena de conexión, use la función NewClientFromConnectionString.

mensajes a una cola

En el archivo main.go, cree una nueva función denominada SendMessage y agregue el código siguiente:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage toma dos parámetros: una cadena de mensaje y un objeto azservicebus.Client. A continuación, crea un nuevo objeto azservicebus.Sender y envía el mensaje a la cola. Para enviar mensajes de forma masiva, agregue la función SendMessageBatch al archivo main.go.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch toma dos parámetros: un segmento de mensajes y un objeto azservicebus.Client. A continuación, crea un nuevo objeto azservicebus.Sender y envía los mensajes a la cola.

mensajes de una cola

Después de enviar mensajes a la cola, puede recibirlos con el tipo azservicebus.Receiver. Para recibir mensajes de una cola, agregue la función GetMessage al archivo main.go.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage toma un objeto azservicebus.Client y crea un nuevo objeto azservicebus.Receiver. A continuación, recibe los mensajes de la cola. La función Receiver.ReceiveMessages toma dos parámetros: un contexto y el número de mensajes que se van a recibir. La función Receiver.ReceiveMessages devuelve un segmento de objetos azservicebus.ReceivedMessage.

A continuación, un bucle for recorre en iteración los mensajes e imprime el cuerpo del mensaje. Luego se llama a la función CompleteMessage para completar el mensaje, quitándolo de la cola.

Los mensajes que superan los límites de longitud se envían a una cola no válida, o si no se procesan correctamente, se pueden enviar a la cola de mensajes fallidos. Para enviar mensajes a la cola de mensajes fallidos, agregue la función SendDeadLetterMessage al archivo main.go.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage toma un objeto azservicebus.Client y un objeto azservicebus.ReceivedMessage. A continuación, envía el mensaje a la cola de mensajes fallidos. La función toma dos parámetros: un contexto y un objeto azservicebus.DeadLetterOptions. La función Receiver.DeadLetterMessage devuelve un error si el mensaje no se puede enviar a la cola de mensajes fallidos.

Para recibir mensajes de la cola de mensajes fallidos, agregue la función ReceiveDeadLetterMessage al archivo main.go.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage toma un objeto azservicebus.Client y crea un nuevo objeto azservicebus.Receiver con opciones para la cola de mensajes fallidos. A continuación, recibe los mensajes de la cola de mensajes fallidos. La función recibe luego un mensaje de la cola de mensajes fallidos. A continuación, imprime el motivo y la descripción de los mensajes fallidos para ese mensaje.

Código de ejemplo

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Ejecución del código

Antes de ejecutar el código, cree una variable de entorno llamada AZURE_SERVICEBUS_HOSTNAME. Establezca el valor de la variable de entorno en el espacio de nombres del Service Bus.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

A continuación, ejecute el siguiente comando go run para ejecutar la aplicación:

go run main.go

Pasos siguientes

Para más información, consulte los siguientes vínculos: