Compartir vía


Inicio rápido: Envío o recepción de eventos en Event Hubs mediante Go

Azure Event Hubs es una plataforma de streaming de macrodatos y servicio de ingesta de eventos de gran escalabilidad capaz de recibir y procesar millones de eventos por segundo. Event Hubs puede procesar y almacenar eventos, datos o telemetría generados por dispositivos y software distribuido. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento. Para más información sobre Event Hubs, consulte Introducción a Event Hubs y Características de Event Hubs.

En este inicio rápido se describe cómo escribir aplicaciones de Go para enviar eventos a un centro de eventos o recibirlos de este.

Nota

Este inicio rápido se basa en ejemplos de GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La sección de envío de eventos se basa en el ejemplo example_producing_events_test.go, mientras que la sección de recepción se basa en el ejemplo example_processor_test.go. El código se ha simplificado para el inicio rápido y se han quitado todos los comentarios detallados, así que examine los ejemplos para obtener más detalles y explicaciones.

Requisitos previos

Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:

  • Go instalado de forma local. Siga estas instrucciones si es necesario.
  • Una cuenta de Azure activa. Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos. Use Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo.

Envío de eventos

En esta sección se muestra cómo crear una aplicación de Go para enviar eventos a un centro de eventos.

Instalación del paquete Go

Obtenga el paquete de Go para Event Hubs como se muestra en el ejemplo siguiente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Código para enviar eventos a un centro de eventos

Este es el código para enviar eventos a un centro de eventos. Los pasos principales del código son:

  1. Crear un cliente productor de Event Hubs mediante una cadena de conexión al espacio de nombres de Event Hubs y el nombre del centro de eventos.
  2. Crear un objeto por lotes y agregar eventos de ejemplo al lote.
  3. Enviar el lote de eventos a los eventos.

Importante

Reemplace NAMESPACE CONNECTION STRING por la cadena de conexión al espacio de nombres de Event Hubs y EVENT HUB NAME por el nombre del centro de eventos del código de ejemplo.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

No ejecute todavía la aplicación. Primero debe ejecutar la aplicación receptora y, luego, la aplicación remitente.

Recepción de eventos

Creación de una cuenta de almacenamiento y un contenedor

Los estados como, por ejemplo, las concesiones sobre particiones y puntos de comprobación de los eventos se comparten entre receptores mediante un contenedor de Azure Storage. Puede crear una cuenta de almacenamiento y un contenedor con el SDK para Go, pero también puede crearlos siguiendo las instrucciones de Acerca de las cuentas de almacenamiento de Azure.

Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:

  • Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
  • No use el contenedor ni la cuenta de almacenamiento para otras actividades.
  • La cuenta de almacenamiento debe estar en la misma región en la que se encuentra la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.

En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.

  • Espacio de nombres jerárquico
  • Eliminación temporal de blobs
  • Control de versiones

Paquetes de Go

Para recibir los mensajes, obtenga los paquetes de Go para Event Hubs, como se muestra en el ejemplo siguiente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Código para recibir eventos de un centro de eventos

Este es el código para recibir eventos de un centro de eventos. Los pasos principales del código son:

  1. Comprobar un objeto de almacén de puntos de control que representa la instancia de Azure Blob Storage que el centro de eventos usa para crear puntos de control.
  2. Crear un cliente consumidor de Event Hubs mediante una cadena de conexión al espacio de nombres de Event Hubs y el nombre del centro de eventos.
  3. Crear un procesador de eventos mediante el objeto cliente y el objeto de almacén de puntos de control. El procesador recibe y procesa eventos.
  4. Para cada partición del centro de eventos, crear un cliente de partición con processEvents como función para procesar eventos.
  5. Ejecutar todos los clientes de partición para recibir y procesar eventos.

Importante

Reemplace los siguientes valores de marcador de posición por valores reales:

  • AZURE STORAGE CONNECTION STRING por la cadena de conexión para la cuenta de almacenamiento de Azure.
  • BLOB CONTAINER NAME por el nombre del contenedor de blobs que creó en la cuenta de almacenamiento.
  • NAMESPACE CONNECTION STRING por la cadena de conexión para el espacio de nombres de Event Hubs.
  • EVENT HUB NAME por el nombre del centro de eventos del código de ejemplo.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Ejecución de las aplicaciones receptora y remitente

  1. Ejecute primero la aplicación receptora.

  2. Ejecute la aplicación remitente.

  3. Espere un minuto para ver la siguiente salida en la ventana receptora.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Pasos siguientes

Consulte ejemplos en GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.