如何通过 Java 使用队列存储
概述
本指南演示了如何为使用 Azure 队列存储服务的常见方案编写代码。 这些示例用 Java 编写并使用用于 Java 的 Azure 存储 SDK。 方案包括插入、速览、获取和删除队列消息。 还介绍了用于创建和删除队列的代码。 有关队列的详细信息,请参阅后续步骤部分。
什么是队列存储?
Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。
队列服务概念
Azure 队列服务包含以下组件:
存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述。
队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据。
消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。
URL 格式:使用以下 URL 格式对队列进行寻址:http://
<storage account>
.queue.core.windows.net/<queue>
可使用以下 URL 访问示意图中的某个队列:
http://myaccount.queue.core.windows.net/incoming-orders
创建 Azure 存储帐户
创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户。
还可使用 Azure PowerShell、Azure CLI 或适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。
如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发。
创建 Java 应用程序
首先,验证你的开发系统是否满足用于 Java 的 Azure 队列存储客户端库 v12中列出的先决条件。
创建名为 queues-how-to-v12
的 Java 应用程序:
在控制台窗口(例如 cmd、PowerShell 或 Bash)中,使用 Maven 创建名为 blob-quickstart-v12 的新控制台应用
queues-how-to-v12
。 键入以下mvn
命令,创建“hello world”Java 项目。mvn archetype:generate \ --define interactiveMode=n \ --define groupId=com.queues.howto \ --define artifactId=queues-howto-v12 \ --define archetypeArtifactId=maven-archetype-quickstart \ --define archetypeVersion=1.4
mvn archetype:generate ` --define interactiveMode=n ` --define groupId=com.queues.howto ` --define artifactId=queues-howto-v12 ` --define archetypeArtifactId=maven-archetype-quickstart ` --define archetypeVersion=1.4
生成项目的输出应如下所示:
[INFO] Scanning for projects... [INFO] [INFO] ------------------< org.apache.maven:standalone-pom >------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> [INFO] [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< [INFO] [INFO] [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- [INFO] Generating project in Batch mode [INFO] ---------------------------------------------------------------------------- [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4 [INFO] ---------------------------------------------------------------------------- [INFO] Parameter: groupId, Value: com.queues.howto [INFO] Parameter: artifactId, Value: queues-howto-v12 [INFO] Parameter: version, Value: 1.0-SNAPSHOT [INFO] Parameter: package, Value: com.queues.howto [INFO] Parameter: packageInPathFormat, Value: com/queues/howto [INFO] Parameter: version, Value: 1.0-SNAPSHOT [INFO] Parameter: package, Value: com.queues.howto [INFO] Parameter: groupId, Value: com.queues.howto [INFO] Parameter: artifactId, Value: queues-howto-v12 [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 6.775 s [INFO] Finished at: 2020-08-17T15:27:31-07:00 [INFO] ------------------------------------------------------------------------
切换到新创建的
queues-howto-v12
目录。cd queues-howto-v12
安装包
在文本编辑器中打开 pom.xml 文件pom.xml
。 将以下依赖项元素添加到依赖项组。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-queue</artifactId>
<version>12.6.0</version>
</dependency>
配置应用程序以访问队列存储
将下列 import 语句添加到需要在其中使用 Azure 存储 API 来访问队列的 Java 文件的顶部:
// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;
设置 Azure 存储连接字符串
Azure 存储客户端使用存储连接字符串来访问数据管理服务。 获取 Azure 门户中列出的你的存储帐户的名称和主访问密钥。 将它们用作连接字符串中的 AccountName
和 AccountKey
值。 此示例演示如何声明一个静态字段以保存连接字符串:
// Define the connection-string with your values
final String connectStr =
"DefaultEndpointsProtocol=https;" +
"AccountName=your_storage_account;" +
"AccountKey=your_storage_account_key";
以下示例假设你有一个包含存储连接字符串的 String
对象。
如何:创建队列
QueueClient
对象包含用于与队列进行交互的操作。 以下代码创建 QueueClient
对象。 使用该 QueueClient
对象创建要使用的队列。
public static String createQueue(String connectStr)
{
try
{
// Create a unique name for the queue
String queueName = "queue-" + java.util.UUID.randomUUID();
System.out.println("Creating queue: " + queueName);
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queue = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Create the queue
queue.create();
return queue.getQueueName();
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
return null;
}
}
如何:向队列添加消息
若要在现有队列中插入消息,请调用 sendMessage
方法。 消息可以是字符串(UTF-8 格式)或字节数组。 下面是将字符串消息发送到队列的代码。
public static void addQueueMessage
(String connectStr, String queueName, String messageText)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
System.out.println("Adding message to the queue: " + messageText);
// Add a message to the queue
queueClient.sendMessage(messageText);
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:扫视下一条消息
通过调用 peekMessage
,可以速览队列前面的消息,而不会从队列中删除它。
public static void peekQueueMessage
(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Peek at the first message
PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:更改已排队消息的内容
可以更改队列中现有消息的内容。 如果消息表示一个工作任务,则可以使用此功能来更新状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 30 秒。 延长可见性超时会再给客户端 30 秒时间来继续处理该消息。 你还可以保留重试计数。 如果消息重试了 n 次以上,则你会将其删除。 此方案可避免每次处理某条消息时都触发应用程序错误。
下面的代码示例将在消息队列中进行搜索,查找与搜索字符串匹配的第一个消息内容,对消息内容进行修改,然后退出。
public static void updateQueueMessage
(String connectStr, String queueName,
String searchString, String updatedContents)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// The maximum number of messages to retrieve is 32
final int MAX_MESSAGES = 32;
// Iterate through the queue messages
for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
{
// Check for a specific string
if (message.getMessageText().equals(searchString))
{
// Update the message to be visible in 30 seconds
queueClient.updateMessage(message.getMessageId(),
message.getPopReceipt(),
updatedContents,
Duration.ofSeconds(30));
System.out.println(
String.format("Found message: \'%s\' and updated it to \'%s\'",
searchString,
updatedContents)
);
break;
}
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
以下代码示例只更新队列中的第一个可见消息。
public static void updateFirstQueueMessage
(String connectStr, String queueName, String updatedContents)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Get the first queue message
QueueMessageItem message = queueClient.receiveMessage();
// Check for a specific string
if (null != message)
{
// Update the message to be visible in 30 seconds
UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
message.getPopReceipt(),
updatedContents,
Duration.ofSeconds(30));
System.out.println("Updated the first message with the receipt: " +
result.getPopReceipt());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:获取队列长度
可以获取队列中消息的估计数。
getProperties
方法可返回多个值,包括队列中的当前消息数。 此计数仅为近似值,因为可能会在请求后添加或删除消息。
getApproximateMessageCount
方法可返回通过调用 getProperties
检索到的最后一个值,而不会调用队列存储。
public static void getQueueLength(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
QueueProperties properties = queueClient.getProperties();
long messageCount = properties.getApproximateMessagesCount();
System.out.println(String.format("Queue length: %d", messageCount));
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:取消对下一条消息的排队
代码通过两个步骤来取消对队列中某条消息的排队。 调用 receiveMessage
时,会获得队列中的下一条消息。 从 receiveMessage
返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 deleteMessage
。 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。 代码在处理消息后会立即调用 deleteMessage
。
public static void dequeueMessage(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Get the first queue message
QueueMessageItem message = queueClient.receiveMessage();
// Check for a specific string
if (null != message)
{
System.out.println("Dequeing message: " + message.getMessageText());
// Delete the message
queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
}
else
{
System.out.println("No visible messages in queue");
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
用于取消对消息进行排队的其他选项
可以通过两种方式自定义队列中的消息检索。 首先,获取一批消息(最多 32 条)。 其次,设置更长或更短的不可见超时时间,从而允许代码使用更多或更少的时间来完全处理每个消息。
以下代码示例使用 receiveMessages
方法在一个调用中获取 20 条消息。 然后,使用 for
循环处理每条消息。 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。 超时同时针对所有消息启动。 自调用 receiveMessages
起经过五分钟后,未删除的任何消息都将再次变得可见。
public static void dequeueMessages(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// The maximum number of messages to retrieve is 20
final int MAX_MESSAGES = 20;
// Retrieve 20 messages from the queue with a
// visibility timeout of 300 seconds (5 minutes)
for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
{
// Do processing for all messages in less than 5 minutes,
// deleting each message after processing.
System.out.println("Dequeing message: " + message.getMessageText());
queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:列出队列
若要获取当前队列的列表,请调用 QueueServiceClient.listQueues()
方法,它将返回 QueueItem
对象的集合。
public static void listQueues(String connectStr)
{
try
{
// Instantiate a QueueServiceClient which will be
// used to list the queues
QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
.connectionString(connectStr)
.buildClient();
// Loop through the collection of queues.
for (QueueItem queue : queueServiceClient.listQueues())
{
// Output each queue name.
System.out.println(queue.getName());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:删除队列
若要删除队列及其包含的所有消息,请对 QueueClient
对象调用 delete
方法。
public static void deleteMessageQueue(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
System.out.println("Deleting queue: " + queueClient.getQueueName());
// Delete the queue
queueClient.delete();
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
后续步骤
现在,你已了解了有关队列存储的基础知识,请单击下面的链接来了解更复杂的存储任务。
有关使用已弃用的 Java 版本 8 SDK 的相关代码示例,请参阅使用 Java 版本 8 的代码示例。