你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 MQTT 代理开发高可用性应用程序

创建使用 MQTT 代理的高可用性应用程序涉及仔细考虑会话类型、服务质量 (QoS)、消息确认、并行消息处理、消息保留和共享订阅。 MQTT 代理具有分布式内存中消息中转站和存储,它通过 MQTT 语义提供消息保留和内置状态管理。

以下部分介绍有助于构建可靠、零消息丢失和分布式应用程序的设置和功能。

服务质量 (QoS)

发布服务器和订阅服务器都应使用 QoS-1 来保证消息传递至少一次。 MQTT 代理会存储并重复传输消息,直到收到接收方的确认 (ACK),确保传输期间不会丢失任何消息。

会话类型和清理会话标志

为了确保零消息丢失,在连接到 MQTT 代理时,将 clean-start 标志设置为 false。 此设置通知中转站保持客户端的会话状态,保留连接之间的订阅和未确认的消息。 如果客户端断开连接,稍后又重新连接,它将从中断的位置继续,通过消息传递重试接收任何未确认的 QoS-1 消息。 如果已配置,并且客户端在会话到期间隔(默认值为一天)内未重新连接,则 MQTT 代理会使客户端会话过期

多线程应用程序中的 Receive-Max

多线程应用程序应使用 receive-max (最大值 65,535)并行处理消息并应用流控制。 此设置通过允许多个线程同时处理消息并且不让中转站重载消息速率高于应用程序容量的应用程序,来优化消息处理。 每个线程都可以独立处理消息,并在完成时发送确认。 典型的做法是根据应用程序使用的线程数按比例配置 max-receive

确认消息

当订阅服务器应用程序发送 QoS-1 消息的确认时,它将获取消息的所有权。 收到 QoS-1 消息的确认后,MQTT 代理停止跟踪该应用程序和主题的消息。 正确移交所有权可确保在处理问题或应用程序崩溃时保留消息。 如果一个应用程序想要使其免受应用程序崩溃的影响,则该应用程序不应在成功完成对该消息的处理之前获取所有权。 订阅 MQTT 代理的应用程序应延迟确认消息,直到处理完成,最多不超过最大值为 65,535 的 receive-max 值。 这可能包括将消息或消息派生中继到 MQTT 代理进行进一步调度。

消息保留和中转站行为

中转站会保留消息,直到收到订阅服务器的确认,确保消息零丢失。 此行为可确保即使订阅服务器应用程序暂时崩溃或失去连接,消息也不会丢失,并且一旦应用程序重新连接即可进行处理。 MQTT 代理消息若配置了 Message-Expiry-Interval,并且订阅服务器未使用该消息,则可能过期。

保留的消息

保留的消息会保持临时应用程序状态,例如特定主题的最新状态或值。 当新客户端订阅主题时,它会收到最后一条保留的消息,确保它具有最新的信息。

Keep-Alive

若要确保在连接错误或删除时保持高可用性,请为客户端服务器通信设置适当的保持连接间隔。 在空闲期间,客户端发送 PINGREQs,等待 PINGRESP。 如果没有响应,在客户端中实现自动重新连接逻辑以重新建立连接。 大多数客户端(如 Paho)内置了重试逻辑。 由于 MQTT 代理可容错,如果至少有两个正常的中转站实例(一个前端,一个后端),则重新连接会成功。

使用 QoS-1 订阅达到最终一致性

使用 QoS-1 的 MQTT 订阅通过订阅共享主题来确保同一应用程序实例的最终一致性。 发布消息时,实例会通过至少一次的传递接收并复制数据。 实例必须处理重复项,并容忍临时不一致,直到数据同步。

共享订阅

共享订阅可在高可用性应用程序的多个实例之间实现负载均衡。 消息在订阅服务器之间均匀分布,而不是每个订阅服务器都接收每条消息的副本。 MQTT 代理目前仅支持使用轮循机制算法来分发允许应用程序横向扩展的消息。典型的用例是使用 Kubernetes ReplicaSet 部署多个 Pod,这些 Pod 都使用共享订阅中的相同主题筛选器订阅 MQTT 代理。

状态存储

状态存储是一个复制的内存中 HashMap,用于管理应用程序处理状态。 例如,与 etcd 不同,状态存储通过内存中数据结构、分区和链复制确定高速吞吐量、水平缩放和低延迟的优先级。 它允许应用程序在跨实例快速访问一致状态时使用状态存储的分布式特性和容错。 若要使用分布式代理提供的内置键值存储,请执行以下操作:

  • 使用中转站的键值存储 API 实现临时存储和检索操作,确保正确的错误处理和数据一致性。 临时状态是在有状态处理中使用的短期数据存储,用于在实时计算期间快速访问中间结果或元数据。 在 HA 应用程序的上下文中,临时状态有助于恢复崩溃之间的应用程序状态。 它可以写入磁盘,但仍然是临时的,而不是专为长期存储不经常访问的数据而设计的冷存储。

  • 使用状态存储在应用程序的多个实例之间共享状态、缓存、配置或其他基本数据,使它们能够保持一致的数据视图。

使用 MQTT 代理的内置 Dapr 集成

对于更简单的用例,应用程序可能利用 Dapr (Distributed Application Runtime)。 Dapr 是一种开源的可移植事件驱动运行时,可简化微服务和分布式应用程序的构建。 它提供一组构建基块,例如服务到服务调用、状态管理和发布/订阅消息传送。

Dapr 作为 MQTT 代理的一部分提供,将 MQTT 会话管理、消息 QoS 和确认,以及内置键值存储的细节抽象化,通过以下方法使其成为针对简单用例开发高可用性应用程序的实用选择:

  • 使用 Dapr 的构建基块设计应用程序,例如用于处理键值存储的状态管理,以及用于与 MQTT 中转站交互的发布/订阅消息传送。 如果用例需要 Dapr 不支持的构建基块和抽象,请考虑使用前面提到的 MQTT 代理功能。

  • 使用首选编程语言和框架实现应用程序,利用 Dapr SDK 或 API 与中转站和键值存储无缝集成。

开发具有高可用性的应用程序的清单

  • 为编程语言选择适当的 MQTT 客户端库。 客户端应支持 MQTT v5。 如果应用程序对延迟敏感,请使用基于 C 或 Rust 的库。
  • 配置客户端库以连接到 MQTT 代理中转站,并将 clean-session 标志设置为 false 和所需的 QoS 级别 (QoS-1)。
  • 为会话过期、消息过期和保持连接间隔确定合适的值。
  • 为订阅服务器应用程序实现消息处理逻辑,包括成功传递或处理消息时发送确认。
  • 对于多线程应用程序,请配置 max-receive 参数以启用并行消息处理。
  • 利用保留的消息来保留临时应用程序状态。
  • 利用分布式状态存储来管理临时应用程序状态。
  • 如果用例简单且不需要对 MQTT 连接或消息处理进行详细控制,请评估 Dapr 以开发应用程序。
  • 实现共享订阅,以在应用程序的多个实例之间均匀分发消息,从而实现高效的缩放。