队列触发器
消息队列是一个软件组件,用于处理进程、线程或应用程序之间的消息传递。 队列可以存储消息,辅助角色可以在合适的时候提取消息。
在云中,消息队列可以生成具有有效负载的事件。 诸如 Azure Functions 等服务可以侦听此类消息,并可在发布消息时运行其代码。
使用消息队列
若要使 Azure 函数能够使用来自消息队列的消息,则其需要触发器,可能还要绑定。
Azure 函数需要侦听特定队列,以便当新消息发布到该队列时触发其代码。 若要设置触发器,需要提供适当的凭据,以便触发器代码知道如何连接到消息队列。 在“function.json”文件中为侦听队列的函数创建条目。 在 bindings
元素中,在条目上指定以下属性:
属性 | Value |
---|---|
name |
可在代码中引用的名称 |
type |
queueTrigger |
direction |
in |
queueName |
调用队列的内容 |
connection |
“local.settings.json”中的配置变量 |
可以按如下所示定义示例条目:
{
"name": "myQueueItem",
"type": "queueTrigger",
"direction": "in",
"queueName": "messages-incoming",
"connection": "AzureWebJobsStorage"
}
如果此队列位于存储帐户上,则 AzureWebJobsStorage
值是连接字符串的值。
使用队列中的消息时,严格意义上来说,不需要绑定。 但是,如果要写入队列,则需要输出绑定。 使用此类绑定时,可获取对预期队列的引用。
注意
目前,队列仅支持输出绑定。
本地开发
作为开发人员,你需要简短的反馈周期。 你还希望确保开发人员体验尽可能接近生产环境。 实现这两个目标的一种方式是使用队列仿真器。
使用队列仿真器可以模拟 Azure 函数将响应的真实队列消息。 若要关闭仿真器:
安装仿真器。 在 Visual Studio Code 中搜索“Azurite”或下载“Azurite 扩展”。
若要使用仿真器功能,请首先在命令面板中选择“Azure: 启动队列服务”。
运行此命令将启动名为 Azure 存储资源管理器的侦听器,另一个应用程序可以使用该侦听器。 存储资源管理器是一个客户端应用程序支持你浏览云资源并使用仿真器功能。
下载 Azure 存储资源管理器。 然后打开应用程序,你将看到以下指示,指示模拟器正在工作:
在仿真器上创建队列。 将使用此队列作为配置函数终结点的一部分。 通过右键单击队列元素,可以创建一个新队列。
若要确保 Functions 应用使用仿真器,需要正确设置连接字符串。 打开“local.settings.json”,找到
AzureWebJobsStorage
,然后赋予其值"UseDevelopmentStorage=true"
。注意
在移动到云时,请记得以不同方式设置此属性。 在生产时,它应指向 Azure 上的实际资源。
构建函数
现已设置本地仿真器,并且其上有一个队列。 你还将项目配置为指向本地仿真器。 现在,需要创建一个函数来处理队列触发器。
创建函数终结点
你已准备好创建一个可以处理传入队列消息的函数。 为函数创建一个文件夹,然后为其命名,例如:queueTrigger
。 然后创建“function.json”文件,并为其提供以下内容:
{
"bindings": [{
"name" "queueItem",
"type": "queueTrigger",
"direction": "in",
"queueName" : "items",
"connection": "AzureWebJobsStorage"
}]
}
name
元素的值非常重要,因为稍后将在代码中引用它,以分析来自队列的传入数据。 它需要为类型 queueTrigger
,以便队列在有新消息时触发它。
queueName
元素唯一标识要与之交互的队列。 在此处输入的任何内容都需要与在仿真器中调用队列或稍后在 Azure 中调用的实际队列匹配。
connection
元素指向“local.settings.json”中 AzureWebJobsStorage
元素的值。
处理队列消息
若要处理传入队列消息,需要编写可以分析出所需消息的代码。 此时,你可以决定接下来要执行的操作。 例如,可以启动 Web 请求,将该消息放入另一个队列,或将消息发送到数据库。
设置路由
需要一个路由来处理传入请求。 Azure Functions 将处理对根目录下队列的请求。 按如下所示设置路由时,请求将调用为 http://localhost:<port>/queueTrigger
:
http.HandleFunc("/queueTrigger", handleQueueTrigger)
对请求进行解码
在将队列消息发送给你时,它将具有此形状:
{
"Data": {
"queueItem": "your message"
},
"Metadata": {
"DequeueCount": 1,
"ExpirationTime": "2019-10-16T17:58:31+00:00",
"Id": "800ae4b3-bdd2-4c08-badd-f08e5a34b865",
"InsertionTime": "2019-10-09T17:58:31+00:00",
"NextVisibleTime": "2019-10-09T18:08:32+00:00",
"PopReceipt": "AgAAAAMAAAAAAAAAAgtnj8x+1QE=",
"sys": {
"MethodName": "QueueTrigger",
"UtcNow": "2019-10-09T17:58:32.2205399Z",
"RandGuid": "24ad4c06-24ad-4e5b-8294-3da9714877e9"
}
}
}
作为解码该传入请求的一部分,需要一个对前一条消息进行建模的帮助程序结构。 应如下所示:
type InvokeRequest {
Data map[string]json.RawMessage
Metadata map[string]interface{}
}
开始编写代码以接受该传入请求并进行解码:
func handleQueueTrigger(w http.ResponseWrite, r *http.Request) {
var invokeRequest InvokeRequest
d := json.NewDecoder(r.Body)
d.Decode(&invokeRequest)
}
现在,你已对请求进行了解码,但还需要分析出队列消息本身。
分析队列消息
对请求进行解码后,可以从对 Data
属性的请求中检索队列消息。 还需要通过在“function.json”文件中设置的 name
属性值来引用消息。 用于检索消息的代码是单行代码,如下所示:
invokeRequest.Data["queueItem"]
由于你需要能够以纯文本格式读取此消息,因此将使用 JSON 库并进行分析。 JSON 库将使用采用两个参数的 Unmarshal()
方法:要分析的消息和要在其上放置已分析消息的变量。 所以你的代码需要如下所示:
var parsedMessage string
json.Unmarshal(invokeRequest.Data["queueItem"], &parsedMessage)
此时,parsedMessage
包含了你的消息。 如果要打印到控制台,请使用以下代码:
fmt.Println(parsedMessage) // your message
注意
如果消息比字符串更高级,则 parsedMessage
需要有一个与 queueMessage
所指向内容的形状相匹配的结构。
触发消息
若要测试应用程序,可以使用 Azure 存储资源管理器。 在工具的右窗格中,选择“添加消息”按钮,可在队列上创建消息。
如果你的 Functions 应用此时正在运行,它将触发绑定,你的代码将被调用。