你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 JavaScript 的 Azure 服务总线客户端库 - 版本 7.9.5

Azure 服务总线 是 Microsoft 提供的高度可靠的云消息传送服务。

在应用程序中使用客户端库@azure/service-bus

  • 将消息发送到 Azure 服务总线队列或主题
  • 从 Azure 服务总线队列或订阅接收消息
  • 在 Azure 服务总线命名空间中创建/获取/删除/更新/列出队列/主题/订阅/规则。

版本 7 的资源 @azure/service-bus

关键链接:

注意:如果使用版本 1.1.10 或更低版本,并且想要迁移到此包的最新版本,请查看我们的 迁移指南,从服务总线 V1 迁移到服务总线 V7

入门

安装包

使用 npm 安装 Azure 服务总线客户端库的最新版本。

npm install @azure/service-bus

目前支持的环境

先决条件

配置 TypeScript

TypeScript 用户需要安装 Node 类型定义:

npm install @types/node

还需要在tsconfig.json中启用 compilerOptions.allowSyntheticDefaultImports 。 请注意,如果已启用 compilerOptions.esModuleInteropallowSyntheticDefaultImports 则默认启用 。 有关详细信息 ,请参阅 TypeScript 的编译器选项手册

JavaScript 捆绑包

若要在浏览器中使用此客户端库,首先需要使用捆绑程序。 有关如何执行此操作的详细信息,请参阅捆绑 文档

除了其中所述的内容外,此库还需要以下 NodeJS 核心内置模块的附加填充,以便在浏览器中正常工作:

  • buffer
  • os
  • path
  • process

与 Webpack 捆绑

如果使用的是 Webpack v5,则可以安装以下开发依赖项

  • npm install --save-dev os-browserify path-browserify

然后将以下内容添加到 webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

使用汇总捆绑

如果使用汇总捆绑程序,请安装以下开发依赖项

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

然后在 rollup.config.js 中包括以下内容

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

有关使用 polyfill 的详细信息,请参阅你最喜爱的捆绑程序的文档。

React 本机支持

与浏览器类似,React Native 不支持此 SDK 库使用的某些 JavaScript API,因此你需要为它们提供填充。 有关更多详细信息,请参阅 使用 Expo 的 Messaging React Native 示例

验证客户端

与服务总线的交互从 ServiceBusClient 类的实例开始。 可以使用连接字符串或使用 Azure Active Directory 凭据向服务总线进行身份验证。

使用连接字符串

此方法将连接字符串带到服务总线实例。 从 Azure 门户可获取连接字符串。

const { ServiceBusClient } = require("@azure/service-bus");

const serviceBusClient = new ServiceBusClient("<connectionString>");

有关此构造函数的详细信息,请参阅 API 文档

使用 Azure Active Directory 凭据

使用 Azure Active Directory 进行身份验证使用 Azure 标识库

下面的示例使用 DefaultAzureCredential,这是库中的众多可用凭据提供程序之 @azure/identity 一。

const { ServiceBusClient } = require("@azure/service-bus");
const { DefaultAzureCredential } = require("@azure/identity");

const fullyQualifiedNamespace = "<name-of-service-bus-namespace>.servicebus.windows.net";
const credential = new DefaultAzureCredential();
const serviceBusClient = new ServiceBusClient(fullyQualifiedNamespace, credential);

注意:如果针对 AAD 使用自己的接口实现 TokenCredential ,请将服务总线的“作用域”设置为以下内容以获取相应的令牌:

["https://servicebus.azure.net//user_impersonation"];

有关此构造函数的详细信息,请参阅 API 文档

关键概念

初始化 后, ServiceBusClient可以与服务总线命名空间中的这些资源进行交互:

  • 队列:允许发送和接收消息。 通常用于点到点通信。
  • 主题:与队列不同,主题更适合发布/订阅方案。 主题可以发送到 ,但需要订阅,其中可以有多个并行使用。
  • 订阅:从主题使用的机制。 每个订阅都是独立的,并接收发送到主题的每条消息的副本。 规则和筛选器可用于定制特定订阅接收的消息。

有关这些资源的详细信息,请参阅 什么是 Azure 服务总线?

若要与这些资源交互,应熟悉以下 SDK 概念:

请注意,应先创建队列、主题和订阅,然后再使用此库。

示例

以下部分提供了涵盖使用 Azure 服务总线的一些常见任务的代码片段

发送消息

创建类的ServiceBusClient实例后,可以使用可用于发送消息的 createSender 方法获取 ServiceBusSender

const sender = serviceBusClient.createSender("my-queue");

const messages = [
  { body: "Albert Einstein" },
  { body: "Werner Heisenberg" },
  { body: "Marie Curie" },
  { body: "Steven Hawking" },
  { body: "Isaac Newton" },
  { body: "Niels Bohr" },
  { body: "Michael Faraday" },
  { body: "Galileo Galilei" },
  { body: "Johannes Kepler" },
  { body: "Nikolaus Kopernikus" }
];

// sending a single message
await sender.sendMessages(messages[0]);

// sending multiple messages in a single call
// this will fail if the messages cannot fit in a batch
await sender.sendMessages(messages);

// Sends multiple messages using one or more ServiceBusMessageBatch objects as required
let batch = await sender.createMessageBatch();

for (let i = 0; i < messages.length; i++) {
  const message = messages[i];
  if (!batch.tryAddMessage(message)) {
    // Send the current batch as it is full and create a new one
    await sender.sendMessages(batch);
    batch = await sender.createMessageBatch();

    if (!batch.tryAddMessage(messages[i])) {
      throw new Error("Message too big to fit in a batch");
    }
  }
}
// Send the batch
await sender.sendMessages(batch);

接收消息

创建类的ServiceBusClient实例后,可以使用 createReceiver 方法获取 ServiceBusReceiver

const receiver = serviceBusClient.createReceiver("my-queue");

有两 receiveMode个 可用。

  • “peekLock” - 在 peekLock 模式下,接收方在队列中指定的持续时间内锁定消息。
  • “receiveAndDelete” - 在 receiveAndDelete 模式下,消息在收到时从服务总线中删除。

如果选项中未提供 receiveMode,则默认为“peekLock”模式。 还可以在“peekLock”模式下 结算收到的消息

可以通过以下 3 种方式之一使用此接收方接收消息:

获取消息数组

使用 receiveMessages 函数,该函数返回解析为消息数组的承诺。

const myMessages = await receiver.receiveMessages(10);

使用消息处理程序订阅

使用 subscribe 方法设置消息处理程序,并根据需要使其运行。

完成后,调用 receiver.close() 以停止接收任何更多消息。

const myMessageHandler = async (message) => {
  // your code here
  console.log(`message.body: ${message.body}`);
};
const myErrorHandler = async (args) => {
  console.log(
    `Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
    args.error
  );
};
receiver.subscribe({
  processMessage: myMessageHandler,
  processError: myErrorHandler
});

使用异步迭代器

使用 getMessageIterator 获取消息的异步迭代器

for await (let message of receiver.getMessageIterator()) {
  // your code here
}

解决消息

收到消息后,可以根据消息的解决方式对接收方调用 completeMessage()abandonMessage()deferMessage()deadLetterMessage()

若要了解详细信息,请阅读 Settling Received Messages

死信队列

死信队列是 子队列。 每个队列或订阅都有自己的死信队列。 死信队列存储通过 receiver.deadLetterMessage()) 显式死信 (的消息,或超过其最大传递计数的消息。

为死信子队列创建接收方类似于为订阅或队列创建接收方:

// To receive from a queue's dead letter sub-queue
const deadLetterReceiverForQueue = serviceBusClient.createReceiver("queue", {
  subQueueType: "deadLetter"
});

// To receive from a subscription's dead letter sub-queue
const deadLetterReceiverForSubscription = serviceBusClient.createReceiver("topic", "subscription", {
  subQueueType: "deadLetter"
});

// Dead letter receivers work like any other receiver connected to a queue
// ex:
const messages = await deadLetterReceiverForQueue.receiveMessages(5);

for (const message of messages) {
  console.log(`Dead lettered message: ${message.body}`);
}

更全面地演示死信队列的完整示例:

使用会话发送消息

使用会话需要创建启用了会话的队列或订阅。 可 在此处阅读有关如何配置此功能的详细信息。

若要将消息发送到会话,请使用 ServiceBusClient 通过 createSender 创建发件人。

发送消息时,请在消息中设置 sessionId 属性,以确保消息进入正确的会话。

const sender = serviceBusClient.createSender("my-session-queue");
await sender.sendMessages({
  body: "my-message-body",
  sessionId: "my-session"
});

在此处阅读有关会话工作原理的详细信息。

从会话接收消息

使用会话需要创建启用了会话的队列或订阅。 可 在此处阅读有关如何配置此功能的详细信息。

与未启用会话的队列或订阅不同,只有单个接收方可以随时从会话中读取数据。 这是通过 锁定 由服务总线处理的会话来强制实施的。 从概念上讲,这类似于使用 peekLock 模式时消息锁定的工作原理 - 当消息 (或会话) 被锁定时,接收方具有对该模式的独占访问权限。

若要打开和锁定会话,请使用 的 ServiceBusClient 实例创建 SessionReceiver

可通过两种方式选择要打开的会话:

  1. 指定一个 sessionId,用于锁定命名会话。

    const receiver = await serviceBusClient.acceptSession("my-session-queue", "my-session");
    
  2. 不要指定会话 ID。在这种情况下,服务总线将找到尚未锁定的下一个可用会话。

    const receiver = await serviceBusClient.acceptNextSession("my-session-queue");
    

    可以通过 上的 SessionReceiver属性找到会话sessionId的名称。 如果选项中未提供 receiveMode,则默认为“peekLock”模式。 还可以在“peekLock”模式下 结算收到的消息

创建接收方后,可以在接收消息的 3 种方法之间进行选择:

在此处阅读有关会话工作原理的详细信息。

管理服务总线命名空间的资源

ServiceBusAdministrationClient 允许对实体 (队列、主题和订阅) 以及订阅规则执行 CRUD 操作来管理命名空间。

  • 支持使用服务总线连接字符串以及来自 @azure/identity 类似于 的 AAD 凭据进行身份验证 ServiceBusClient

注意:服务总线尚不支持为命名空间设置 CORS 规则,因此 ServiceBusAdministrationClient ,如果不禁用 Web 安全性,则无法在浏览器中运行。 有关详细信息,请参阅 此处

// Get the connection string from the portal
// OR
// use the token credential overload, provide the host name of your Service Bus instance and the AAD credentials from the @azure/identity library
const serviceBusAdministrationClient = new ServiceBusAdministrationClient("<connectionString>");

// Similarly, you can create topics and subscriptions as well.
const createQueueResponse = await serviceBusAdministrationClient.createQueue(queueName);
console.log("Created queue with name - ", createQueueResponse.name);

const queueRuntimeProperties = await serviceBusAdministrationClient.getQueueRuntimeProperties(
  queueName
);
console.log("Number of messages in the queue = ", queueRuntimeProperties.totalMessageCount);

await serviceBusAdministrationClient.deleteQueue(queueName);

故障排除

下面是开始诊断问题的一些初始步骤。 有关详细信息,请参阅 服务总线故障排除指南

AMQP 依赖项

服务总线库依赖于用于管理连接、通过 AMQP 协议发送和接收消息的 rhea-promise 库。

日志记录

使用此库时,可设置以下环境变量来获取调试日志。

  • 从服务总线 SDK 获取调试日志
export DEBUG=azure*
  • 从服务总线 SDK 和协议级别库获取调试日志。
export DEBUG=azure*,rhea*
  • 如果对查看消耗大量控制台/磁盘空间) 的消息转换 (不感兴趣,则可以按如下所示设置 DEBUG 环境变量:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:core-amqp:datatransformer
  • 如果只对 错误感兴趣,则可以按如下所示设置 DEBUG 环境变量:
export DEBUG=azure:service-bus:error,azure:core-amqp:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

记录到文件

  1. DEBUG如上所示设置环境变量
  2. 按如下所示运行测试脚本:
  • 将测试脚本中的日志记录语句转到 out.log ,sdk 中的日志记录语句将转到 debug.log
    node your-test-script.js > out.log 2>debug.log
    
  • 通过将 stderr 重定向到 stdout (&1) ,然后将 stdout 重定向到文件,将测试脚本和 sdk 中的日志记录语句转到同 out.log 一文件:
    node your-test-script.js >out.log 2>&1
    
  • 将测试脚本和 sdk 中的语句记录到同一文件 out.log
      node your-test-script.js &> out.log
    

后续步骤

请查看 示例 目录,获取有关如何使用此库向 /从服务总线队列、主题和订阅发送和接收消息的详细示例。

贡献

若要为此库做出贡献,请阅读贡献指南,详细了解如何生成和测试代码。

曝光数