Compartir a través de


Biblioteca cliente de Azure Event Hubs para JavaScript: versión 5.12.1

Azure Event Hubs es un servicio de publicación y suscripción altamente escalable que puede ingerir millones de eventos por segundo y transmitirlos a varios consumidores. Esto le permite procesar y analizar las grandes cantidades de datos generados por los dispositivos y aplicaciones conectados. Si desea obtener más información sobre Azure Event Hubs, puede que desee revisar: ¿Qué es Event Hubs?

La biblioteca cliente de Azure Event Hubs permite enviar y recibir eventos en la aplicación Node.js.

Vínculos clave:

NOTA: si usa la versión 2.1.0 o posterior y desea migrar a la versión más reciente de este paquete, consulte nuestra guía de migración de para pasar de EventHubs V2 a EventHubs V5

Los ejemplos de la versión 2 y la documentación siguen estando disponibles aquí:

código fuente de v2.1.0 | Package for v2.1.0 (npm) | Samples for v2.1.0

Empezar

Instalación del paquete

Instalación de la biblioteca cliente de Azure Event Hubs mediante npm

npm install @azure/event-hubs

Entornos admitidos actualmente

Consulte nuestra de directiva de soporte técnico de para obtener más información.

Prerrequisitos

Configurar TypeScript

Los usuarios de TypeScript deben tener instaladas definiciones de tipo de nodo:

npm install @types/node

También debe habilitar compilerOptions.allowSyntheticDefaultImports en el tsconfig.json. Tenga en cuenta que si ha habilitado compilerOptions.esModuleInterop, allowSyntheticDefaultImports está habilitado de forma predeterminada. Consulte manual de opciones del compilador de TypeScript para obtener más información.

Paquete de JavaScript

Para usar esta biblioteca cliente en el explorador, primero debe usar un agrupador. Para obtener más información sobre cómo hacerlo, consulte nuestra documentación de agrupación de .

Además de lo que se describe allí, esta biblioteca también necesita polirrellenes adicionales para los siguientes módulos integrados principales de NodeJS para poder funcionar correctamente en los exploradores:

  • buffer
  • os
  • path
  • process

Agrupación con Webpack

Si usa Webpack v5, puede instalar las siguientes dependencias de desarrollo.

  • npm install --save-dev os-browserify path-browserify

a continuación, agregue lo siguiente a la webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Agrupación con acumulación

Si usa el empaquetador acumulativo, instale las siguientes dependencias de desarrollo.

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

A continuación, incluya lo siguiente en el rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Consulte la documentación de su empaquetador favorito para obtener más información sobre el uso de polyfills.

Compatibilidad con React Native

De forma similar a los exploradores, React Native no admite algunas API de JavaScript que usa esta biblioteca del SDK, por lo que debe proporcionar polyfills para ellos. Consulte el ejemplo nativo de Messaging React con Expo para obtener más detalles.

Autenticación del cliente

La interacción con Event Hubs comienza con una instancia de la clase EventHubConsumerClient o una instancia de la clase EventHubProducerClient. Hay sobrecargas de constructor para admitir diferentes formas de crear instancias de estas clases, como se muestra a continuación:

Uso de la cadena de conexión para el espacio de nombres de Event Hubs

Una de las sobrecargas del constructor toma una cadena de conexión del formulario Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; y el nombre de entidad a la instancia del centro de eventos. Puede crear un grupo de consumidores y obtener la cadena de conexión, así como el nombre de entidad de la Azure Portal.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Uso de la cadena de conexión para la directiva en el centro de eventos

Otra sobrecarga del constructor toma la cadena de conexión correspondiente a la directiva de acceso compartido que ha definido directamente en la instancia del centro de eventos (y no en el espacio de nombres de Event Hubs). Esta cadena de conexión tendrá el formato Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. La diferencia clave en el formato de cadena de conexión de la sobrecarga del constructor anterior es el ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Uso del espacio de nombres de Event Hubs e Identidad de Azure

Esta sobrecarga del constructor toma el nombre de host y el nombre de entidad de la instancia del centro de eventos y las credenciales que implementan la interfaz TokenCredential. Esto le permite autenticarse mediante una entidad de seguridad de Azure Active Directory. Hay implementaciones de la interfaz TokenCredential disponible en el paquete @azure/identity. El nombre de host es del formato <yournamespace>.servicebus.windows.net. Al usar Azure Active Directory, la entidad de seguridad debe tener asignado un rol que permita el acceso a Event Hubs, como el rol Propietario de datos de Azure Event Hubs. Para más información sobre el uso de la autorización de Azure Active Directory con Event Hubs, consulte la documentación asociada.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Conceptos clave

  • Un productor de Event Hubs es un origen de datos de telemetría, información de diagnóstico, registros de uso u otros datos de registro, como parte de una solución de dispositivo incrustado, una aplicación de dispositivo móvil, un título de juego que se ejecuta en una consola u otro dispositivo, alguna solución empresarial basada en cliente o servidor, o un sitio web.

  • Un consumidor de Event Hubs recoge dicha información del centro de eventos y la procesa. El procesamiento puede implicar la agregación, el cálculo complejo y el filtrado. El procesamiento también puede implicar la distribución o el almacenamiento de la información de forma sin procesar o transformada. Los consumidores de Event Hubs suelen ser partes de infraestructura de plataforma sólidas y de gran escala con funcionalidades de análisis integradas, como Azure Stream Analytics, Apache Spark o Apache Storm.

  • Una partición es una secuencia ordenada de eventos que se mantiene en un centro de eventos. Las particiones son un medio de organización de datos asociada al paralelismo requerido por los consumidores de eventos. Azure Event Hubs proporciona streaming de mensajes a través de un patrón de consumidor con particiones en el que cada consumidor solo lee un subconjunto específico o partición de la secuencia de mensajes. A medida que llegan eventos más recientes, se agregan al final de esta secuencia. El número de particiones se especifica en el momento en que se crea un centro de eventos y no se puede cambiar.

  • Un grupo de consumidores es una vista de todo un centro de eventos. Los grupos de consumidores permiten que varias aplicaciones de consumo tengan una vista independiente de la secuencia de eventos y leer la secuencia de forma independiente a su propio ritmo y desde su propia posición. Puede haber como máximo 5 lectores simultáneos en una partición por grupo de consumidores; sin embargo, se recomienda que solo haya un consumidor activo para una partición determinada y el emparejamiento de grupos de consumidores. Cada lector activo recibe todos los eventos de su partición; Si hay varios lectores en la misma partición, recibirán eventos duplicados.

Para obtener más conceptos y una explicación más detallada, consulte: características de Event Hubs

Instrucciones sobre reintentos

El EventHubConsumerClient y EventHubProducerClient aceptan options donde puede establecer el retryOptions que le permite ajustar cómo controla el SDK los errores transitorios. Algunos ejemplos de errores transitorios incluyen problemas de red o servicio temporales.

Reintentos al consumir eventos

Si se encuentra un error transitorio (por ejemplo, un problema de red temporal) mientras el SDK recibe eventos, reintentará la recepción de eventos en función de las opciones de reintento pasadas al EventHubConsumerClient. Si se agotan los intentos máximos de reintento, se invocará la función processError.

Puede usar la configuración de reintento para controlar la rapidez con la que se le informa sobre problemas temporales, como un problema de conexión de red. Por ejemplo, si necesita saber cuándo hay un problema de red inmediatamente, puede reducir los valores de maxRetries y retryDelayInMs.

Después de ejecutar la función processError, el cliente sigue recibiendo eventos de la partición siempre que el error fuera un reintento. De lo contrario, el cliente invoca la función processClose proporcionada por el usuario. Esta función también se invoca cuando detiene la suscripción o cuando el cliente deja de leer eventos de la partición actual debido a que otra instancia de la aplicación lo recoge como parte del equilibrio de carga.

La función processClose proporciona una oportunidad para actualizar los puntos de control si es necesario. Después de ejecutar processClose, el cliente (o en el caso del equilibrio de carga, un cliente de otra instancia de la aplicación) invocará la función processInitialize proporcionada por el usuario para reanudar la lectura de eventos del último punto de control actualizado para la misma partición.

Si desea dejar de intentar leer eventos, debe llamar a close() en el subscription devuelto por el método subscribe.

Ejemplos

En las secciones siguientes se proporcionan fragmentos de código que cubren algunas de las tareas comunes mediante Azure Event Hubs.

Inspección de un centro de eventos

Muchas operaciones del centro de eventos tienen lugar dentro del ámbito de una partición específica. Dado que las particiones son propiedad del centro de eventos, sus nombres se asignan en el momento de la creación. Para comprender qué particiones están disponibles, consulte el centro de eventos mediante cualquiera de los dos clientes disponibles: EventHubProducerClient o EventHubConsumerClient

En el ejemplo siguiente, se usa un EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publicación de eventos en un centro de eventos

Para publicar eventos, deberá crear un EventHubProducerClient. Aunque en el ejemplo siguiente se muestra una manera de crear el cliente, consulte la sección Autenticar el cliente para obtener otras formas de crear instancias del cliente.

Puede publicar eventos en una partición específica o permitir que el servicio Event Hubs decida qué eventos de partición se deben publicar. Se recomienda usar el enrutamiento automático cuando la publicación de eventos debe ser de alta disponibilidad o cuando los datos de eventos se deben distribuir uniformemente entre las particiones. En el ejemplo siguiente, aprovecharemos el enrutamiento automático.

  • Cree un objeto EventDataBatch mediante el createBatch
  • Agregue eventos al lote mediante el método tryAdd. Puede hacerlo hasta que se alcance el límite máximo de tamaño del lote o hasta que haya terminado de agregar el número de eventos que quiera, lo que ocurra primero. Este método devolvería false para indicar que no se pueden agregar más eventos al lote debido a que se alcanza el tamaño máximo del lote.
  • Envíe el lote de eventos mediante el método sendBatch.

En el ejemplo siguiente, intentamos enviar 10 eventos a Azure Event Hubs.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    const wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Hay opciones que puede pasar en diferentes fases para controlar el proceso de envío de eventos a Azure Event Hubs.

  • El constructor EventHubProducerClient toma un parámetro opcional de tipo EventHubClientOptions que puede usar para especificar opciones como el número de reintentos.
  • El método createBatch toma un parámetro opcional de tipo CreateBatchOptions que puede usar para especificar el tamaño máximo de lote admitido por el lote que se va a crear.
  • El método sendBatch toma un parámetro opcional de tipo SendBatchOptions que puede usar para especificar abortSignal para cancelar la operación actual.
  • En caso de que desee enviar a una partición específica, una sobrecarga del método sendBatch permite pasar el identificador de la partición a la que enviar eventos. El Inspeccionar un centro de eventos ejemplo anterior muestra cómo capturar los identificadores de las particiones disponibles.

Nota: al trabajar con Azure Stream Analytics, el cuerpo del evento que se envía también debe ser un objeto JSON. Por ejemplo: body: { "message": "Hello World" }

Consumo de eventos desde un centro de eventos

Para consumir eventos de una instancia del centro de eventos, también debe saber a qué grupo de consumidores desea dirigirse. Una vez que sepa esto, está listo para crear un EventHubConsumerClient. Aunque en el ejemplo siguiente se muestra una manera de crear el cliente, consulte la sección Autenticar el cliente para obtener otras formas de crear instancias del cliente.

El método subscribe en el cliente tiene sobrecargas que, combinadas con el constructor, pueden satisfacer varias maneras de consumir eventos:

El método subscribe toma un parámetro opcional de tipo SubscriptionOptions que se puede usar para especificar opciones como maxBatchSize (número de eventos para esperar) y maxWaitTimeInSeconds (cantidad de tiempo para esperar a que lleguen eventos maxBatchSize).

Consumo de eventos en un único proceso

Empiece por crear una instancia del EventHubConsumerClienty, a continuación, llame al método subscribe() en él para empezar a consumir eventos.

El método subscribe toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar a close() en el objeto devuelto por el método subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Consumo de eventos con equilibrio de carga en varios procesos

Azure Event Hubs es capaz de tratar millones de eventos por segundo. Para escalar la aplicación de procesamiento, puede ejecutar varias instancias de la aplicación y equilibrar la carga entre sí.

Comience creando una instancia del EventHubConsumerClient mediante una de las sobrecargas del constructor que toman un CheckpointStorey, a continuación, llame al método subscribe() para empezar a consumir eventos. El almacén de puntos de control permitirá a los suscriptores dentro de un grupo de consumidores coordinar el procesamiento entre varias instancias de la aplicación.

En este ejemplo, usaremos el BlobCheckpointStore del paquete @azure/eventhubs-checkpointstore-blob que implementa las lecturas y escrituras necesarias en un almacén duradero mediante Azure Blob Storage.

El método subscribe toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar a close() en el objeto devuelto por el método subscribe().

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Consulte Equilibrar la carga de particiones en varias instancias de la aplicación para obtener más información.

Consumo de eventos de una sola partición

Empiece por crear una instancia del EventHubConsumerClienty, a continuación, llame al método subscribe() en él para empezar a consumir eventos. Pase el identificador de la partición a la que desea dirigirse al método subscribe() para consumir solo desde esa partición.

En el ejemplo siguiente, se usa la primera partición.

El método subscribe toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar a close() en el objeto devuelto por el método subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Uso de EventHubConsumerClient para trabajar con IotHub

También puede usar EventHubConsumerClient para trabajar con IotHub. Esto resulta útil para recibir datos de telemetría de IotHub desde el centro de eventos vinculado. La cadena de conexión asociada no tendrá notificaciones de envío, por lo que no es posible enviar eventos.

  • Tenga en cuenta que la cadena de conexión debe ser para un punto de conexión compatible con Event Hubs (por ejemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Solución de problemas

Dependencias de AMQP

La biblioteca de Event Hubs depende de la biblioteca de de rhea-promise para administrar conexiones, enviar y recibir eventos a través del protocolo de AMQP de .

Registro

Puede establecer la variable de entorno AZURE_LOG_LEVEL para habilitar el registro en stderr:

export AZURE_LOG_LEVEL=verbose

Para obtener instrucciones más detalladas sobre cómo habilitar los registros, puede consultar los documentos del paquete de @azure/registrador.

También puede establecer la variable de entorno DEBUG para obtener registros al usar esta biblioteca. Esto puede ser útil si también desea emitir registros de las dependencias rhea-promise y rhea.

Nota: AZURE_LOG_LEVEL, si se establece, tiene prioridad sobre DEBUG. No especifique ninguna biblioteca de azure a través de DEBUG al especificar también AZURE_LOG_LEVEL ni llamar a setLogLevel.

  • Obtener solo los registros de depuración de nivel de información del SDK de Event Hubs.
export DEBUG=azure:*:info
  • Obtención de registros de depuración del SDK de Event Hubs y la biblioteca de nivel de protocolo.
export DEBUG=azure*,rhea*
  • Si no está interesado en ver los datos de eventos sin procesar (que consume una gran cantidad de espacio en disco o consola), puede establecer la variable de entorno DEBUG de la siguiente manera:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Si solo está interesado en errores y advertencias del SDK , puede establecer la variable de entorno DEBUG de la siguiente manera:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Pasos siguientes

Más código de ejemplo

Eche un vistazo al directorio de ejemplos de para obtener ejemplos detallados sobre cómo usar esta biblioteca para enviar y recibir eventos hacia y desde Event Hubs.

Contribuyendo

Si desea contribuir a esta biblioteca, lea la guía de contribución de para obtener más información sobre cómo compilar y probar el código.

impresiones