你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用存储 Blob Azure 事件中心适用于 Javascript 的检查点存储库

基于 Azure Blob 存储的解决方案,用于存储检查点,并在从 @azure/事件中心库使用EventHubConsumerClient时帮助实现负载均衡

源代码 | 包 (npm) | API 参考文档 | 样品

入门

安装包

使用 npm 安装Azure 事件中心检查点存储 Blob 库

npm install @azure/eventhubs-checkpointstore-blob

先决条件:必须具有 Azure 订阅事件中心命名空间 才能使用此包,以及 存储帐户

如果在 Node.js 应用程序中使用此包,请使用 Node.js 8.x 或更高版本。

配置 Typescript

TypeScript 用户需要安装 Node 类型定义:

npm install @types/node

还需要在 tsconfig.json 中启用 compilerOptions.allowSyntheticDefaultImports 。 请注意,如果已启用 compilerOptions.esModuleInteropallowSyntheticDefaultImports 则默认启用 。 有关详细信息 ,请参阅 TypeScript 的编译器选项手册

关键概念

  • 缩放: 创建多个使用者,每个使用者获取若干事件中心分区的读取所有权。

  • 负载均衡: 支持负载均衡的应用程序由一个或多个 实例组成,这些实例 EventHubConsumerClient 已配置为使用来自同一事件中心和使用者组以及同一个 CheckpointStore的事件。 它们通过将要处理的分区分布到各个实例之间来平衡不同实例的工作负荷。

  • 检查点: 这是一个读取器标记或提交其在分区事件序列中的位置的过程。 检查点操作由使用者负责,并在使用者组中的每个分区上进行。 这种责任意味着,对于每个使用者组而言,每个分区读取者必须跟踪它在事件流中的当前位置,当它认为数据流已完成时,可以通知服务。

    如果读取者与分区断开连接,当它重新连接时,将开始读取前面由该使用者组中该分区的最后一个读取者提交的检查点。 当读取者建立连接时,它会将此偏移量传递给事件中心,以指定要从其开始读取数据的位置。 这样,用户便可以使用检查点将事件标记为已由下游应用程序“完成”,并且在不同计算机上运行的读取者之间发生故障转移时,还可以提供弹性。 若要返回到较旧的数据,可以在此检查点过程中指定较低的偏移量。 借助此机制,检查点可以实现故障转移复原和事件流重放。

    BlobCheckpointStore 是实现 EventHubConsumerClient 平衡负载和更新检查点所需的关键方法的类。

示例

CheckpointStore使用 Azure Blob 存储 创建

使用以下代码片段创建 CheckpointStore。 需要向存储帐户提供连接字符串。

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

使用 Azure Blob 存储的检查点事件

若要使用 Azure Blob 存储 接收的检查点事件,需要传递与 SubscriptionEventHandlers 接口兼容的对象以及调用方法的代码updateCheckpoint()

在此示例中, SubscriptionHandlers 实现 SubscriptionEventHandlers 并处理检查点。

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

疑难解答

启用日志

可以将 环境变量设置为 AZURE_LOG_LEVEL 以下值之一,以启用日志记录到 stderr

  • verbose
  • info
  • warning
  • error

还可以通过导入 @azure/logger 包并使用其中一个日志级别值调用 setLogLevel 函数,以编程方式设置日志级别。

以编程方式或通过 AZURE_LOG_LEVEL 环境变量设置日志级别时,将发出使用等于或小于所选日志级别的日志级别写入的任何日志。 例如,将日志级别设置为 info时,也会发出为 级别 warningerror 写入的日志。 在确定要记录到的级别时,此 SDK 遵循 Azure SDK for TypeScript 准则

或者,可以设置 环境变量, DEBUG 以在使用此库时获取日志。 如果还希望从依赖项 rhea-promiserhea 发出日志,这非常有用。

注意: AZURE_LOG_LEVEL(如果已设置)优先于 DEBUG。 指定 AZURE_LOG_LEVEL 或调用 setLogLevel 时,请勿通过 DEBUG 指定任何 azure 库。

使用此库时,可设置以下环境变量来获取调试日志。

  • 仅从 Eventhubs Checkpointstore Blob 获取信息级别调试日志。
export DEBUG=azure:eventhubs-checkpointstore-blob:info

记录到文件

  • 如上所示启用日志记录,然后运行测试脚本,如下所示:

    • 将测试脚本中的日志记录语句转到 out.log ,sdk 中的日志记录语句将转到 debug.log

      node your-test-script.js > out.log 2>debug.log
      
    • 将测试脚本和 sdk 中的日志记录语句重定向到 stdout (&1) ,然后将 stdout 重定向到文件,从而转到同 out.log 一文件:

      node your-test-script.js >out.log 2>&1
      
    • 将测试脚本和 sdk 中的语句记录到同一文件 out.log

      node your-test-script.js &> out.log
      

后续步骤

有关详细示例,请查看 示例 目录。

贡献

若要为此库做出贡献,请阅读贡献指南,详细了解如何生成和测试代码。

曝光数