你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 JavaScript 的Azure 事件中心客户端库 - 版本 5.12.0
Azure 事件中心是一种高度可缩放的发布-订阅服务,每秒可以引入数百万个事件并将其流式传输到多个使用者。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 如果想要了解有关Azure 事件中心的详细信息,请查看:什么是事件中心?
Azure 事件中心客户端库允许在 Node.js 应用程序中发送和接收事件。
关键链接:
注意:如果使用的是版本 2.1.0 或更低版本,并且想要迁移到此包的最新版本,请查看我们的 迁移指南,从 EventHubs V2 迁移到 EventHubs V5
此处仍提供 v2 示例和文档:
v2.1.0 | 的源代码v2.1.0 (npm) | 包v2.1.0 的示例
入门
安装包
使用 npm 安装 Azure 事件中心 客户端库
npm install @azure/event-hubs
目前支持的环境
- LTS 版本的 Node.js
- 最新版本的 Safari、Chrome、Edge 和 Firefox。
有关更多详细信息,请参阅我们的支持政策。
先决条件
配置 TypeScript
TypeScript 用户需要安装 Node 类型定义:
npm install @types/node
还需要在tsconfig.json中启用 compilerOptions.allowSyntheticDefaultImports
。 请注意,如果已启用 compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
则默认启用 。 有关详细信息 ,请参阅 TypeScript 的编译器选项手册 。
JavaScript 捆绑包
若要在浏览器中使用此客户端库,首先需要使用捆绑程序。 有关如何执行此操作的详细信息,请参阅捆绑 文档。
除了其中所述的内容外,此库还需要以下 NodeJS 核心内置模块的附加填充,以便在浏览器中正常工作:
buffer
os
path
process
与 Webpack 捆绑
如果使用的是 Webpack v5,则可以安装以下开发依赖项
npm install --save-dev os-browserify path-browserify
然后将以下内容添加到 webpack.config.js
const path = require("path");
+const webpack = require("webpack");
module.exports = {
entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
},
],
},
+ plugins: [
+ new webpack.ProvidePlugin({
+ process: "process/browser",
+ }),
+ new webpack.ProvidePlugin({
+ Buffer: ["buffer", "Buffer"],
+ }),
+ ],
resolve: {
extensions: [".ts", ".js"],
+ fallback: {
+ buffer: require.resolve("buffer/"),
+ os: require.resolve("os-browserify"),
+ path: require.resolve("path-browserify"),
+ },
},
使用汇总捆绑
如果使用汇总捆绑程序,请安装以下开发依赖项
npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve
然后在 rollup.config.js 中包括以下内容
+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";
export default {
// other configs
plugins: [
+ shim({
+ fs: `export default {}`,
+ net: `export default {}`,
+ tls: `export default {}`,
+ path: `export default {}`,
+ dns: `export function resolve() { }`,
+ }),
+ nodeResolve({
+ mainFields: ["module", "browser"],
+ preferBuiltins: false,
+ }),
+ cjs(),
+ inject({
+ modules: {
+ Buffer: ["buffer", "Buffer"],
+ process: "process",
+ },
+ exclude: ["./**/package.json"],
+ }),
]
};
有关使用 polyfill 的详细信息,请参阅你最喜爱的捆绑程序的文档。
React Native支持
与浏览器类似,React Native不支持此 SDK 库使用的某些 JavaScript API,因此你需要为它们提供填充。 有关更多详细信息,请参阅 Expo 的消息传送React Native示例。
验证客户端
与事件中心的交互从 EventHubConsumerClient 类的实例或 EventHubProducerClient 类的实例开始。 有一些构造函数重载来支持实例化这些类的不同方法,如下所示:
对事件中心命名空间使用 连接字符串
其中一个构造函数重载将窗体Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;
和实体名称连接字符串到事件中心实例。 可以创建使用者组,并从Azure 门户获取连接字符串和实体名称。
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string",
"my-event-hub"
);
对事件中心上的策略使用连接字符串
另一个构造函数重载采用与直接在事件中心实例上定义的共享访问策略对应的连接字符串 (,而不是事件中心命名空间) 。
此连接字符串的格式为 Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name
。
连接字符串格式与上一个构造函数重载的主要区别在于 ;EntityPath=my-event-hub-name
。
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-connection-string-with-entity-path"
);
使用事件中心命名空间和 Azure 标识
此构造函数重载采用实现 TokenCredential 接口的事件中心实例和凭据的主机名和实体名称。 这允许使用 Azure Active Directory 主体进行身份验证。
@azure/标识包中提供了 接口的实现TokenCredential
。 主机名的格式 <yournamespace>.servicebus.windows.net
为 。 使用 Azure Active Directory 时,必须为主体分配一个允许访问事件中心的角色,例如Azure 事件中心数据所有者角色。 有关将 Azure Active Directory 授权与事件中心配合使用的详细信息,请参阅 相关文档。
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
"my-consumer-group",
"my-host-name",
"my-event-hub",
credential
);
关键概念
事件中心生成者是遥测数据、诊断信息、使用情况日志或其他日志数据的源,作为嵌入式设备解决方案、移动设备应用程序、在主机或其他设备上运行的游戏、某些基于客户端或服务器的业务解决方案或网站的一部分。
事件中心使用者从事件中心获取此类信息并对其进行处理。 处理过程可能涉及聚合、复杂的计算和筛选, 也可能涉及以原始或转换方式分发或存储信息。 事件中心使用者通常是具有内置分析功能(如 Azure 流分析、Apache Spark 或 Apache Storm)的强大的大规模平台基础结构部件。
分区是事件中心内保留的有序事件。 分区是一种与事件使用者所需的并行度关联的数据组织方式。 Azure 事件中心通过分区的使用者模式提供消息流,在此模式下,每个使用者只读取消息流的特定子集或分区。 当较新的事件到达时,它们将添加到此序列的末尾。 分区数量在创建事件中心时指定,无法更改。
使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调、从自身立场独立读取流。 每个使用者组的分区上最多可以有 5 个并发读取者,但建议给定分区和使用者组配对只有一个活动的使用者。 每个活动读取器从其分区接收所有事件;如果同一分区上有多个读取器,则它们将收到重复的事件。
有关更多概念和更深入的讨论,请参阅: 事件中心功能
有关重试的指南
EventHubConsumerClient
和 EventHubProducerClient
接受 options
,可在其中设置 retryOptions
,以便优化 SDK 处理暂时性错误的方式。
暂时性错误的示例包括临时网络或服务问题。
使用事件时重试
如果暂时性错误 (例如在 SDK 接收事件时遇到临时网络问题) ,它将根据传入 的 EventHubConsumerClient
重试选项重试接收事件。
如果已用尽最大重试次数,则将调用 函数 processError
。
可以使用重试设置来控制通知临时问题(如网络连接问题)的速度。
例如,如果需要知道何时立即出现网络问题,则可以降低 和 retryDelayInMs
的值maxRetries
。
执行 processError
函数后,只要错误是可重试错误,客户端将继续从分区接收事件。 否则,客户端将调用用户提供的 processClose
函数。
当你停止订阅或客户端停止从当前分区读取事件时,也会调用此函数,因为应用程序的另一个实例作为负载均衡的一部分选取了该事件。
如果需要, processClose
函数提供了更新检查点的机会。
执行 processClose
后,客户端 (负载均衡的情况下,来自应用程序) 的另一个实例的客户端将调用用户提供的 processInitialize
函数,以从同一分区的上次更新检查点继续读取事件。
如果要停止尝试读取事件,则必须对 subscription
方法返回的 subscribe
调用 close()
。
示例
以下部分提供了代码片段,这些代码片段涵盖了使用 Azure 事件中心
检查事件中心
许多事件中心操作都在特定分区范围内进行。
由于分区由事件中心拥有,因此在创建时将分配名称。
若要了解哪些分区可用,请使用两个可用的客户端之一查询事件中心: EventHubProducerClient
或 EventHubConsumerClient
在下面的示例中,我们使用 EventHubProducerClient
。
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubProducerClient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
await client.close();
}
main();
将事件发布到事件中心
若要发布事件,将需要创建 EventHubProducerClient
。 虽然下面的示例演示了创建客户端的一种方法,但请参阅 对客户端进行身份验证 部分,了解实例化客户端的其他方法。
可以将事件发布到特定分区,或者允许事件中心服务决定应发布到哪些分区事件。 当发布事件需要高度可用或事件数据应在分区之间均匀分布时,建议使用自动路由。 在下面的示例中,我们将利用自动路由。
- 使用 createBatch Create
EventDataBatch
对象 - 使用 tryAdd 方法将事件添加到批处理。 可以执行此操作,直到达到最大批大小限制或添加完喜欢的事件数为止,以先到者为准。 此方法将返回
false
,指示由于达到最大批大小,无法再将事件添加到批处理中。 - 使用 sendBatch 方法发送事件批。
在下面的示例中,我们尝试向Azure 事件中心发送 10 个事件。
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
const producerClient = new EventHubProducerClient("connectionString", "eventHubName");
const eventDataBatch = await producerClient.createBatch();
let numberOfEventsToSend = 10;
while (numberOfEventsToSend > 0) {
let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
if (!wasAdded) {
break;
}
numberOfEventsToSend--;
}
await producerClient.sendBatch(eventDataBatch);
await producerClient.close();
}
main();
可以在不同阶段传递一些选项来控制将事件发送到Azure 事件中心的过程。
- 构造
EventHubProducerClient
函数采用 类型的EventHubClientOptions
可选参数,可用于指定重试次数等选项。 - 方法
createBatch
采用 类型的CreateBatchOptions
可选参数,可用于指定所创建的批处理支持的最大批大小。 - 方法
sendBatch
采用 类型的SendBatchOptions
可选参数,可用于指定abortSignal
取消当前操作。 - 如果要发送到特定分区,方法的
sendBatch
重载允许将事件发送到的分区的 ID。 上面的 检查事件中心 示例演示如何提取可用分区 ID。
注意:使用 Azure Stream Analytics 时,发送的事件正文也应是 JSON 对象。
例如: body: { "message": "Hello World" }
从事件中心使用事件
若要使用事件中心实例的事件,还需要知道要面向哪个使用者组。 知道这一点后,即可创建 EventHubConsumerClient。 虽然下面的示例演示了创建客户端的一种方法,但请参阅 对客户端进行身份验证 部分,了解实例化客户端的其他方法。
subscribe
客户端上的 方法具有重载,这些重载与构造函数结合使用,可以满足多种使用事件的方式:
方法 subscribe
采用 类型的 SubscriptionOptions
可选参数,可用于指定选项(如 maxBatchSize (事件数)等待) 和 maxWaitTimeInSeconds (等待 maxBatchSize 事件到达) 的时间。
在单个进程中使用事件
首先创建 的 EventHubConsumerClient
实例,然后在其上调用 subscribe()
方法以开始使用事件。
方法subscribe
在从 Azure 事件中心 接收事件时采用回调来处理事件。
若要停止接收事件,可以对 方法返回subscribe()
的 对象调用 close()
。
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
使用跨多个进程进行负载均衡的事件
Azure 事件中心每秒能够处理数百万个事件。 若要缩放处理应用程序,可以运行应用程序的多个实例,并使其在它们之间平衡负载。
首先,使用采用 CheckpointStore
的构造函数重载之一创建 的实例EventHubConsumerClient
,然后调用 subscribe()
方法以开始使用 事件。 检查点存储将使使用者组中的订阅者能够协调应用程序的多个实例之间的处理。
在此示例中,我们将使用 BlobCheckpointStore
包中的 @azure/eventhubs-checkpointstore-blob
,它通过使用 Azure Blob 存储 实现对持久存储的所需读/写操作。
方法subscribe
在从 Azure 事件中心 接收事件时采用回调来处理事件。
若要停止接收事件,可以对 方法返回subscribe()
的 对象调用 close()
。
const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";
async function main() {
const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);
if (!(await blobContainerClient.exists())) {
await blobContainerClient.create();
}
const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
consumerGroup,
eventHubConnectionString,
eventHubName,
checkpointStore
);
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
// event processing code goes here
if (events.length === 0) {
// If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
// will pass you an empty array.
return;
}
// Checkpointing will allow your service to pick up from
// where it left off when restarting.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(events[events.length - 1]);
},
processError: async (err, context) => {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
}
});
// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));
await subscription.close();
await consumerClient.close();
console.log(`Exiting sample`);
}
main();
有关详细信息,请参阅 跨应用程序的多个实例均衡分区负载 。
使用单个分区中的事件
首先创建 的 EventHubConsumerClient
实例,然后在其上调用 subscribe()
方法以开始使用事件。 将要面向的分区的 ID 传递给方法, subscribe()
以便仅从该分区使用。
在下面的示例中,我们使用第一个分区。
方法subscribe
在从 Azure 事件中心 接收事件时采用回调来处理事件。
若要停止接收事件,可以对 方法返回subscribe()
的 对象调用 close()
。
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"connectionString",
"eventHubName"
);
const partitionIds = await client.getPartitionIds();
// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
startPosition: earliestEventPosition
};
const subscription = client.subscribe(
partitionIds[0],
{
processEvents: async (events, context) => {
// event processing code goes here
},
processError: async (err, context) => {
// error reporting/handling code here
}
},
subscriptionOptions
);
// Wait for a few seconds to receive events before closing
setTimeout(async () => {
await subscription.close();
await client.close();
console.log(`Exiting sample`);
}, 3 * 1000);
}
main();
使用 EventHubConsumerClient 与 IotHub 配合使用
也可以使用 EventHubConsumerClient
来使用 IotHub。 这对于从链接的 EventHub 接收 IotHub 的遥测数据很有用。
关联的连接字符串将没有发送声明,因此无法发送事件。
- 请注意,连接字符串需要适用于与事件中心兼容的终结点 (例如“Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name“)
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
const client = new EventHubConsumerClient(
"my-consumer-group",
"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
);
await client.getEventHubProperties();
// retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
const partitionId = "0";
await client.getPartitionProperties(partitionId);
await client.close();
}
main();
故障排除
AMQP 依赖项
事件中心库依赖于用于管理连接、通过 AMQP 协议发送和接收事件的 rhea-promise 库。
日志记录
可以将 环境变量设置为 AZURE_LOG_LEVEL
启用日志记录 stderr
:
export AZURE_LOG_LEVEL=verbose
有关如何启用日志的更详细说明,请查看 @azure/logger 包文档。
或者,可以设置 环境变量, DEBUG
以在使用此库时获取日志。
如果还希望从依赖项 rhea-promise
和 rhea
发出日志,这非常有用。
注意: AZURE_LOG_LEVEL(如果已设置)优先于 DEBUG。
指定 AZURE_LOG_LEVEL 或调用 setLogLevel 时,请勿通过 DEBUG 指定任何 azure
库。
- 仅从事件中心 SDK 获取信息级调试日志。
export DEBUG=azure:*:info
- 从事件中心 SDK 和协议级别库获取调试日志。
export DEBUG=azure*,rhea*
- 如果对查看消耗大量控制台/磁盘空间) 的原始事件数据 (不感兴趣 ,则可以
DEBUG
设置环境变量,如下所示:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
- 如果只对 错误 和 SDK 警告感兴趣,则可以按如下所示设置
DEBUG
环境变量:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
后续步骤
更多示例代码
请查看 示例 目录,获取有关如何使用此库向/从 事件中心发送和接收事件的详细示例。
贡献
若要为此库做出贡献,请阅读贡献指南,详细了解如何生成和测试代码。