次の方法で共有


MQTT ブローカーを使用して高可用性アプリケーションを開発する

MQTT ブローカーを使用して高可用性アプリケーションを作成するには、セッションの種類、サービスの品質 (QoS)、メッセージ受信確認、並列メッセージ処理、メッセージの保持、共有サブスクリプションを慎重に検討する必要があります。 MQTT ブローカーは、MQTT セマンティクスを使用してメッセージの保持と組み込みの状態管理を提供する、分散型のインメモリ メッセージ ブローカーとストアを備えています。

次のセクションでは、堅牢でメッセージの損失が発生しない、分散型アプリケーションに寄与する設定と機能について説明します。

サービスの品質 (QoS)

パブリッシャーとサブスクライバーは両方とも、QoS-1 を使用して、少なくとも 1 回はメッセージ配信を保証する必要があります。 MQTT ブローカーは、受信者から受信確認 (ACK) を受信するまでメッセージを保存および再送信し、送信中にメッセージが失われないようにします。

セッションの種類とクリーン セッション フラグ

メッセージ損失が発生しないようにするには、MQTT ブローカーに接続するときに、clean-start フラグを false に設定します。 この設定を使用すると、クライアントのセッション状態を維持するようにブローカーに通知され、接続間のサブスクリプションと未確認のメッセージが保持されます。 クライアントが切断され、後で再接続されると、中断した場所から再開され、メッセージ配信の再試行によって未確認の QoS-1 メッセージを受信します。 構成されている場合、MQTT ブローカーでは、クライアントがセッションの有効期限の間隔 (既定値は 1 日) 内で再接続しないと、クライアント セッションの有効期限が切れるようになります。

マルチスレッド アプリケーションでの最大受信

マルチスレッド アプリケーションは、最大受信 (最大 65,535) を使用してメッセージを並列処理し、フロー制御を適用する必要があります。 この設定では、複数のスレッドが同時にメッセージに対応できるようにすることで、アプリケーションの容量を超える高いメッセージ レートでブローカーがアプリケーションをオーバーロードすることなく、メッセージ処理が最適化されます。 各スレッドは個別にメッセージを処理でき、完了時にその受信確認を送信します。 一般的な方法は、アプリケーションで使用されるスレッドの数に比例して "最大受信" を構成することです

メッセージの受信確認

サブスクライバー アプリケーションは、QoS-1 メッセージの受信確認を送信するときに、メッセージの所有権を取得します。 QoS-1 メッセージの受信確認を受信すると、MQTT ブローカーはそのアプリケーションおよびトピックのメッセージの追跡を停止します。 所有権を適切に譲渡することで、処理の問題やアプリケーションのクラッシュが発生した場合にメッセージが保持されるようにします。 アプリケーションのクラッシュからアプリケーションを保護する場合は、そのメッセージの処理を正常に完了する前に、アプリケーションが所有権を取得しないようにする必要があります。 MQTT ブローカーをサブスクライブしているアプリケーションでは、最大 65,535 の "最大受信" 値の処理が完了するまで、メッセージの受信確認を遅らせる必要があります。 これには、メッセージまたはメッセージの派生物を MQTT ブローカーにリレーして、さらにディスパッチすることが含まれる場合があります。

メッセージの保持とブローカーの動作

ブローカーは、サブスクライバーから受信確認 (ACK) を受信するまでメッセージを保持し、メッセージが失われないようにします。 この動作により、サブスクライバー アプリケーションがクラッシュしたり、接続が一時的に失われたりしても、メッセージは失われず、アプリケーションが再接続した後に処理できるようになります。 メッセージの有効期限の間隔によって構成され、サブスクライバーがメッセージを使用しなかった場合、MQTT ブローカー メッセージの有効期限が切れる可能性があります。

保持されたメッセージ

保持されたメッセージにより、特定のトピックの最新の状態や値など、一時的なアプリケーションの状態が維持されます。 新しいクライアントがトピックをサブスクライブすると、最後に保持されたメッセージが受信され、最新の情報が確実に保持されます。

Keep-Alive

接続エラーや切断が発生した場合に高可用性を確保するには、クライアントとサーバー間の通信に適切なキープアライブ間隔を設定します。 アイドル期間中、クライアントは PINGREQ を送信し、PINGRESP を待ちます。 応答がない場合は、クライアントに自動再接続ロジックを実装して接続を再確立します。 Paho のようなほとんどのクライアントには再試行ロジックが組み込まれています。 MQTT ブローカーはフォールト トレラントであるため、フロントエンドとバックエンドに少なくとも 2 つの正常なブローカー インスタンスがある場合、正常に再接続されます。

QoS-1 サブスクリプションとの最終的な整合性

QoS-1 を使用する MQTT サブスクリプションでは、共有トピックをサブスクライブすることで、同じアプリケーション インスタンス間で最終的な整合性が確保されます。 メッセージが発行されると、インスタンスは少なくとも 1 回の配信でデータを受信してレプリケートします。 インスタンスは重複を処理し、データが同期されるまで一時的な不整合を許容する必要があります。

共有サブスクリプション

共有サブスクリプションにより、高可用性アプリケーションの複数のインスタンス間での負荷分散が有効になります。 各サブスクライバーがすべてのメッセージのコピーを受信する代わりに、サブスクライバー間でメッセージが均等に分散されます。 現在、MQTT ブローカーでは、アプリケーションのスケールアウトを可能にする、メッセージを分散するためのラウンド ロビン アルゴリズムのみがサポートされています。一般的なユース ケースは、共有サブスクリプションで同じトピック フィルターを使用してすべて MQTT ブローカーにサブスクライブする Kubernetes ReplicaSet を使用して複数のポッドをデプロイすることです。

状態ストア

状態ストアは、アプリケーションの処理状態を管理するためのレプリケートされたメモリ内 HashMap です。 たとえば、etcd とは異なり、状態ストアでは、メモリ内のデータ構造、パーティション分割、チェーン レプリケーションを通じて、高速スループット、水平スケーリング、低待機時間が優先されます。 これにより、アプリケーションは状態ストアの分散型の性質とフォールト トレランスを使用しながら、インスタンス間で整合性が確保された状態にすばやくアクセスできます。 分散型ブローカーによって提供される組み込みのキー値ストアを使用するには:

  • ブローカーのキー値ストア API を使用して一時的なストレージと取得操作を実装し、適切なエラー処理とデータの整合性を確保します。 エフェメラル状態とは、リアルタイムの評価中に中間結果またはメタデータに高速にアクセスするためにステートフル処理で使用される有効期間の短いデータの保存のことを指します。 HA アプリケーションのコンテキストでは、エフェメラル状態はクラッシュ間でのアプリケーション状態の回復に役立ちます。 これはディスクに書き込むことができますが、アクセス頻度の低いデータの長期保存用に設計されたコールド ストレージとは対照的に、一時的なままです。

  • 状態ストアを使用して、アプリケーションの複数のインスタンス間で状態、キャッシュ、構成、またはその他の重要なデータを共有し、データの一貫性のあるビューを維持できるようにします。

MQTT ブローカーの組み込みの Dapr 統合を使用する

より簡単なユース ケースでは、アプリケーションで Dapr (Distributed Application Runtime) を利用することがあります。 Dapr はオープンソースの移植可能なイベント駆動型ランタイムであり、マイクロサービスや分散型アプリケーションを簡単に構築できます。 これには、サービス間呼び出し、状態管理、発行/サブスクライブ メッセージングなど、一連の構成要素が用意されています。

Dapr は、MQTT ブローカーの一部として提供され、MQTT セッション管理、メッセージの QoS と受信確認、組み込みのキー値ストアの詳細を抽象化し、次のことによって、単純なユース ケース用の高可用性アプリケーションを開発するための実用的な選択肢となります。

  • キー値ストアを処理するための状態管理、MQTT ブローカーと対話するための発行/サブスクライブ メッセージングなど、Dapr の構成要素を使用してアプリケーションを設計します。 ユース ケースで Dapr でサポートされていない構成要素と抽象化が必要な場合は、前述の MQTT ブローカー機能の使用を検討してください。

  • 好みのプログラミング言語とフレームワークを使用してアプリケーションを実装し、Dapr SDK または API を利用してブローカーとキー値ストアとのシームレスな統合を実現します。

高可用性アプリケーションを開発するためのチェックリスト

  • お使いのプログラミング言語に適した MQTT クライアント ライブラリを選択します。 クライアントは MQTT バージョン 5 をサポートする必要があります。 アプリケーションが待機時間の影響を受ける場合は、C または Rust ベースのライブラリを使用します。
  • クリーン セッション フラグが false に設定され、必要な QoS レベル (QoS-1) を使用して MQTT ブローカーに接続するようにクライアント ライブラリを構成します。
  • セッションの有効期限、メッセージの有効期限、キープアライブ間隔に適した値を決定します。
  • メッセージが正常に配信または処理されたときの受信確認の送信など、サブスクライバー アプリケーションのメッセージ処理ロジックを実装します。
  • マルチスレッド アプリケーションの場合は、"最大受信" パラメータを構成して、メッセージを並列で処理できるようにします。
  • 一時的なアプリケーションの状態を維持するために保持されたメッセージを使用します。
  • 分散状態ストアを使用して、一時的なアプリケーションの状態を管理します。
  • ユース ケースが単純であり、MQTT 接続またはメッセージ処理を詳細に制御する必要がない場合は、Dapr を使用してアプリケーションを開発します。
  • アプリケーションの複数のインスタンス間でメッセージを均等に分散する共有サブスクリプションを実装して、効率的なスケーリングを可能にします。