Rychlý start: Odesílání událostí nebo příjem událostí ze služby Event Hubs pomocí Go
Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro ingestování událostí, která je schopná přijmout a zpracovat miliony událostí za sekundu. Služba Event Hubs dokáže zpracovávat a ukládat události, data nebo telemetrické údaje produkované distribuovaným softwarem a zařízeními. Data odeslaná do centra událostí je možné transformovat a uložit pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů pro dávkové zpracování a ukládání. Podrobnější přehled služby Event Hubs najdete v tématech Přehled služby Event Hubs a Funkce služby Event Hubs.
Tento rychlý start popisuje, jak psát aplikace Go pro odesílání událostí do centra událostí nebo příjem událostí z centra událostí.
Poznámka:
Tento rychlý start vychází z ukázek na GitHubu na webu https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Oddíl události odeslání je založený na ukázce example_producing_events_test.go a příjem vychází z ukázky example_processor_test.go . Kód je pro rychlý start zjednodušený a odeberou se všechny podrobné komentáře, takže se podívejte na ukázky, kde najdete další podrobnosti a vysvětlení.
Požadavky
K dokončení tohoto rychlého startu potřebujete následující požadavky:
- Go nainstalovaný místně. V případě potřeby postupujte podle těchto pokynů .
- Aktivní účet Azure Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
- Vytvořte obor názvů služby Event Hubs a centrum událostí. Pomocí webu Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.
Odesílání událostí
V této části se dozvíte, jak vytvořit aplikaci Go pro odesílání událostí do centra událostí.
Instalace balíčku Go
Získejte balíček Go pro službu Event Hubs, jak je znázorněno v následujícím příkladu.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kód pro odesílání událostí do centra událostí
Tady je kód pro odesílání událostí do centra událostí. Hlavní kroky v kódu jsou:
- Vytvořte klienta producenta služby Event Hubs pomocí připojovací řetězec do oboru názvů služby Event Hubs a názvu centra událostí.
- Vytvořte dávkový objekt a přidejte do dávky ukázkové události.
- Odešle dávku událostí do těchto událostí.
Důležité
Nahraďte NAMESPACE CONNECTION STRING
připojovací řetězec do oboru názvů služby Event Hubs a EVENT HUB NAME
názvem centra událostí v ukázkovém kódu.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Aplikaci zatím nespousíte. Nejdřív musíte spustit aplikaci příjemce a pak aplikaci odesílatele.
Příjem událostí
Vytvoření účtu úložiště a kontejneru
Stav, například zapůjčení oddílů a kontrolních bodů v událostech, se sdílí mezi příjemci pomocí kontejneru Azure Storage. Pomocí sady Go SDK můžete vytvořit účet úložiště a kontejner, ale můžete ho vytvořit také podle pokynů v části O účtech úložiště Azure.
Při používání služby Azure Blob Storage jako úložiště kontrolních bodů postupujte podle těchto doporučení:
- Pro každou skupinu příjemců použijte samostatný kontejner. Můžete použít stejný účet úložiště, ale pro každou skupinu použít jeden kontejner.
- Nepoužívejte kontejner pro nic jiného a nepoužívejte účet úložiště pro nic jiného.
- Účet úložiště by měl být ve stejné oblasti jako nasazená aplikace. Pokud je aplikace místní, zkuste zvolit nejbližší možnou oblast.
Na stránce účtu úložiště na webu Azure Portal v části Blob Service se ujistěte, že jsou zakázaná následující nastavení.
- Hierarchický obor názvů
- Obnovitelné odstranění objektu blob
- Vytváření verzí
Balíčky Go
Pokud chcete přijímat zprávy, získejte balíčky Go pro službu Event Hubs, jak je znázorněno v následujícím příkladu.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kód pro příjem událostí z centra událostí
Tady je kód pro příjem událostí z centra událostí. Hlavní kroky v kódu jsou:
- Zkontrolujte objekt úložiště kontrolních bodů, který představuje službu Azure Blob Storage používanou centrem událostí pro vytváření kontrolních bodů.
- Vytvořte klienta příjemce služby Event Hubs pomocí připojovací řetězec do oboru názvů služby Event Hubs a názvu centra událostí.
- Vytvořte procesor událostí pomocí objektu klienta a objektu úložiště kontrolních bodů. Procesor přijímá a zpracovává události.
- Pro každý oddíl v centru událostí vytvořte klienta oddílu s processEvents jako funkcí pro zpracování událostí.
- Spuštěním všech klientů oddílů můžete přijímat a zpracovávat události.
Důležité
Nahraďte následující zástupné hodnoty skutečnými hodnotami:
AZURE STORAGE CONNECTION STRING
s připojovací řetězec pro váš účet úložiště AzureBLOB CONTAINER NAME
s názvem kontejneru objektů blob, který jste vytvořili v účtu úložištěNAMESPACE CONNECTION STRING
s připojovací řetězec pro váš obor názvů služby Event HubsEVENT HUB NAME
název centra událostí v ukázkovém kódu.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Spuštění aplikací pro příjemce a odesílatele
Nejprve spusťte aplikaci příjemce.
Spusťte aplikaci odesílatele.
Počkejte minutu, než se v okně příjemce zobrazí následující výstup.
Processing 2 event(s) Event received with body hello Event received with body world
Další kroky
Podívejte se na ukázky na GitHubu na adrese https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.