如何通过 Python 使用 Azure 队列存储
概述
本文演示使用 Azure 队列存储服务的常见方案。 涵盖的方案包括插入、速览、获取和删除队列消息。 还介绍了用于创建和删除队列的代码。
本文中的示例是用 Python 编写的,并且使用了用于 Python 的 Azure 队列存储客户端库。 有关队列的详细信息,请参阅后续步骤部分。
什么是队列存储?
Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。
队列服务概念
Azure 队列服务包含以下组件:
存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述。
队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据。
消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。
URL 格式:使用以下 URL 格式对队列进行寻址:http://
<storage account>
.queue.core.windows.net/<queue>
可使用以下 URL 访问示意图中的某个队列:
http://myaccount.queue.core.windows.net/incoming-orders
创建 Azure 存储帐户
创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户。
还可使用 Azure PowerShell、Azure CLI 或适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。
如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发。
下载并安装用于 Python 的 Azure 存储 SDK
用于 Python 的 Azure 存储 SDK 需要 Python v2.7、v3.3 或更高版本。
通过 PyPI 安装
要通过 Python 包索引 (PyPI) 安装,请键入:
pip install azure-storage-queue
注意
如果要从适用于 Python 的 Azure 存储 SDK v0.36 或更早版本升级,请在安装最新软件包之前使用 pip uninstall azure-storage
卸载旧版 SDK。
有关其他安装方法,请参阅适用于 Python 的 Azure SDK。
从 Azure 门户复制凭据
当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。 若要查看存储帐户凭据,请按以下步骤操作:
登录 Azure 门户。
找到自己的存储帐户。
在存储帐户菜单窗格中的“安全性 + 网络”下,选择“访问密钥” 。 在这里,可以查看帐户访问密钥以及每个密钥的完整连接字符串。
在“访问密钥”窗格中,选择“显示密钥” 。
在“key1”部分,找到“连接字符串”值 。 选择“复制到剪贴板”图标来复制该连接字符串。 在下一部分,你要将此连接字符串值添加到某个环境变量。
配置存储连接字符串
在复制连接字符串后,请将其写入到运行该应用程序的本地计算机上的新环境变量。 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。 将 <yourconnectionstring>
替换为实际的连接字符串。
setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"
在 Windows 中添加环境变量后,必须启动命令窗口的新实例。
重新启动程序
添加环境变量后,重启需要读取环境变量的任何正在运行的程序。 例如,先重启开发环境或编辑器,然后再继续操作。
配置应用程序以访问队列存储
可通过 QueueClient
对象来处理队列。 在你希望以编程方式访问服务总线的任何 Python 文件中,将以下代码添加到顶部附近:
from azure.storage.queue import (
QueueClient,
BinaryBase64EncodePolicy,
BinaryBase64DecodePolicy
)
import os, uuid
os
包支持检索环境变量。
uuid
包支持为队列名称生成唯一标识符。
创建队列
连接字符串是从前面设置的 AZURE_STORAGE_CONNECTION_STRING
环境变量检索的。
以下代码使用存储连接字符串创建 QueueClient
对象。
# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())
# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)
# Create the queue
queue_client.create_queue()
Azure 队列消息以文本形式存储。 如果要存储二进制数据,请在将消息放入队列之前设置 Base64 编码和解码函数。
在创建客户端对象时配置 Base64 编码和解码函数。
# Setup Base64 encoding and decoding functions
base64_queue_client = QueueClient.from_connection_string(
conn_str=connect_str, queue_name=q_name,
message_encode_policy = BinaryBase64EncodePolicy(),
message_decode_policy = BinaryBase64DecodePolicy()
)
在队列中插入消息
若要在队列中插入消息,请使用 send_message
方法。
message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)
扫视消息
可以通过调用 peek_messages
方法来速览消息,而不会将其从队列中删除。 默认情况下,此方法会速览单个消息。
# Peek at the first message
messages = queue_client.peek_messages()
for peeked_message in messages:
print("Peeked message: " + peeked_message.content)
更改已排队消息的内容
可以更改队列中现有消息的内容。 如果消息表示某个任务,则可以使用此功能来更新该任务的状态。
下面的代码使用 update_message
方法来更新消息。 可见性超时设为 0,这意味着消息会立刻出现且内容将更新。
messages = queue_client.receive_messages()
list_result = next(messages)
message = queue_client.update_message(
list_result.id, list_result.pop_receipt,
visibility_timeout=0, content=u'Hello World Again')
print("Updated message to: " + message.content)
获取队列长度
可以获取队列中消息的估计数。
get_queue_properties 方法返回包括 approximate_message_count
在内的队列属性。
properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))
结果仅是近似值,因为在服务响应请求之后,可能添加或删除了消息。
取消消息的排队
通过两个步骤从队列中删除消息。 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。 在消息成功处理后调用 delete_message
。
在调用 receive_messages 时,默认情况下会获得队列中的下一条消息。 从 receive_messages
返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 delete_message。
messages = queue_client.receive_messages()
for message in messages:
print("Dequeueing message: " + message.content)
queue_client.delete_message(message.id, message.pop_receipt)
可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。
以下代码示例使用 receive_messages
方法成批获取消息。 然后,它使用嵌套的 for
循环来处理每批中的每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。
messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)
for msg_batch in messages.by_page():
for msg in msg_batch:
print("Batch dequeue message: " + msg.content)
queue_client.delete_message(msg)
删除队列
若要删除队列及其包含的所有消息,请调用 delete_queue
方法。
print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()
提示
试用 Microsoft Azure 存储资源管理器
Microsoft Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。
后续步骤
在了解了队列存储的基础知识后,可单击下面的链接了解更多信息。
有关使用已弃用的 Python 版本 2 SDK 的相关代码示例,请参阅使用 Python 版本 2 的代码示例。