Compartir vía


Desarrollo de aplicaciones de alta disponibilidad con el corredor MQTT

La creación de una aplicación de alta disponibilidad mediante el corredor MQTT implica una cuidadosa consideración de los tipos de sesión, la calidad del servicio (QoS), las confirmaciones de mensajes, el procesamiento paralelo de mensajes, la retención de mensajes y las suscripciones compartidas. El corredor MQTT cuenta con un agente de mensajes distribuido en memoria y un almacén que proporciona retención de mensajes y administración de estado integrada con la semántica MQTT.

Las siguientes secciones explican los valores y características que contribuyen a una aplicación distribuida, sólida y sin pérdida de mensajes.

Calidad del servicio (QoS)

Tanto los publicadores como los suscriptores deben usar QoS-1 para garantizar la entrega de mensajes al menos una vez. El corredor MQTT almacena y retransmite mensajes hasta que recibe una confirmación (ACK) del destinatario, lo que garantiza que no se pierda ningún mensaje durante la transmisión.

Tipo de sesión y marca Limpieza de sesión

Para garantizar la pérdida de mensajes cero, establezca la marca de inicio limpio en false al conectarse al corredor MQTT. Esta configuración informa al agente para mantener el estado de sesión del cliente, conservando las suscripciones y los mensajes no reconocidos entre las conexiones. Si el cliente se desconecta y después se vuelve a conectar, se reanuda desde donde se dejó, recibiendo cualquier mensaje QoS-1 no reconocido a través de reintento de entrega de mensajes. Si está configurado, el corredor MQTT expira la sesión de cliente si el cliente no se vuelve a conectar dentro del Intervalo de expiración de sesión El valor predeterminado es un día.

Recepción máxima en aplicaciones multiproceso

Las aplicaciones multiproceso deben usar recepción máxima (65.535 máximo) para procesar mensajes en paralelo y aplicar el control de flujo. Este valor optimiza el procesamiento de mensajes al permitir que varios subprocesos funcionen en mensajes simultáneamente y sin que el agente sobrecargue la aplicación con una alta tasa de mensajes por encima de la capacidad de la aplicación. Cada subproceso puede procesar un mensaje de forma independiente y enviar su confirmación tras la finalización. Una práctica habitual es configurar la recepción máxima proporcionalmente al número de subprocesos que usa la aplicación.

Confirmación de mensajes

Cuando una aplicación de suscriptor envía una confirmación para un mensaje QoS-1, toma la propiedad del mensaje. Tras recibir la confirmación de un mensaje QoS-1, el corredor MQTT detiene el seguimiento del mensaje de esa aplicación y tema. La transferencia adecuada de propiedad garantiza la conservación de mensajes en caso de incidencias de procesamiento o bloqueos de aplicación. Si una aplicación quiere protegerla de los bloqueos de la aplicación, la aplicación no debe tomar propiedad antes de completar correctamente su procesamiento en ese mensaje. Las aplicaciones que se suscriben al corredor MQTT deben retrasar la confirmación de los mensajes hasta que el procesamiento se complete hasta el valor de recepción máximo con un máximo de 65 535. Esto puede incluir la retransmisión del mensaje, o un derivado del mensaje, al corredor MQTT para su posterior envío.

Retención de mensajes y comportamiento del agente

El agente conserva los mensajes hasta que recibe una confirmación de un suscriptor, lo que garantiza una pérdida de mensajes cero. Este comportamiento garantiza que incluso si una aplicación de suscriptor se bloquea o pierde la conectividad temporalmente, los mensajes no se perderán y se pueden procesar una vez que la aplicación se vuelva a conectar. Los mensajes del corredor MQTT podrían expirar si se configura por el Message-Expiry-Interval y un suscriptor no ha consumido el mensaje.

Mensajes retenidos

Mensajes retenidos mantienen el estado temporal de la aplicación, como el estado o el valor más recientes de un tema específico. Cuando un nuevo cliente se suscribe a un tema, recibe el último mensaje retenido, asegurándose de que tiene la información más actualizada.

Keep-Alive

Para garantizar la alta disponibilidad en caso de errores de conexión o caídas, establezca intervalos de mantenimiento de adecuados para la comunicación entre cliente y servidor. Durante los períodos de inactividad, los clientes envían PINGREQ, esperando PINGRESP. Si no hay respuesta, implemente la lógica de reconexión automática en el cliente para restablecer las conexiones. La mayoría de los clientes como Paho tienen lógica de reintento integrada. Como el corredor MQTT es tolerante a errores, una reconexión se realiza correctamente si hay al menos dos instancias de agente en buen estado un front-end y un back-end.

Coherencia eventual con la suscripción QoS-1

Las suscripciones de MQTT con QoS-1 garantizan la coherencia eventual en instancias de aplicación idénticas mediante la suscripción a un tema compartido. A medida que se publican los mensajes, las instancias reciben y replican datos con la entrega al menos una vez. Las instancias deben manipular duplicados y tolerar incoherencias temporales hasta que se sincronicen los datos.

Suscripciones compartidas

Las suscripciones compartidas habilita el equilibrio de carga entre varias instancias de una aplicación de alta disponibilidad. En lugar de cada suscriptor que recibe una copia de cada mensaje, los mensajes se distribuyen uniformemente entre los suscriptores. El corredor MQTT actualmente solo admite un algoritmo round robin para distribuir mensajes que permiten a una aplicación escalar horizontalmente. Un caso de uso típico es implementar varios pods mediante ReplicaSet de Kubernetes que todos se suscriben al corredor MQTT mediante el mismo filtro de tema en la suscripción compartida.

Almacén de estado

El almacén de estado es un objeto HashMap replicado en memoria para administrar el estado de procesamiento de la aplicación. A diferencia de etcd, por ejemplo, el almacén de estado prioriza el rendimiento de alta velocidad, el escalado horizontal y la baja latencia a través de estructuras de datos en memoria, creación de particiones y replicación en cadena. Permite que las aplicaciones usen la naturaleza distribuida del almacén de estado y la tolerancia a errores, a la vez que acceden a un estado coherente rápidamente entre instancias. Para usar el almacén de clave-valor integrado proporcionado por el agente distribuido:

  • Implementar operaciones de almacenamiento efímero y recuperación mediante la API de almacén clave-valor del agente, lo que garantiza el control de errores adecuado y la coherencia de los datos. El estado efímero es un almacenamiento de datos de corta duración que se usa en el procesamiento con estado para obtener acceso rápido a los resultados intermedios o los metadatos durante los cálculos en tiempo real. En el contexto de la aplicación de alta disponibilidad, un estado efímero ayuda a recuperar los estados de la aplicación entre bloqueos. Se puede escribir en el disco, pero permanece temporal, en lugar de almacenamiento en frío diseñado para el almacenamiento a largo término de datos a los que se accede con poca frecuencia.

  • Use el almacén de estado para compartir el estado, el almacenamiento en caché, la configuración u otros datos esenciales entre varias instancias de la aplicación, lo que les permite mantener una vista coherente de los datos.

Uso de la integración integrada de Dapr del corredor MQTT

Para casos de uso más sencillos, una aplicación podría usar Dapr ( Distribuida Application Runtime). Dapr es un runtime de código abierto, portátil y controlado por eventos que simplifica la creación de microservicios y aplicaciones distribuidas. Ofrece un conjunto de bloques de compilación, como la invocación de servicio a servicio, la administración de estado y la mensajería de publicación o suscripción.

Dapr se ofrece como parte del agente MQTT, abstrayendo detalles de administración de sesiones MQTT, QoS de mensajes y confirmaciones, y almacenes integrados de clave-valor, lo que lo convierte en una opción práctica para desarrollar una aplicación de alta disponibilidad para casos de uso sencillos por:

  • Diseñe la aplicación con los bloques de compilación de Dapr, como la administración de estado para controlar el almacén de clave-valor y la mensajería de publicación o suscripción para interactuar con el agente MQTT. Si el caso de uso requiere bloques de creación y abstracciones que no son compatibles con Dapr, considere la posibilidad de usar las características de corredor MQTT mencionadas anteriormente.

  • Implemente la aplicación mediante el lenguaje de programación y el marco de trabajo preferidos, aprovechando los SDK de Dapr o las API para una integración perfecta con el agente y el almacén de clave-valor.

Lista de comprobación para desarrollar una aplicación de alta disponibilidad

  • Elija una biblioteca cliente MQTT adecuada para el lenguaje de programación. El cliente debe admitir MQTT v5. Use una biblioteca de base C o Rust si su aplicación es sensible a la latencia.
  • Configure la biblioteca cliente para conectarse al corredor MQTT con marca de sesión limpia establecida en false y el nivel de QoS deseado (QoS-1).
  • Decida un valor adecuado para la expiración de la sesión, la expiración del mensaje y los intervalos de mantenimiento activo.
  • Implemente la lógica de procesamiento de mensajes para la aplicación de suscriptor, incluido el envío de una confirmación cuando el mensaje se haya entregado o procesado correctamente.
  • En el caso de las aplicaciones multiproceso, configure el parámetro recepción máximapara habilitar el procesamiento de mensajes paralelos.
  • Use mensajes retenidos para mantener el estado temporal de la aplicación.
  • Use el almacén de estado distribuido para administrar el estado de la aplicación efímera.
  • Evalúe Dapr para desarrollar la aplicación si el caso de uso es sencillo y no requiere control detallado sobre la conexión MQTT o el control de mensajes.
  • Implemente suscripciones compartidas para distribuir mensajes uniformemente entre varias instancias de la aplicación, lo que permite un escalado eficaz.