Condividi tramite


Hub eventi di Azure libreria client per JavaScript - versione 5.12.0

Hub eventi di Azure è un servizio pubblica-sottoscrizione altamente scalabile che può inserire milioni di eventi al secondo e trasmetterli a più consumer. In questo modo è possibile elaborare e analizzare le grandi quantità di dati prodotti dai dispositivi e dalle applicazioni connesse. Per altre informazioni su Hub eventi di Azure, è possibile esaminare: Che cos'è Hub eventi?

La libreria client Hub eventi di Azure consente di inviare e ricevere eventi nell'applicazione Node.js.

Collegamenti principali:

NOTA: se si usa la versione 2.1.0 o versione inferiore e si vuole eseguire la migrazione alla versione più recente di questo pacchetto, vedere la guida alla migrazione per passare da EventHubs V2 a EventHubs V5

Gli esempi per v2 e la documentazione sono ancora disponibili qui:

Codice sorgente per v2.1.0 | Pacchetto per v2.1.0 (npm) | Esempi per v2.1.0

Introduzione

Installare il pacchetto

Installare la libreria client di Hub eventi di Azure tramite npm

npm install @azure/event-hubs

Ambienti attualmente supportati

Per altre informazioni, vedere i criteri di supporto.

Prerequisiti

Configurare TypeScript

Gli utenti TypeScript devono avere installate definizioni di tipo Node:

npm install @types/node

È anche necessario abilitare compilerOptions.allowSyntheticDefaultImports nel tsconfig.json. Si noti che se è stato abilitato , allowSyntheticDefaultImports è abilitato compilerOptions.esModuleInteropper impostazione predefinita. Per altre informazioni, vedere Il manuale delle opzioni del compilatore di TypeScript .

JavaScript Bundle

Per usare questa libreria client nel browser, è prima necessario usare un bundler. Per informazioni dettagliate su come eseguire questa operazione, vedere la documentazione di raggruppamento.

Oltre a quanto descritto in questa libreria, questa libreria necessita anche di polifill aggiuntivi per i moduli predefiniti NodeJS seguenti per funzionare correttamente nei browser:

  • buffer
  • os
  • path
  • process

Raggruppamento con Webpack

Se si usa Webpack v5, è possibile installare le dipendenze di sviluppo seguenti

  • npm install --save-dev os-browserify path-browserify

aggiungere quindi quanto segue nel webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Raggruppamento con rollup

Se si usa rollup bundler, installare le dipendenze di sviluppo seguenti

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Includere quindi quanto segue nel rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Per altre informazioni sull'uso di polifill, consultare la documentazione del bundler preferito.

supporto React Native

Analogamente ai browser, React Native non supporta alcune API JavaScript usate da questa libreria SDK, quindi è necessario fornire polifill per loro. Per altre informazioni, vedere l'esempio di Messaggistica React Native con Expo.

Autenticare il client

L'interazione con Hub eventi inizia con un'istanza della classe EventHubConsumerClient o un'istanza della classe EventHubProducerClient. Esistono overload del costruttore per supportare diversi modi per creare istanze di queste classi, come illustrato di seguito:

Usare stringa di connessione per lo spazio dei nomi hub eventi

Uno degli overload del costruttore accetta un stringa di connessione del nome del modulo Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; e dell'entità nell'istanza di Hub eventi. È possibile creare un gruppo di consumer e ottenere il stringa di connessione e il nome dell'entità dal portale di Azure.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Usare stringa di connessione per i criteri nell'hub eventi

Un altro overload del costruttore accetta il stringa di connessione corrispondente ai criteri di accesso condiviso definiti direttamente nell'istanza di Hub eventi (e non nello spazio dei nomi Hub eventi). Questo stringa di connessione sarà del modulo Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. La differenza chiave nel formato stringa di connessione dell'overload del costruttore precedente è .;EntityPath=my-event-hub-name

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Usare lo spazio dei nomi di Hub eventi e l'identità di Azure

Questo overload del costruttore accetta il nome host e il nome dell'entità dell'istanza di Hub eventi e le credenziali che implementano l'interfaccia TokenCredential. In questo modo è possibile eseguire l'autenticazione usando un'entità Azure Active Directory. Esistono implementazioni dell'interfaccia TokenCredential disponibili nel pacchetto @azure/identity . Il nome host è del formato <yournamespace>.servicebus.windows.net. Quando si usa Azure Active Directory, l'entità deve essere assegnata a un ruolo che consente l'accesso a Hub eventi, ad esempio il ruolo Hub eventi di Azure proprietario dei dati. Per altre informazioni sull'uso dell'autorizzazione di Azure Active Directory con Hub eventi, vedere la documentazione associata.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Concetti chiave

  • Un produttore di Hub eventi è un'origine di dati di telemetria, informazioni di diagnostica, log di utilizzo o altri dati di log, come parte di una soluzione di dispositivo incorporata, un'applicazione per dispositivi mobili, un titolo di gioco in esecuzione in una console o in un altro dispositivo, una soluzione aziendale basata su client o server o un sito Web.

  • Un consumer dell'hub eventi raccoglie tali informazioni dall'hub eventi e lo elabora. L'elaborazione può comportare aggregazioni, calcoli complessi e applicazione di filtri. L'elaborazione può anche comportare la distribuzione o l'archiviazione delle informazioni in modo non elaborato o trasformato. I consumer di Hub eventi sono spesso parti di infrastruttura di piattaforma affidabili e su larga scala con funzionalità di analisi predefinite, ad esempio Analisi di flusso di Azure, Apache Spark o Apache Storm.

  • Una partizione è una sequenza ordinata di eventi che viene mantenuta in un hub eventi. Le partizioni sono un mezzo di organizzazione dei dati associato al parallelismo richiesto dai consumer di eventi. Hub eventi di Azure fornisce il flusso dei messaggi tramite un modello di consumer partizionato in cui ogni consumer legge solo un subset specifico, o partizione, del flusso di messaggi. Man mano che arrivano, i nuovi eventi vengono aggiunti alla fine di questa sequenza. Il numero di partizioni viene specificato al momento della creazione di un hub eventi e non può essere modificato.

  • Un gruppo di consumer è una visualizzazione di un intero hub eventi. I gruppi di consumer consentono a più applicazioni consumer di avere ognuna una vista separata del flusso di eventi e di leggere il flusso in modo indipendente, in base al proprio ritmo e dalla propria posizione. In una partizione possono esistere al massimo cinque lettori simultanei per gruppo di consumer. È tuttavia consigliabile che sia presente un solo consumer attivo in una specifica partizione e associazione di gruppi di consumer. Ogni lettore attivo riceve tutti gli eventi dalla partizione; Se nella stessa partizione sono presenti più lettori, riceveranno eventi duplicati.

Per altri concetti e discussioni più approfondite, vedere: Funzionalità di Hub eventi

Indicazioni sui tentativi

L'oggetto EventHubConsumerClient e EventHubProducerClient accettare options dove è possibile impostare l'oggetto retryOptions che consente di ottimizzare il modo in cui l'SDK gestisce gli errori temporanei. Esempi di errori temporanei includono problemi di rete o servizio temporanei.

Tentativi durante l'utilizzo di eventi

Se si verifica un errore temporaneo (ad esempio un problema di rete temporaneo) mentre l'SDK riceve eventi, riprova a ricevere eventi in base alle opzioni di ripetizione dei tentativi passate all'oggetto EventHubConsumerClient. Se vengono esauriti i tentativi massimi, verrà richiamata la processError funzione.

È possibile usare le impostazioni di ripetizione dei tentativi per controllare la velocità con cui vengono informati sui problemi temporanei, ad esempio un problema di connessione di rete. Ad esempio, se è necessario sapere quando si verifica un problema di rete immediatamente, è possibile ridurre i valori per maxRetries e retryDelayInMs.

Dopo aver eseguito la processError funzione, il client continua a ricevere eventi dalla partizione finché l'errore è stato riprovabile. In caso contrario, il client richiama la funzione fornita dall'utente processClose . Questa funzione viene richiamata anche quando si arresta la sottoscrizione o quando il client smette di leggere gli eventi dalla partizione corrente a causa della raccolta da un'altra istanza dell'applicazione come parte del bilanciamento del carico.

La funzione offre l'opportunità processClose di aggiornare i checkpoint se necessario. Dopo aver eseguito processClose, il client (o nel caso del bilanciamento del carico, un client da un'altra istanza dell'applicazione) richiama la funzione fornita processInitialize dall'utente per riprendere gli eventi di lettura dall'ultimo checkpoint aggiornato per la stessa partizione.

Se si desidera interrompere il tentativo di lettura degli eventi, è necessario chiamare close() il subscription metodo restituito dal subscribe metodo .

Esempio

Le sezioni seguenti forniscono frammenti di codice che coprono alcune delle attività comuni usando Hub eventi di Azure

Ispezionare un hub eventi

Molte operazioni dell'hub eventi vengono eseguite nell'ambito di una partizione specifica. Poiché le partizioni sono di proprietà dell'hub eventi, i nomi vengono assegnati al momento della creazione. Per comprendere quali partizioni sono disponibili, eseguire una query su Hub eventi usando uno dei due client disponibili: EventHubProducerClient o EventHubConsumerClient

Nell'esempio seguente viene usato un EventHubProducerClientoggetto .

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Pubblicare eventi in un hub eventi

Per pubblicare eventi, è necessario creare un oggetto EventHubProducerClient. Mentre nell'esempio seguente viene illustrato un modo per creare il client, vedere la sezione Autenticare il client per apprendere altri modi per creare un'istanza del client.

È possibile pubblicare eventi in una partizione specifica o consentire al servizio Hub eventi di decidere quali eventi di partizione devono essere pubblicati. È consigliabile usare il routing automatico quando la pubblicazione di eventi deve essere a disponibilità elevata o quando i dati degli eventi devono essere distribuiti uniformemente tra le partizioni. Nell'esempio seguente si sfrutta il routing automatico.

  • Create un EventDataBatch oggetto usando createBatch
  • Aggiungere eventi al batch usando il metodo tryAdd . È possibile eseguire questa operazione fino al raggiungimento del limite massimo di dimensioni del batch o fino a quando non si aggiunge il numero di eventi che si è piaciuti. Questo metodo restituirà false per indicare che non è possibile aggiungere più eventi al batch a causa della dimensione massima del batch raggiunta.
  • Inviare il batch di eventi usando il metodo sendBatch .

Nell'esempio seguente si tenta di inviare 10 eventi a Hub eventi di Azure.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Esistono opzioni che è possibile passare in fasi diverse per controllare il processo di invio di eventi a Hub eventi di Azure.

  • Il EventHubProducerClient costruttore accetta un parametro facoltativo di tipo EventHubClientOptions che è possibile usare per specificare opzioni come il numero di tentativi.
  • Il createBatch metodo accetta un parametro facoltativo di tipo CreateBatchOptions che è possibile usare per speicificare le dimensioni massime del batch supportate dal batch in fase di creazione.
  • Il sendBatch metodo accetta un parametro facoltativo di tipo SendBatchOptions che è possibile usare per specificare abortSignal per annullare l'operazione corrente.
  • Se si vuole inviare a una partizione specifica, un overload del sendBatch metodo consente di passare l'ID della partizione a cui inviare eventi. L'esempio Di controllo di un hub eventi precedente illustra come recuperare gli ID partizioni disponibili.

Nota: quando si usa Azure Stream Analytics, anche il corpo dell'evento inviato deve essere un oggetto JSON. ad esempio body: { "message": "Hello World" }

Usare gli eventi da un hub eventi

Per utilizzare gli eventi da un'istanza di Hub eventi, è anche necessario conoscere il gruppo di consumer che si vuole assegnare. Dopo aver appreso questo problema, è possibile creare un Oggetto EventHubConsumerClient. Mentre nell'esempio seguente viene illustrato un modo per creare il client, vedere la sezione Autenticare il client per apprendere altri modi per creare un'istanza del client.

Il subscribe metodo nel client ha overload che, combinati con il costruttore, può soddisfare diversi modi per utilizzare gli eventi:

Il subscribe metodo accetta un parametro facoltativo di tipo SubscriptionOptions che è possibile usare per specificare opzioni come maxBatchSize (numero di eventi da attendere) e maxWaitTimeInSeconds (quantità di tempo per attendere l'arrivo degli eventi maxBatchSize).

Usare gli eventi in un singolo processo

Iniziare creando un'istanza EventHubConsumerClientdi e quindi chiamare il subscribe() metodo su di esso per iniziare a usare gli eventi.

Il subscribe metodo accetta i callback per elaborare gli eventi quando vengono ricevuti da Hub eventi di Azure. Per interrompere la ricezione di eventi, è possibile chiamare close() l'oggetto subscribe() restituito dal metodo .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Usare gli eventi con carico bilanciato tra più processi

Hub eventi di Azure è in grado di gestire milioni di eventi al secondo. Per ridimensionare l'applicazione di elaborazione, è possibile eseguire più istanze dell'applicazione e bilanciare il carico tra di loro.

Iniziare creando un'istanza dell'oggetto EventHubConsumerClient usando uno degli overload del costruttore che accettano un CheckpointStoreoggetto e quindi chiamare il subscribe() metodo per iniziare a usare gli eventi. L'archivio checkpoint consente ai sottoscrittori all'interno di un gruppo di consumer di coordinare l'elaborazione tra più istanze dell'applicazione.

In questo esempio si userà l'oggetto BlobCheckpointStore@azure/eventhubs-checkpointstore-blob dal pacchetto che implementa la lettura/scrittura necessaria in un archivio durevole usando Archiviazione BLOB di Azure.

Il subscribe metodo accetta i callback per elaborare gli eventi quando vengono ricevuti da Hub eventi di Azure. Per interrompere la ricezione di eventi, è possibile chiamare close() l'oggetto subscribe() restituito dal metodo .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Per altre informazioni, vedere Bilanciare il carico di partizione in più istanze dell'applicazione .

Usare gli eventi da una singola partizione

Iniziare creando un'istanza EventHubConsumerClientdi e quindi chiamare il subscribe() metodo su di esso per iniziare a usare gli eventi. Passare l'ID della partizione di cui si vuole eseguire la destinazione al subscribe() metodo da utilizzare solo da tale partizione.

Nell'esempio seguente viene usata la prima partizione.

Il subscribe metodo accetta i callback per elaborare gli eventi quando vengono ricevuti da Hub eventi di Azure. Per interrompere la ricezione di eventi, è possibile chiamare close() l'oggetto subscribe() restituito dal metodo .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Usare EventHubConsumerClient per usare IotHub

È anche possibile usare EventHubConsumerClient per usare IotHub. Ciò è utile per ricevere i dati di telemetria di IotHub da EventHub collegato. Il stringa di connessione associato non avrà attestazioni, quindi l'invio di eventi non è possibile.

  • Si noti che la stringa di connessione deve essere per un endpoint compatibile con hub eventi ,ad esempio "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Risoluzione dei problemi

Dipendenze AMQP

La libreria hub eventi dipende dalla libreria rhea-promise per la gestione delle connessioni, l'invio e la ricezione di eventi tramite il protocollo AMQP .

Registrazione

È possibile impostare la variabile di ambiente per abilitare la AZURE_LOG_LEVEL registrazione su stderr:

export AZURE_LOG_LEVEL=verbose

Per istruzioni più dettagliate su come abilitare i log, è possibile esaminare la documentazione del pacchetto @azure/logger.

In alternativa, è possibile impostare la DEBUG variabile di ambiente per ottenere i log quando si usa questa libreria. Ciò può essere utile anche se si desidera generare log dalle dipendenze rhea-promise e rhea anche.

Nota: AZURE_LOG_LEVEL, se impostato, ha la precedenza su DEBUG. Non specificare librerie tramite DEBUG quando si specificano azure anche AZURE_LOG_LEVEL o chiamando setLogLevel.

  • Ottenere solo i log di debug a livello di info dall'SDK di Hub eventi.
export DEBUG=azure:*:info
  • Recupero dei log di debug dall'SDK di Hub eventi e dalla libreria a livello di protocollo.
export DEBUG=azure*,rhea*
  • Se non si è interessati a visualizzare i dati dell'evento non elaborati (che utilizza una grande quantità di spazio console/disco) è possibile impostare la DEBUG variabile di ambiente come indicato di seguito:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Se si è interessati solo agli errori e agli avvisi dell'SDK, è possibile impostare la DEBUG variabile di ambiente come indicato di seguito:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Passaggi successivi

Altro codice di esempio

Esaminare la directory degli esempi per esempi di come usare questa libreria per inviare e ricevere eventi da/a Hub eventi.

Contributo

Per contribuire a questa libreria, leggere la guida ai contributi per altre informazioni su come compilare e testare il codice.

Impression