サブプロトコルを使用して信頼性の高い WebSocket を作成する
間欠的なネットワークの問題が原因で WebSocket クライアント接続が切断されると、メッセージが失われる恐れがあります。 Pub/Sub システムでは、パブリッシャーはサブスクライバーから分離されています。そのためパブリッシャーは、サブスクライバーでの接続の切断またはメッセージの損失を検出できない場合があります。 間欠的なネットワークの問題を克服し、信頼性の高いメッセージ配信を維持することは、クライアントにとって不可欠です。 それを実現するには、リライアブル Azure Web PubSub サブプロトコルを活用して、信頼性の高い WebSocket クライアントを作成します。
リライアブル プロトコル
Web PubSub サービスでは、2 つのリライアブル サブプロトコル (json.reliable.webpubsub.azure.v1
と protobuf.reliable.webpubsub.azure.v1
) がサポートされています。 クライアントでは、信頼性を実現するために、サブプロトコルのパブリッシャー、サブスクライバー、復旧の部分に従う必要があります。 サブプロトコルの正しい実装に失敗すると、メッセージ配信が期待どおりに動作しないか、プロトコル違反が原因でサービスでクライアントを終了する可能性があります。
簡単な方法 - クライアント SDK の使用
信頼性の高いクライアントを作成する最も簡単な方法は、クライアント SDK を使用することです。 クライアント SDK で Web PubSub クライアント仕様を実装し、既定で json.reliable.webpubsub.azure.v1
を使用します。 クイック スタートについては、 クライアント 間の発行/サブスクライブに関するページを参照してください。
難しい方法 - 手作業での実装
以下のチュートリアルでは、Web PubSub クライアント仕様の実装における重要な部分について説明します。 このガイドは、すぐに始めたいと考えているユーザーではなく、信頼性の実現の原則を知りたいユーザーを対象にしています。 すぐに始めるには、クライアント SDK を使用してください。
初期化
リライアブル サブプロトコルを使用するためには、WebSocket 接続の構築時にサブプロトコルを設定する必要があります。 JavaScript では、次のコードを使用できます。
Json リライアブル サブプロトコルを使用する。
var pubsub = new WebSocket( "wss://test.webpubsub.azure.com/client/hubs/hub1", "json.reliable.webpubsub.azure.v1" );
Protobuf リライアブル サブプロトコルを使用する。
var pubsub = new WebSocket( "wss://test.webpubsub.azure.com/client/hubs/hub1", "protobuf.reliable.webpubsub.azure.v1" );
接続の回復
接続の復旧は信頼性の実現の基礎であり、json.reliable.webpubsub.azure.v1
および protobuf.reliable.webpubsub.azure.v1
プロトコルを使用する場合に実装する必要があります。
Websocket 接続は TCP に依存します。 接続が切断されない場合、メッセージは失われずに、順番に配信されます。 切断された接続でメッセージが失われるのを防ぐために、Web PubSub サービスでは、グループとメッセージの情報を含む接続状態の情報を保持します。 この情報は、接続の復旧時にクライアントを復元するために使用されます
クライアントでリライアブル サブプロトコルを使用してサービスに再接続する際、クライアントは connectionId
と reconnectionToken
を含んだ Connected
メッセージを受信します。 connectionId
では、サービスにおける接続のセッションを識別します。
{
"type": "system",
"event": "connected",
"connectionId": "<connection_id>",
"reconnectionToken": "<reconnection_token>"
}
WebSocket 接続が切断されると、クライアントでは同一のセッションを復元するために、同じ connectionId
を使用して再接続を試みる必要があります。 クライアントでサーバーとネゴシエートして access_token
を取得する必要はありません。 代わりに、接続を復旧するには、クライアントでサービス ホスト名、connection_id
、reconnection_token
を使用して、WebSocket 接続の要求をサービスに対して直接行う必要があります。
wss://<service-endpoint>/client/hubs/<hub>?awps_connection_id=<connection_id>&awps_reconnection_token=<reconnection_token>
ネットワークの問題がまだ解消していない場合、接続の復旧は失敗する可能性があります。 クライアントでは、次の状況になるまで再接続し続ける必要があります。
- WebSocket 接続が状態コード 1008 で閉じる。 この状態コードは connectionId がサービスから削除されたことを意味します。
- 復旧の失敗が 1 分を超えて発生し続ける。
公開元
イベント ハンドラーにイベントを送信するか、他のクライアントにメッセージを発行するクライアントをパブリッシャーと呼びます。 パブリッシャーでは、メッセージの発行が成功したかどうかに関する肯定応答を Web PubSub サービスから受信するように、メッセージで ackId
を設定する必要があります。
ackId
はメッセージの識別子であり、それぞれの新しいメッセージで一意の ID を使用する必要があります。 元の ackId
は、メッセージを再送信する際に使用する必要があります。
グループ送信メッセージのサンプル:
{
"type": "sendToGroup",
"group": "group1",
"dataType": "text",
"data": "text data",
"ackId": 1
}
確認応答のサンプル:
{
"type": "ack",
"ackId": 1,
"success": true
}
success: true
を含む ack 応答が Web PubSub サービスから返された場合、サービスによるメッセージの処理は完了しています。また、クライアントでは、すべてのサブスクライバーにメッセージが配信されると想定して構いません。
サービスで一時的な内部エラーが発生し、メッセージをサブスクライバーに送信できない場合、パブリッシャーは success: false
を含む ack を受信します。 パブリッシャーでは、エラーを読み取って、メッセージを再送信するかどうかを判断する必要があります。 メッセージを再送信する場合は、同一の ackId
を使用する必要があります。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "InternalServerError",
"message": "Internal server error"
}
}
WebSocket 接続が切断されたためにサービスの ack 応答が失われた場合、パブリッシャーでは復旧後に同一の ackId
を使用してメッセージを再送信する必要があります。 そのメッセージが以前にサービスで処理された場合は、Duplicate
エラーを含む ack が送信されます。 パブリッシャーでは、このメッセージの再送信を停止する必要があります。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "Duplicate",
"message": "Message with ack-id: 1 has been processed"
}
}
サブスクライバー
イベント ハンドラーまたはパブリッシャーからメッセージを受信するクライアントをサブスクライバーと呼びます。 ネットワークの問題が原因で接続が切断されると、サブスクライバーに送信されたメッセージの数を Web PubSub サービスで把握できなくなります。 サブスクライバーで最後に受信したメッセージを特定するために、サービスでは sequenceId
を含むデータ メッセージを送信します。 サブスクライバーではシーケンス ack メッセージで応答します。
シーケンス ACK のサンプル:
{
"type": "sequenceAck",
"sequenceId": 1
}
sequenceId
は、connection-id セッション内の uint64 の増分値です。 サブスクライバーでは、受信した最大の sequenceId
を記録し、より大きな sequenceId
を持つメッセージのみを受信して、より小さいか等しい sequenceId
を持つメッセージを削除する必要があります。 サブスクライバーでは、既にサブスクライバーで受信済みのメッセージの再配信をサービスでスキップできるように、記録した最大の sequenceId
に肯定応答する必要があります。 たとえば、サブスクライバーで sequenceId: 5
を持つ sequenceAck
で応答した場合、サービスでは 5 より大きい sequenceId
を持つメッセージのみを再送信します。
すべてのメッセージは、WebSocket 接続が切断されるまで、サブスクライバーに配信されます。 sequenceId
を使用すると、セッション内の WebSocket 接続の全体においてサブスクライバーで受信したメッセージの数を、サービスで把握できます。 WebSocket 接続の切断後、サービスではサブスクライバーからの肯定応答がないメッセージを再配信します。 このサービスには、限られた数の未確認のメッセージが格納されます。 メッセージの数が制限を超えると、サービスで WebSocket 接続を閉じて、セッションを削除します。 したがってサブスクライバーは、できるだけ速やかに sequenceId
に対して肯定応答する必要があります。