Поделиться через


Центры событий Azure библиотеки хранилища контрольных точек для JavaScript с помощью blob-объектов хранилища

Решение на основе хранилища BLOB-объектов Azure для хранения контрольных точек и балансировки нагрузки при использовании EventHubConsumerClient из библиотеки @azure/концентраторов событий

Исходный код | Пакет (npm) | Справочная документация по | APIОбразцы

Начало работы

Установка пакета

Установка библиотеки BLOB-объектов хранилища контрольных точек Центры событий Azure с помощью npm

npm install @azure/eventhubs-checkpointstore-blob

Предварительные требования. Для использования этого пакета у вас должна быть подписка Azure, пространство имен Центров событий и учетная запись хранения.

Если вы используете этот пакет в приложении Node.js, используйте Node.js 8.x или более поздней версии.

Настройка Typescript

Для пользователей TypeScript необходимо установить определения типов Node:

npm install @types/node

Также необходимо включить compilerOptions.allowSyntheticDefaultImports в файле tsconfig.json. Обратите внимание, что если вы включили compilerOptions.esModuleInteropпараметр , allowSyntheticDefaultImports он включен по умолчанию. Дополнительные сведения см. в руководстве по параметрам компилятора TypeScript .

Основные понятия

  • Масштаб. Создайте несколько потребителей и каждый потребитель возьмет на себя ответственность за чтение нескольких секций Центров событий.

  • Балансировка нагрузки: Приложения, поддерживающие балансировку нагрузки, состоят из одного или нескольких экземпляров, настроенных EventHubConsumerClient для использования событий из одного концентратора событий и группы потребителей и одного и того же CheckpointStore. Они распределяют рабочую нагрузку между различными экземплярами, распределяя между собой секции, которые необходимо обрабатывать.

  • Контрольные точки: Это процесс, с помощью которого читатели помечают или фиксируют свою позицию в последовательности событий секции. Создание контрольных точек является ответственностью потребителя и выполняется для каждой секции в пределах группы потребителей. Это означает, что для каждой группы потребителей модуль чтения каждой секции должен хранить свое текущее положение в потоке событий и может сообщать службе, когда он считает поток данных завершенным.

    Если модуль чтения отключается от секции, при повторном подключении он приступает к чтению данных с контрольной точки, которая ранее была отправлена последним модулем чтения этой секции в этой группе потребителей. При подключении модуль чтения передает это смещение в концентратор событий, чтобы указать место, с которого следует начинать чтение. Таким образом, можно использовать контрольные точки как для маркировки событий как "завершенных" подчиненными приложениями, так и для обеспечения устойчивости в случае отработки отказа между модулями чтения, работающими на разных компьютерах. Вы можете вернуться к предыдущим данным, указав более низкое значение смещения по отношению к этому процессу создания контрольных точек. В рамках такого подхода при создании контрольных точек вы обеспечиваете отказоустойчивость и воспроизведение потока событий.

    BlobCheckpointStore — это класс, реализующий ключевые методы, необходимые EventHubConsumerClient для балансировки нагрузки и контрольных точек обновления.

Примеры

Создание с CheckpointStore помощью Хранилище BLOB-объектов Azure

Используйте приведенный ниже фрагмент кода, чтобы создать CheckpointStore. Вам потребуется предоставить строка подключения учетной записи хранения.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

События контрольных точек с использованием хранилища BLOB-объектов Azure

Чтобы события контрольных точек, полученные с помощью Хранилище BLOB-объектов Azure, необходимо передать объект, совместимый с интерфейсом SubscriptionEventHandlers, а также код для вызова updateCheckpoint() метода .

В этом примере реализует SubscriptionEventHandlers, SubscriptionHandlers а также обрабатывает контрольные точки.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Устранение неполадок

Включение журналов

Для переменной AZURE_LOG_LEVEL среды можно задать одно из следующих значений, чтобы включить ведение журнала в stderr:

  • verbose
  • сведения
  • warning
  • error

Вы также можете задать уровень журнала программным способом, импортировав пакет @azure или средства ведения журнала и вызвав функцию setLogLevel с одним из значений уровня журнала.

При задании уровня журнала программными средствами или с помощью AZURE_LOG_LEVEL переменной среды будут создаваться все журналы, которые записываются с использованием уровня журнала, равного или меньше выбранного. Например, если для уровня журнала задано значение info, журналы, которые записываются для уровней warning и error также создаются. При определении уровня для входа в систему этот пакет SDK соответствует рекомендациям пакета Azure SDK для TypeScript.

Можно также задать DEBUG переменную среды для получения журналов при использовании этой библиотеки. Это может быть полезно, если вы также хотите создавать журналы из зависимостей rhea-promise и rhea .

Примечание: AZURE_LOG_LEVEL, если задано, имеет приоритет над DEBUG. Не указывайте библиотеки azure с помощью DEBUG при указании AZURE_LOG_LEVEL или вызове setLogLevel.

При использовании этой библиотеки с помощью приведенной ниже переменной среды можно получить журналы отладки.

  • Получение только журналов отладки уровня информации из blob-объекта Хранилища контрольных точек событий.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Ведение журнала в файле

  • Включите ведение журнала, как показано выше, а затем запустите тестовый скрипт следующим образом:

    • Инструкции ведения журнала из тестового скрипта переходят к , out.log а инструкции ведения журнала из пакета SDK — в debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Инструкции ведения журнала из тестового скрипта и пакета SDK переходят в тот же файл out.log , перенаправляя stderr в stdout (&1), а затем перенаправляя stdout в файл:

      node your-test-script.js >out.log 2>&1
      
    • Инструкции ведения журнала из тестового скрипта и пакета SDK переходят в один и тот же файл out.log.

      node your-test-script.js &> out.log
      

Next Steps

Подробный пример см. в каталоге примеров .

Участие

Если вы хотите вносить изменения в эту библиотеку, ознакомьтесь с руководством по внесению изменений, в котором содержатся сведения о создании и тестировании кода.

Просмотры