如何通过 Node.js 使用 Azure 队列存储

概述

本指南展示了如何使用 Azure 队列存储完成常见方案。 相关示例是使用 Node.js API 编写的。 涵盖的方案包括插入、速览、获取和删除队列消息。 还将学习创建和删除队列。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式:使用以下 URL 格式对队列进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

创建 Node.js 应用程序

若要创建空白的 Node.js 应用程序,请参阅在 Azure 应用服务中创建 Node.js Web 应用构建 Node.js 应用程序并将其部署到 Azure 云服务(使用 PowerShell 或 Visual Studio Code)。

配置应用程序以访问存储

用于 JavaScript 的 Azure 存储客户端库包括一组便于与存储 REST 服务通信的库。

使用 Node 包管理器 (npm) 获取包

  1. 使用 PowerShell (Windows)、Terminal (Mac) 或 Bash (Unix) 等命令行界面,导航到在其中创建了示例应用程序的文件夹。

  2. 在命令窗口中键入 npm install @azure/storage-queue

  3. 验证是否已创建 node_modules 文件夹。 在该文件夹中,你会发现 @azure/storage-queue 包,其中包含访问存储所需的客户端库。

导入包

使用你的代码编辑器,将以下内容添加到要在其中使用队列的 JavaScript 文件的顶部。

const { QueueClient, QueueServiceClient } = require("@azure/storage-queue");

如何创建队列

以下代码获取名为 AZURE_STORAGE_CONNECTION_STRING 的环境变量的值并使用它创建一个 QueueServiceClient 对象。 然后,将使用此对象创建一个 QueueClient 对象,后者用于处理特定队列。

// Retrieve the connection from an environment
// variable called AZURE_STORAGE_CONNECTION_STRING
const connectionString = process.env.AZURE_STORAGE_CONNECTION_STRING;

// Create a unique name for the queue
const queueName = "myqueue-" + Date.now().toString();

console.log("Creating queue: ", queueName);

// Instantiate a QueueServiceClient which will be used
// to create a QueueClient and to list all the queues
const queueServiceClient = QueueServiceClient.fromConnectionString(connectionString);

// Get a QueueClient which will be used
// to create and manipulate a queue
const queueClient = queueServiceClient.getQueueClient(queueName);

// Create the queue
await queueClient.create();

如果队列已存在,则会引发异常。

如何设置消息的格式

消息类型为字符串。 所有消息都被视为字符串。 如果需要发送其他数据类型,则需要在发送消息时将数据类型序列化为字符串,并在读取消息时反序列化字符串格式。

若要将 JSON 转换为字符串格式并在 Node.js 中再次转换回来,请使用以下帮助程序函数:

function jsonToBase64(jsonObj) {
    const jsonString = JSON.stringify(jsonObj)
    return  Buffer.from(jsonString).toString('base64')
}
function encodeBase64ToJson(base64String) {
    const jsonString = Buffer.from(base64String,'base64').toString()
    return JSON.parse(jsonString)
}

如何在队列中插入消息

若要向队列添加消息,请调用 sendMessage 方法。

messageText = "Hello, World";
console.log("Adding message to the queue: ", messageText);

// Add a message to the queue
await queueClient.sendMessage(messageText);

如何速览下一条消息

可以通过调用 peekMessages 方法来速览队列中的消息,而不必将其从队列中删除。

默认情况下,peekMessages 扫视单条消息。 以下示例速览队列中的前五条消息。 如果可见消息少于五条,则只返回这些可见消息。

// Peek at messages in the queue
const peekedMessages = await queueClient.peekMessages({ numberOfMessages: 5 });

for (i = 0; i < peekedMessages.peekedMessageItems.length; i++) {
    // Display the peeked message
    console.log("Peeked message: ", peekedMessages.peekedMessageItems[i].messageText);
}

当队列中没有消息时,调用 peekMessages 不会返回错误。 但是,不会返回消息。

如何更改已排队消息的内容

以下示例将更新消息的文本。

通过调用 updateMessage 在队列中就地更改消息的内容。

// Get the first message in the queue
var receivedMessages = await queueClient.receiveMessages();
const firstMessage = receivedMessages.receivedMessageItems[0];

// Update the received message
await queueClient.updateMessage(
    firstMessage.messageId,
    firstMessage.popReceipt,
    "This message has been updated"
);

如何将消息取消排队

将消息取消排队是一个两阶段过程:

  1. 获取消息。

  2. 删除消息。

以下示例获取一条消息,然后将其删除。

若要获取消息,请调用 receiveMessages 方法。 此调用会使消息在队列中不可见,使其他客户端无法处理它们。 当应用程序处理完某条消息后,即可调用 deleteMessage 将其从队列中删除。

// Get next message from the queue
receivedMessages = await queueClient.receiveMessages();
var message = receivedMessages.receivedMessageItems[0];

console.log("Dequeuing message: ", message.messageText);

await queueClient.deleteMessage(message.messageId, message.popReceipt);

默认情况下,消息只会隐藏 30 秒。 30 秒后,其他客户端就会看到它。 调用 receiveMessages 时,可以通过设置 options.visibilityTimeout 指定一个不同的值。

当队列中没有消息时,调用 receiveMessages 不会返回错误。 但是,不会返回消息。

用于取消对消息进行排队的其他选项

可以通过两种方式自定义队列中的消息检索:

以下示例使用 receiveMessages 方法在一次调用中获取 5 条消息。 然后,使用 for 循环处理每条消息。 它还会将通过此方法返回的所有消息的不可见性超时设置为 5 分钟。

// Get up to 5 messages from the queue
const receivedMsgsResp = await queueClient.receiveMessages({ numberOfMessages: 5, visibilityTimeout: 5 * 60 });

for (i = 0; i < receivedMsgsResp.receivedMessageItems.length; i++)
{
    message = receivedMsgsResp.receivedMessageItems[i];
    console.log("Dequeuing message: ", message.messageText);
    await queueClient.deleteMessage(message.messageId, message.popReceipt);
}

如何获取队列长度

getProperties 方法返回有关队列的元数据,其中包括在队列中等待的消息的大致数目。

const properties = await queueClient.getProperties();
console.log("Approximate queue length: ", properties.approximateMessagesCount);

如何列出队列

若要检索队列的列表,请调用 QueueServiceClient.listQueues。 若要检索按特定前缀筛选的列表,请在调用 listQueues 时设置 options.prefix

for await (const item of queueServiceClient.listQueues()) {
  console.log("Queue: ", item.name);
}

如何删除队列

若要删除队列及其包含的所有消息,请对 QueueClient 对象调用 DeleteQueue 方法。

// Delete the queue
console.log("Deleting queue: ", queueClient.name);
await queueClient.delete();

若要清除队列中的所有消息而不删除该队列,请调用 ClearMessages

提示

查看 Azure 存储代码示例存储库

如需易用且能够下载和运行的端到端 Azure 存储代码示例,请查看我们的 Azure 存储示例列表。

后续步骤

现在,你已了解了有关队列存储的基础知识,请单击下面的链接来了解更复杂的存储任务。