Compartilhar via


Hubs de Eventos do Azure biblioteca de clientes para JavaScript – versão 5.12.0

Hubs de Eventos do Azure é um serviço de publicação-assinatura altamente escalonável que pode ingerir milhões de eventos por segundo e transmiti-los para vários consumidores. Isso permite processar e analisar as grandes quantidades de dados produzidos por seus dispositivos e aplicativos conectados. Se você quiser saber mais sobre Hubs de Eventos do Azure, convém examinar: O que são os Hubs de Eventos?

A biblioteca de clientes Hubs de Eventos do Azure permite que você envie e receba eventos em seu aplicativo Node.js.

Links principais:

OBSERVAÇÃO: se você estiver usando a versão 2.1.0 ou inferior e quiser migrar para a versão mais recente deste pacote, examine nosso guia de migração para migrar do EventHubs V2 para o EventHubs V5

Exemplos de v2 e documentação ainda estão disponíveis aqui:

Código-fonte para v2.1.0 | Pacote para v2.1.0 (npm) | Exemplos para v2.1.0

Introdução

Instalar o pacote

Instalar a biblioteca de clientes Hubs de Eventos do Azure usando o npm

npm install @azure/event-hubs

Ambientes com suporte no momento

Confira nossa política de suporte para mais detalhes.

Pré-requisitos

Configurar TypeScript

Os usuários do TypeScript precisam ter definições de tipo de nó instaladas:

npm install @types/node

Você também precisa habilitar compilerOptions.allowSyntheticDefaultImports em seu tsconfig.json. Observe que, se você tiver habilitado compilerOptions.esModuleInterop, allowSyntheticDefaultImports será habilitado por padrão. Consulte o manual de opções do compilador do TypeScript para obter mais informações.

Pacote JavaScript

Para usar essa biblioteca de clientes no navegador, primeiro você precisa usar um empacotador. Para obter detalhes sobre como fazer isso, consulte nossa documentação de agrupamento.

Além do que está descrito lá, essa biblioteca também precisa de polifills adicionais para os seguintes módulos internos do NodeJS Core para funcionar corretamente nos navegadores:

  • buffer
  • os
  • path
  • process

Agrupar com o Webpack

Se você estiver usando o Webpack v5, poderá instalar as dependências de desenvolvimento a seguir

  • npm install --save-dev os-browserify path-browserify

em seguida, adicione o seguinte à sua webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Agrupando com rollup

Se você estiver usando o pacote cumulativo rollup, instale as dependências de desenvolvimento a seguir

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Em seguida, inclua o seguinte em seu rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Consulte a documentação do seu empacotador favorito para obter mais informações sobre como usar polyfills.

Suporte React Native

Semelhante aos navegadores, React Native não dá suporte a alguma API JavaScript usada por essa biblioteca do SDK, portanto, você precisa fornecer polyfills para eles. Consulte o exemplo React Native mensagens com a Expo para obter mais detalhes.

Autenticar o cliente

A interação com os Hubs de Eventos começa com uma instância da classe EventHubConsumerClient ou uma instância da classe EventHubProducerClient . Há sobrecargas de construtor para dar suporte a diferentes maneiras de instanciar essas classes, conforme mostrado abaixo:

Usar cadeia de conexão para o namespace dos Hubs de Eventos

Uma das sobrecargas do construtor leva um cadeia de conexão do formulário Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; e do nome da entidade para sua instância do Hub de Eventos. Você pode criar um grupo de consumidores e obter o cadeia de conexão, bem como o nome da entidade do portal do Azure.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Usar cadeia de conexão para política no Hub de Eventos

Outra sobrecarga do construtor usa o cadeia de conexão correspondente à política de acesso compartilhado que você definiu diretamente na instância do Hub de Eventos (e não no namespace dos Hubs de Eventos). Esse cadeia de conexão será do formulário Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. A principal diferença no formato cadeia de conexão da sobrecarga do construtor anterior é o ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Usar o namespace dos Hubs de Eventos e a Identidade do Azure

Essa sobrecarga do construtor usa o nome do host e o nome da entidade da instância e da credencial do Hub de Eventos que implementa a interface TokenCredential. Isso permite que você se autentique usando uma entidade de segurança do Azure Active Directory. Há implementações da TokenCredential interface disponível no pacote de @azure/identidade . O nome do host é do formato <yournamespace>.servicebus.windows.net. Ao usar o Azure Active Directory, sua entidade de segurança deve receber uma função que permita o acesso aos Hubs de Eventos, como a função proprietário de dados Hubs de Eventos do Azure. Para obter mais informações sobre como usar a autorização do Azure Active Directory com hubs de eventos, consulte a documentação associada.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Principais conceitos

  • Um produtor do Hub de Eventos é uma fonte de dados de telemetria, informações de diagnóstico, logs de uso ou outros dados de log, como parte de uma solução de dispositivo inserido, um aplicativo de dispositivo móvel, um título de jogo em execução em um console ou outro dispositivo, alguma solução de negócios baseada em cliente ou servidor ou um site da Web.

  • Um consumidor do Hub de Eventos pega essas informações no Hub de Eventos e as processa. O processamento pode envolver agregação, computação complexa e filtragem. O processamento também pode envolver a distribuição ou o armazenamento das informações de maneira bruta ou transformada. Os consumidores do Hub de Eventos geralmente são partes de infraestrutura de plataforma robustas e de alta escala com recursos de análise integrados, como Azure Stream Analytics, Apache Spark ou Apache Storm.

  • Uma partição é uma sequência ordenada de eventos que é mantida em um Hub de Eventos. As partições são um meio de organização de dados associada ao paralelismo exigido pelos consumidores de eventos. Os Hubs de Eventos do Azure fornecem streaming de mensagens por meio de um padrão de consumidor particionado no qual cada consumidor lê apenas um subconjunto específico, ou partição, do fluxo de mensagens. À medida que novos eventos chegam, eles são adicionados ao final dessa sequência. O número de partições é especificado no momento em que um Hub de Eventos é criado e não pode ser alterado.

  • Um grupo de consumidores é uma exibição de um Hub de Eventos inteiro. Os grupos de consumidores habilitam vários aplicativos de consumo para que cada um tenha um modo de exibição do fluxo de evento separado e para ler o fluxo de forma independente em seu próprio ritmo e com seus próprio deslocamentos. Pode haver no máximo cinco leitores simultâneos em uma partição por grupo de consumidores; no entanto, é recomendável que haja apenas um consumidor ativo para um determinado emparelhamento de partição e grupo de consumidores. Cada leitor ativo recebe todos os eventos de sua partição; Se houver vários leitores na mesma partição, eles receberão eventos duplicados.

Para obter mais conceitos e discussões mais profundas, consulte: Recursos dos Hubs de Eventos

Diretrizes em torno de repetições

O EventHubConsumerClient e EventHubProducerClient aceitam options onde você pode definir o retryOptions que permite ajustar como o SDK lida com erros transitórios. Exemplos de erros transitórios incluem problemas temporários de rede ou serviço.

Repetições ao consumir eventos

Se um erro transitório (por exemplo, um problema temporário de rede) for encontrado enquanto o SDK estiver recebendo eventos, ele tentará novamente receber eventos com base nas opções de repetição passadas para o EventHubConsumerClient. Se as tentativas máximas de repetição forem esgotadas, a processError função será invocada.

Você pode usar as configurações de repetição para controlar a rapidez com que é informado sobre problemas temporários, como um problema de conexão de rede. Por exemplo, se você precisar saber quando há um problema de rede imediatamente, poderá reduzir os valores para maxRetries e retryDelayInMs.

Depois de executar a processError função, o cliente continua recebendo eventos da partição, desde que o erro tenha sido repetível. Caso contrário, o cliente invocará a função fornecida pelo processClose usuário. Essa função também é invocada quando você interrompe a assinatura ou quando o cliente para de ler eventos da partição atual devido à sua captação por outra instância do aplicativo como parte do balanceamento de carga.

A processClose função oferece uma oportunidade de atualizar pontos de verificação, se necessário. Depois de processCloseexecutar , o cliente (ou no caso de balanceamento de carga, um cliente de outra instância do aplicativo) invocará a função fornecida pelo processInitialize usuário para retomar a leitura de eventos do último ponto de verificação atualizado para a mesma partição.

Se você quiser parar de tentar ler eventos, deverá chamar close() no subscription retornado pelo subscribe método .

Exemplos

As seções a seguir fornecem snippets de código que abrangem algumas das tarefas comuns usando Hubs de Eventos do Azure

Inspecione um Hub de Eventos

Muitas operações do Hub de Eventos ocorrem dentro do escopo de uma partição específica. Como as partições pertencem ao Hub de Eventos, seus nomes são atribuídos no momento da criação. Para entender quais partições estão disponíveis, consulte o Hub de Eventos usando um dos dois clientes disponíveis: EventHubProducerClient ou EventHubConsumerClient

No exemplo abaixo, estamos usando um EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Envio de eventos para um Hub de Eventos

Para publicar eventos, você precisará criar um EventHubProducerClient. Embora o exemplo abaixo mostre uma maneira de criar o cliente, consulte a seção Autenticar o cliente para saber outras maneiras de instanciar o cliente.

Você pode publicar eventos em uma partição específica ou permitir que o serviço de Hubs de Eventos decida em quais eventos de partição devem ser publicados. É recomendável usar o roteamento automático quando a publicação de eventos precisa estar altamente disponível ou quando os dados de evento devem ser distribuídos uniformemente entre as partições. No exemplo abaixo, aproveitaremos o roteamento automático.

  • Create um EventDataBatch objeto usando o createBatch
  • Adicione eventos ao lote usando o método tryAdd . Você pode fazer isso até que o limite máximo de tamanho do lote seja atingido ou até terminar de adicionar o número de eventos que você gostou, o que vier primeiro. Esse método retornaria false para indicar que não é possível adicionar mais eventos ao lote devido ao tamanho máximo do lote que está sendo atingido.
  • Envie o lote de eventos usando o método sendBatch .

No exemplo abaixo, tentamos enviar 10 eventos para Hubs de Eventos do Azure.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Há opções que você pode passar em diferentes estágios para controlar o processo de envio de eventos para Hubs de Eventos do Azure.

  • O EventHubProducerClient construtor usa um parâmetro opcional do tipo EventHubClientOptions que você pode usar para especificar opções como o número de repetições.
  • O createBatch método usa um parâmetro opcional do tipo CreateBatchOptions que você pode usar para speicificar o tamanho máximo do lote com suporte pelo lote que está sendo criado.
  • O sendBatch método usa um parâmetro opcional do tipo SendBatchOptions que você pode usar para especificar abortSignal para cancelar a operação atual.
  • Caso deseje enviar para uma partição específica, uma sobrecarga do sendBatch método permite que você passe a ID da partição para a qual enviar eventos. O exemplo Inspecionar um Hub de Eventos acima mostra como buscar as IDs de partições disponíveis.

Observação: ao trabalhar com o Azure Stream Analytics, o corpo do evento que está sendo enviado também deve ser um objeto JSON. Por exemplo: body: { "message": "Hello World" }

Consumir eventos de um Hub de Eventos

Para consumir eventos de uma instância do Hub de Eventos, você também precisa saber qual grupo de consumidores deseja direcionar. Depois de saber disso, você estará pronto para criar um EventHubConsumerClient. Embora o exemplo abaixo mostre uma maneira de criar o cliente, consulte a seção Autenticar o cliente para saber outras maneiras de instanciar o cliente.

O subscribe método no cliente tem sobrecargas que, combinadas com o construtor, podem atender a várias maneiras de consumir eventos:

O subscribe método usa um parâmetro opcional do tipo SubscriptionOptions que você pode usar para especificar opções como maxBatchSize (número de eventos a aguardar) e maxWaitTimeInSeconds (quantidade de tempo para aguardar a chegada dos eventos maxBatchSize).

Consumir eventos em um único processo

Comece criando uma instância do EventHubConsumerCliente chame o subscribe() método para começar a consumir eventos.

O subscribe método recebe retornos de chamada para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para interromper o recebimento de eventos, você pode chamar close() no objeto retornado pelo subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Consumir eventos com balanceamento de carga em vários processos

Hubs de Eventos do Azure é capaz de lidar com milhões de eventos por segundo. Para dimensionar seu aplicativo de processamento, você pode executar várias instâncias do aplicativo e fazer com que ele equilibre a carga entre si.

Comece criando uma instância do EventHubConsumerClient usando uma das sobrecargas do construtor que usam um CheckpointStoree chame o subscribe() método para começar a consumir eventos. O repositório de ponto de verificação permitirá que os assinantes em um grupo de consumidores coordenem o processamento entre várias instâncias do aplicativo.

Neste exemplo, usaremos o BlobCheckpointStore do @azure/eventhubs-checkpointstore-blob pacote que implementa as leituras/gravações necessárias em um repositório durável usando Armazenamento de Blobs do Azure.

O subscribe método recebe retornos de chamada para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para interromper o recebimento de eventos, você pode chamar close() no objeto retornado pelo subscribe() método .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Confira Balancear a carga de partição em várias instâncias do aplicativo para saber mais.

Consumir eventos de uma única partição

Comece criando uma instância do EventHubConsumerCliente chame o subscribe() método para começar a consumir eventos. Passe a ID da partição que você deseja direcionar para o subscribe() método para consumir somente dessa partição.

No exemplo abaixo, estamos usando a primeira partição.

O subscribe método recebe retornos de chamada para processar eventos à medida que são recebidos de Hubs de Eventos do Azure. Para interromper o recebimento de eventos, você pode chamar close() no objeto retornado pelo subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Usar EventHubConsumerClient para trabalhar com o IotHub

Você também pode usar EventHubConsumerClient para trabalhar com o IotHub. Isso é útil para receber dados de telemetria do IotHub do EventHub vinculado. O cadeia de conexão associado não terá declarações de envio, portanto, o envio de eventos não é possível.

  • Observe que o cadeia de conexão precisa ser para um ponto de extremidade compatível com o Hub de Eventos (por exemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Solução de problemas

Dependências amqp

A biblioteca de Hubs de Eventos depende da biblioteca rhea-promise para gerenciar conexões, enviar e receber eventos pelo protocolo AMQP .

Registro em log

Você pode definir a variável de ambiente para habilitar o AZURE_LOG_LEVEL registro em log como stderr:

export AZURE_LOG_LEVEL=verbose

Para obter instruções mais detalhadas sobre como habilitar logs, veja os documentos do pacote @azure/logger.

Como alternativa, você pode definir a DEBUG variável de ambiente para obter logs ao usar essa biblioteca. Isso pode ser útil se você também quiser emitir logs das dependências rhea-promise e rhea também.

Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG. Não especifique nenhuma azure biblioteca por meio de DEBUG ao especificar AZURE_LOG_LEVEL ou chamar setLogLevel.

  • Obtendo apenas logs de depuração no nível de informações do SDK dos Hubs de Eventos.
export DEBUG=azure:*:info
  • Obtendo logs de depuração do SDK dos Hubs de Eventos e da biblioteca de nível de protocolo.
export DEBUG=azure*,rhea*
  • Se você não estiver interessado em exibir os dados brutos do evento (que consome uma grande quantidade de espaço em console/disco), poderá definir a variável de ambiente da DEBUG seguinte maneira:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Se você estiver interessado apenas em erros e avisos do SDK, poderá definir a DEBUG variável de ambiente da seguinte maneira:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Próximas etapas

Mais códigos de exemplo

Examine o diretório de exemplos para obter exemplos detalhados de como usar essa biblioteca para enviar e receber eventos de/para Hubs de Eventos.

Contribuição

Se você quiser contribuir com essa biblioteca, leia o guia de contribuição para saber como criar e testar o código.

Impressões