Поделиться через


Клиентская библиотека Центров событий Azure для JavaScript версии 5.12.1

Центры событий Azure — это высокомасштабируемая служба публикации-подписки, которая может получать миллионы событий в секунду и передавать их нескольким потребителям. Это позволяет обрабатывать и анализировать большие объемы данных, созданных подключенными устройствами и приложениями. Если вы хотите узнать больше о Центрах событий Azure, вы можете просмотреть следующие сведения: Что такое Центры событий?

Клиентская библиотека Центров событий Azure позволяет отправлять и получать события в приложении Node.js.

Ключевые ссылки:

ПРИМЕЧАНИЕ. Если вы используете версию 2.1.0 или более позднюю версию этого пакета, ознакомьтесь с нашим руководством по миграции , чтобы перейти с EventHubs V2 на EventHubs V5

Примеры для версии 2 и документации по-прежнему доступны здесь:

исходный код для пакета | версии 2.1.0 версии 2.1.0 (npm) для | примеров версии 2.1.0

Начало работы

Установка пакета

Установка клиентской библиотеки Центров событий Azure с помощью npm

npm install @azure/event-hubs

Поддерживаемые в настоящее время среды

Дополнительные сведения см. в политике поддержки .

Необходимые условия

  • подписки Azure
  • Пространство имен центров событий

Настройка TypeScript

Пользователям TypeScript необходимо установить определения типов узлов:

npm install @types/node

Кроме того, необходимо включить compilerOptions.allowSyntheticDefaultImports в tsconfig.json. Обратите внимание, что если вы включили compilerOptions.esModuleInterop, allowSyntheticDefaultImports включен по умолчанию. Дополнительные сведения см. в руководстве по параметрам компилятора TypeScript .

Пакет JavaScript

Чтобы использовать эту клиентную библиотеку в браузере, сначала необходимо использовать пакет. Дополнительные сведения о том, как это сделать, см. в нашей документации по .

Помимо описанного здесь, эта библиотека также нуждается в дополнительных полизаполнениях для следующих встроенных модулей NodeJS, чтобы правильно работать в браузерах:

  • buffer
  • os
  • path
  • process

Объединение с webpack

Если вы используете Webpack версии 5, можно установить следующие зависимости разработки.

  • npm install --save-dev os-browserify path-browserify

Затем добавьте в webpack.config.js следующую команду.

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Объединение с помощью свертки

Если вы используете пакет свертки, установите следующие зависимости разработки

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Затем добавьте в rollup.config.js следующую команду.

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Дополнительные сведения об использовании полизаполнения см. в документации избранного пакета.

Поддержка React Native

Как и в браузерах, React Native не поддерживает некоторые API JavaScript, используемые этой библиотекой SDK, поэтому их необходимо предоставить полизаполнения. Дополнительные сведения см. в примере Messaging React Native с помощью Expo.

Проверка подлинности клиента

Взаимодействие с Центрами событий начинается с экземпляра класса EventHubConsumerClient или экземпляра класса EventHubProducerClient. Существуют перегрузки конструктора для поддержки различных способов создания экземпляров этих классов, как показано ниже:

Использование строки подключения для пространства имен Центров событий

Одна из перегрузок конструктора принимает строку подключения формы Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; и имени сущности к экземпляру Концентратора событий. Вы можете создать группу потребителей и получить строку подключения, а также имя сущности на портале Azure.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Использование строки подключения для политики в Концентраторе событий

Другая перегрузка конструктора принимает строку подключения, соответствующую политике общего доступа, определенной непосредственно в экземпляре Концентратора событий (а не пространстве имен Центров событий). Эта строка подключения будет иметь форму Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. Ключевым отличием в формате строки подключения от предыдущей перегрузки конструктора является ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Использование пространства имен Центров событий и удостоверения Azure

Эта перегрузка конструктора принимает имя узла и имя сущности экземпляра Концентратора событий и учетные данные, реализующие интерфейс TokenCredential. Это позволяет пройти проверку подлинности с помощью субъекта Azure Active Directory. Существуют реализации интерфейса TokenCredential, доступные в пакете @azure/identity. Имя узла имеет формат <yournamespace>.servicebus.windows.net. При использовании Azure Active Directory субъекту необходимо назначить роль, которая позволяет получить доступ к центрам событий, таким как роль владельца данных Центров событий Azure. Дополнительные сведения об использовании авторизации Azure Active Directory с Центрами событий см. в связанной документации.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Основные понятия

  • Производитель Концентратора событий — это источник данных телеметрии, диагностических сведений, журналов использования или других данных журнала в рамках внедренного решения для устройств, приложения мобильного устройства, названия игры, работающего на консоли или другом устройстве, некоторых клиентских или серверных бизнес-решениях или веб-сайте.

  • Потребитель центра событий получает такие сведения из Концентратора событий и обрабатывает его. Обработка может включать агрегирование, сложные вычисления и фильтрацию. Обработка также может включать распределение или хранение информации в необработанном или преобразованном виде. Потребители Концентратора событий часто являются надежными и высокомасштабируемыми компонентами инфраструктуры платформы с встроенными возможностями аналитики, такими как Azure Stream Analytics, Apache Spark или Apache Storm.

  • секции — это упорядоченная последовательность событий, которые хранятся в концентраторе событий. Секции — это средство организации данных, связанной с параллелизмом, необходимым для потребителей событий. Центры событий Azure обеспечивают потоковую передачу сообщений через секционированные шаблоны потребителей, в которых каждый потребитель считывает только определенный подмножество или секцию потока сообщений. По мере поступления новых событий они добавляются в конец этой последовательности. Число секций указывается во время создания концентратора событий и не может быть изменено.

  • группы потребителей — это представление всего концентратора событий. Группы потребителей позволяют каждому из нескольких приложений использовать отдельное представление потока событий, а также читать поток независимо по своему темпу и от собственной позиции. На секцию для каждой группы потребителей может находиться не более 5 параллельных читателей; однако рекомендуется использовать только один активный потребитель для определенной пары секций и групп потребителей. Каждый активный читатель получает все события из своей секции; Если в одной секции есть несколько читателей, они получат повторяющиеся события.

Дополнительные понятия и более глубокое обсуждение см. в статье Функции Центров событий

Руководство по повторным попыткам

EventHubConsumerClient и EventHubProducerClient принять options, где можно задать retryOptions, которые позволяют настроить способ обработки временных ошибок пакета SDK. Примеры временных ошибок включают временные проблемы сети или службы.

Повторные попытки при использовании событий

Если возникает временная ошибка (например, временная сетевая проблема) во время получения событий пакета SDK, он повторит получение событий на основе параметров повторных попыток, переданных в EventHubConsumerClient. Если максимальные попытки повторных попыток исчерпаны, вызывается функция processError.

Параметры повторных попыток можно использовать для управления тем, как быстро вы узнаете о временных проблемах, таких как проблема с сетевым подключением. Например, если вам нужно знать, когда возникла проблема с сетью, вы можете сразу снизить значения для maxRetries и retryDelayInMs.

После выполнения функции processError клиент продолжает получать события из секции до тех пор, пока ошибка была повторной. В противном случае клиент вызывает предоставленную пользователем функцию processClose. Эта функция также вызывается при остановке подписки или при остановке чтения событий клиента из текущей секции из-за того, что он выбирается другим экземпляром приложения в рамках балансировки нагрузки.

Функция processClose предоставляет возможность обновлять контрольные точки при необходимости. После выполнения processCloseклиент (или в случае балансировки нагрузки клиент из другого экземпляра приложения) вызовет предоставленную пользователем функцию processInitialize, чтобы возобновить чтение событий из последней обновленной контрольной точки для той же секции.

Если вы хотите остановить попытку чтения событий, необходимо вызвать close() на subscription, возвращаемой методом subscribe.

Примеры

В следующих разделах приведены фрагменты кода, охватывающие некоторые распространенные задачи с помощью Центров событий Azure.

Проверка концентратора событий

Многие операции Концентратора событий выполняются в пределах определенной секции. Так как секции принадлежат Концентратору событий, их имена назначаются во время создания. Чтобы понять, какие разделы доступны, необходимо запросить Концентратор событий с помощью одного из двух доступных клиентов: EventHubProducerClient или EventHubConsumerClient

В приведенном ниже примере мы используем EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Публикация событий в Концентраторе событий

Чтобы опубликовать события, необходимо создать EventHubProducerClient. В приведенном ниже примере показано, как создать клиент, см. раздел Аутентификация клиента, чтобы узнать другие способы создания экземпляра клиента.

События могут публиковаться в определенном разделе или разрешать службе Центров событий решать, какие события секции следует публиковать. Рекомендуется использовать автоматическую маршрутизацию, если публикация событий должна быть высокодоступна или когда данные событий должны распределяться равномерно между секциями. В приведенном ниже примере мы воспользуемся автоматической маршрутизацией.

  • Создание объекта EventDataBatch с помощью createBatch
  • Добавьте события в пакет с помощью метода tryAdd. Это можно сделать до тех пор, пока не будет достигнуто максимальное ограничение размера пакета или до тех пор, пока не будет выполнено добавление количества нужных событий. Этот метод возвращает false, чтобы указать, что больше событий нельзя добавить в пакет из-за максимального размера пакета.
  • Отправьте пакет событий с помощью метода sendBatch.

В приведенном ниже примере мы пытаемся отправить 10 событий в Центры событий Azure.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    const wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Существуют варианты, которые можно передать на разных этапах для управления процессом отправки событий в Центры событий Azure.

  • Конструктор EventHubProducerClient принимает необязательный параметр типа EventHubClientOptions который можно использовать для указания параметров, таких как количество повторных попыток.
  • Метод createBatch принимает необязательный параметр типа CreateBatchOptions который можно использовать для speicify максимального размера пакета, поддерживаемого созданным пакетом.
  • Метод sendBatch принимает необязательный параметр типа SendBatchOptions, который можно использовать для указания abortSignal для отмены текущей операции.
  • Если вы хотите отправить в определенную секцию, перегрузка метода sendBatch позволяет передать идентификатор секции для отправки событий. В приведенном выше примере проверить концентратор событий показано, как получить доступные идентификаторы секций.

примечание. При работе с Azure Stream Analytics текст отправляемого события также должен быть объектом JSON. Например, body: { "message": "Hello World" }

Использование событий из концентратора событий

Чтобы использовать события из экземпляра Концентратора событий, также необходимо знать, какую группу потребителей вы хотите использовать. После этого вы будете готовы создать EventHubConsumerClient. В приведенном ниже примере показано, как создать клиент, см. раздел Аутентификация клиента, чтобы узнать другие способы создания экземпляра клиента.

Метод subscribe на клиенте имеет перегрузки, которые в сочетании с конструктором могут выполнять несколько способов использования событий:

Метод subscribe принимает необязательный параметр типа SubscriptionOptions который можно использовать для указания таких параметров, как maxBatchSize (число событий для ожидания) и maxWaitTimeInSeconds (время ожидания получения событий maxBatchSize).

Использование событий в одном процессе

Начните с создания экземпляра EventHubConsumerClient, а затем вызовите метод subscribe() для начала использования событий.

Метод subscribe принимает обратные вызовы для обработки событий по мере их получения от Центров событий Azure. Чтобы остановить получение событий, можно вызвать close() для объекта, возвращаемого методом subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Использование событий с балансировкой нагрузки в нескольких процессах

Центры событий Azure могут работать с миллионами событий в секунду. Чтобы масштабировать приложение обработки, можно запустить несколько экземпляров приложения и сбалансировать нагрузку между собой.

Начните с создания экземпляра EventHubConsumerClient с помощью одной из перегрузок конструктора, которые принимают CheckpointStore, а затем вызовите метод subscribe(), чтобы начать использование событий. Хранилище контрольных точек позволит подписчикам в группе потребителей координировать обработку между несколькими экземплярами приложения.

В этом примере мы будем использовать BlobCheckpointStore из пакета @azure/eventhubs-checkpointstore-blob, который реализует необходимые операции чтения и записи в устойчивое хранилище с помощью хранилища BLOB-объектов Azure.

Метод subscribe принимает обратные вызовы для обработки событий по мере их получения от Центров событий Azure. Чтобы остановить получение событий, можно вызвать close() для объекта, возвращаемого методом subscribe().

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Дополнительные сведения см. в статье Балансировка нагрузки секции в нескольких экземплярах приложения.

Использование событий из одной секции

Начните с создания экземпляра EventHubConsumerClient, а затем вызовите метод subscribe() для начала использования событий. Передайте идентификатор секции, которую вы хотите нацелить на метод subscribe(), чтобы использовать его только из этой секции.

В приведенном ниже примере мы используем первую секцию.

Метод subscribe принимает обратные вызовы для обработки событий по мере их получения от Центров событий Azure. Чтобы остановить получение событий, можно вызвать close() для объекта, возвращаемого методом subscribe().

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Использование EventHubConsumerClient для работы с IotHub

Вы также можете использовать EventHubConsumerClient для работы с IotHub. Это полезно для получения данных телеметрии IotHub из связанного eventHub. Связанная строка подключения не будет отправлять утверждения, поэтому отправка событий невозможна.

  • Обратите внимание, что строка подключения должна быть для конечной точки, совместимой с центром событий (например, Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Устранение неполадок

Зависимости AMQP

Библиотека Центров событий зависит от библиотеки rhea-promise для управления подключениями, отправки и получения событий по протоколу AMQP.

Лесозаготовка

Можно задать переменную среды AZURE_LOG_LEVEL, чтобы включить ведение журнала для stderr:

export AZURE_LOG_LEVEL=verbose

Дополнительные инструкции по включению журналов см. в документации по пакету @azure/loger.

Можно также задать переменную среды DEBUG для получения журналов при использовании этой библиотеки. Это может быть полезно, если вы также хотите выдавать журналы из зависимостей rhea-promise и rhea.

Примечание. AZURE_LOG_LEVEL, если задано, имеет приоритет над DEBUG. Не указывайте библиотеки azure через DEBUG при указании AZURE_LOG_LEVEL или вызова setLogLevel.

  • Получение журналов отладки только на уровне сведений из пакета SDK центров событий.
export DEBUG=azure:*:info
  • Получение журналов отладки из пакета SDK центров событий и библиотеки уровней протокола.
export DEBUG=azure*,rhea*
  • Если вы не заинтересованы в просмотре необработанных данных событий (который потребляет большое количество дискового пространства консоли или диска), можно задать переменную среды DEBUG следующим образом:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Если вы заинтересованы только в ошибках и пакета SDK, можно задать переменную среды DEBUG следующим образом:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Дальнейшие действия

Дополнительные примеры кода

Ознакомьтесь с примерами каталоге подробных примеров использования этой библиотеки для отправки и получения событий в центры событий .

Способствует

Если вы хотите внести свой вклад в эту библиотеку, ознакомьтесь с руководством по вкладу, чтобы узнать больше о том, как создавать и тестировать код.

впечатлений