Schnellstart: Senden von Ereignissen an oder Empfangen von Ereignissen aus Event Hubs mithilfe von Go
Azure Event Hubs ist eine Big Data-Streamingplattform und ein Ereigniserfassungsdienst, der pro Sekunde Millionen von Ereignissen empfangen und verarbeiten kann. Event Hubs kann Ereignisse, Daten oder Telemetriedaten, die von verteilter Software und verteilten Geräten erzeugt wurden, verarbeiten und speichern. An einen Event Hub gesendete Daten können transformiert und mit einem beliebigen Echtzeitanalyse-Anbieter oder Batchverarbeitungs-/Speicheradapter gespeichert werden. Eine ausführliche Übersicht über Event Hubs finden Sie unter Was ist Azure Event Hubs? und Event Hubs-Features im Überblick.
In diesem Schnellstart wird beschrieben, wie Sie Go-Anwendungen schreiben, die Ereignisse an einen Event Hub senden oder von diesem empfangen.
Hinweis
Dieser Schnellstart basiert auf Beispielen auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Der Abschnitt Sendeereignisse basiert auf dem Beispiel example_producing_events_test.go, und das Empfangsbeispiel auf dem Beispiel example_processor_test.go. Der Code wurde für die Schnellstartanleitung vereinfacht, und alle ausführlichen Kommentare wurden entfernt. Sehen Sie sich also die Beispiele an, um weitere Details und Erläuterungen zu erhalten.
Voraussetzungen
Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:
- Eine lokale Go-Installation. Befolgen Sie bei Bedarf diese Anweisungen.
- Ein aktives Azure-Konto. Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
- Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie das Azure-Portal, um einen Namespace des Typs „Event Hubs“ zu erstellen, und rufen Sie die Verwaltungsanmeldeinformationen ab, die Ihre Anwendung für die Kommunikation mit dem Event Hub benötigt. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub.
Senden von Ereignissen
In diesem Abschnitt erfahren Sie, wie Sie eine Go-Anwendung zum Senden von Ereignissen an einen Event Hub erstellen.
Installieren des Go-Pakets
Rufen Sie das Go-Paket für Event Hubs ab, wie im folgenden Beispiel gezeigt.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Code zum Senden von Ereignissen an einen Event Hub
Der folgende Code dient zum Senden von Ereignissen an einen Event Hub. Die wichtigsten Schritte im Code sind:
- Erstellen eines Event Hubs-Producerclients mithilfe einer Verbindungszeichenfolge zum Event Hubs-Namespace und dem Event Hub-Namen.
- Erstellen eines Batchobjekts und Hinzufügen von Beispielereignissen zum Batch.
- Senden des Ereignisbatchs an die Ereignisse.
Wichtig
Ersetzen Sie NAMESPACE CONNECTION STRING
durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace und EVENT HUB NAME
durch den Event Hub-Namen im Beispielcode.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Führen Sie die Anwendung noch nicht aus. Sie müssen zuerst die Empfänger-App und dann die Sender-App ausführen.
Empfangen von Ereignissen
Erstellen eines Storage-Kontos und -Containers
Angaben zum Status, z.B. Leases auf Partitionen und Prüfpunkte in den Ereignissen, werden über einen Azure Storage-Container zwischen den Empfängern freigegeben. Sie können ein Speicherkonto und einen Container erstellen, indem Sie das Go SDK verwenden oder die Anweisungen unter Informationen zu Azure-Speicherkonten befolgen.
Befolgen Sie die folgenden Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:
- Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
- Verwenden Sie weder den Container noch das Speicherkonto für andere Zwecke.
- Das Speicherkonto sollte sich in derselben Region befinden, in der sich die bereitgestellte Anwendung befindet. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.
Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.
- Hierarchischer Namespace
- Vorläufiges Löschen von Blobs
- Versionsverwaltung
Go-Pakete
Rufen Sie zum Empfangen der Nachrichten die Go-Pakete für Event Hubs ab, wie im folgenden Beispiel gezeigt.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Code zum Empfangen von Ereignissen von einem Event Hub
Der folgende Code dient zum Empfangen von Ereignissen von einem Event Hub. Die wichtigsten Schritte im Code sind:
- Überprüfen Sie ein Prüfpunktspeicherobjekt, das die Azure Blob Storage-Instanz darstellt, die vom Event Hub für Prüfpunkterstellung verwendet wird.
- Erstellen eines Event Hubs-Consumerclients mithilfe einer Verbindungszeichenfolge zum Event Hubs-Namespace und dem Event Hub-Namen.
- Erstellen Sie mithilfe des Clientobjekts und des Prüfpunktspeicherobjekts einen Ereignisprozessor. Der Prozessor empfängt und verarbeitet Ereignisse.
- Erstellen Sie für jede Partition im Event Hub einen Partitionsclient mit processEvents als Funktion zum Verarbeiten von Ereignissen.
- Führen Sie alle Partitionsclients aus, um Ereignisse zu empfangen und zu verarbeiten.
Wichtig
Ersetzen Sie die folgenden Platzhalterwerte durch die tatsächlichen Werte:
AZURE STORAGE CONNECTION STRING
durch die Verbindungszeichenfolge für Ihr Azure-Speicherkonto.BLOB CONTAINER NAME
durch den Namen des Blobcontainers, den Sie im Speicherkonto erstellt haben.NAMESPACE CONNECTION STRING
durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace.EVENT HUB NAME
durch den Event Hub-Namen im Beispielcode.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Ausführen von Empfänger- und Sender-Apps
Führen Sie den Empfänger zuerst aus.
Führen Sie die Sender-App aus.
Warten Sie eine Minute, bis die folgende Ausgabe im Empfängerfenster angezeigt wird.
Processing 2 event(s) Event received with body hello Event received with body world
Nächste Schritte
Weitere Informationen finden Sie in den Beispielen auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.