次の方法で共有


Azure Functions の信頼性の高いイベント処理

イベント処理は、サーバーレス アーキテクチャに関連する最も一般的なシナリオの 1 つです。 この記事では、メッセージが失われないよう Azure Functions を使用して信頼性の高いメッセージ プロセッサを作成する方法について説明します。

分散システムにおけるイベント ストリームの課題

1 秒あたり 100 イベントを一定の率で送信するシステムがあるとします。 この率で継続した場合、複数の並列関数インスタンスは数分以内に毎秒 100 イベントを使用します。

ただし、次のような最適でない条件も考えられます。

  • イベント発行元が破損したイベントを送信した場合
  • 関数インスタンスでハンドルされない例外が発生した場合
  • ダウンストリーム システムがオフラインになった場合

アプリケーションのスループットを維持しながら、このような状況を処理するにはどうすればよいでしょうか。

キューでは、信頼できるメッセージングがサポートされています。 Functions トリガーと組み合わせることで、関数はキュー メッセージをロックします。 処理が失敗した場合、別のインスタンスが処理を再試行できるようにロックが解放されます。 処理は、メッセージが正常に評価されるか有害キューに追加されるまで続行されます。

1 つのキュー メッセージが再試行サイクルを継続中である場合も、引き続き残りのメッセージを並列実行してデキューします。 そのため、スループット全体が 1 つの無効なメッセージの影響を受けることはほとんどありません。 ただし、ストレージ キューは順序を保証するものではなく、Event Hubs に必要な高スループット要求にも最適化されていません。

これに対し、Azure Event Hubs にはロックの概念がありません。 高スループット、複数のコンシューマー グループ、再試行などの機能を実現するため、Event Hubs イベントはビデオ プレーヤーのように動作します。 イベントは、パーティションごとにストリーム内の 1 つのポイントから読み取られます。 読み取りはポインターから前方または後方に行われますが、イベントの処理を行うにはポインターを移動することを選択する必要があります。

ストリームでエラーが発生し、ポインターを移動しない場合、ポインターが移動するまでイベント処理はブロックされます。 つまり、1 つのイベントの処理のためポインターが停止した場合、未処理のイベントは積み上げられます。

Azure Functions は、成否に関わらずストリームのポインターを進めることで、デッドロックを回避します。 ポインターは前進し続けるため、関数を実行して失敗を適切に処理する必要があります。

Azure Functions で Event Hubs イベントを使用する方法

Azure Functions は、次の手順を実行しながら Event Hubs イベントを使用します。

  1. ポインターが作成され、イベント ハブの各パーティションの Azure Storage に保存されます。
  2. 新しいメッセージを (既定ではバッチで) 受信すると、ホストはメッセージのバッチを使用して関数をトリガーしようとします。
  3. 関数の実行が完了 (例外の有無に関わらず) した場合は、ポインターが進み、ストレージ アカウントにチェックポイントが保存されます。
  4. 条件によって関数の実行が完了しない場合、ホストのポインター進行は失敗します。 ポインターが移動しない場合は、後で同じメッセージの処理を終了することを確認します。
  5. 手順 2 から 4 を繰り返します

この動作により、いくつかの重要な点がわかります。

例外の処理

原則として、最高レベルのコードに try/catch block を含める必要があります。 具体的には、Event Hubs イベントを使用するすべての関数には、catchのブロックが必要です。 これにより、例外が発生すると、ポインターが進む前に catch ブロックによってエラーが処理されます。

再試行メカニズムとポリシー

一時的な例外もあり、これらは後でもう一度操作を実行しようとしても再表示されません。 このため、最初の手順では常に操作を再試行します。 関数アプリの再試行ポリシーを利用するか、関数の実行内で再試行ロジックを作成することができます。

関数にエラー処理動作を組み込むと、基本的な再試行ポリシーと高度な再試行ポリシーの両方を定義できます。 たとえば、次の規則で示すワークフローに従うポリシーを実装できます。

  • メッセージを 3 回挿入してみます (再試行の間に遅延が発生する可能性があります)。
  • 再試行が最終的にすべて失敗した場合は、メッセージをキューに追加して、ストリームで処理を続行できるようにします。
  • その後、破損したメッセージまたは未処理のメッセージが処理されます。

Note

Polly は、C# アプリケーションの復元性および一時的なエラー処理ライブラリの例です。

例外以外のエラー

エラーが存在しない場合でも、問題が発生する場合があります。 たとえば、実行中に失敗が発生したとします。 この場合、関数の実行が完了するまでオフセット ポインターは進行しません。 ポインターが先に進まない場合、実行に失敗した後に実行されるインスタンスは、引き続き同じメッセージを読み取ります。 この状況では、"at-least-once" 保証が提供されます。

すべてのメッセージが少なくとも 1 回処理されることが保証されることは、同時に一部のメッセージが複数回処理される可能性があることを意味します。 関数アプリはこの可能性に注意し、べき等の原則を中心に関数アプリを構築する必要があります。

実行の停止と再起動

少数であればエラーは許容されますが、アプリで重大な失敗が発生した場合はどうなるでしょうか。 システムが正常な状態になるまで、イベントのトリガーを停止することができます。 多くの場合、処理の一時停止はサーキット ブレーカー パターンで可能です。 サーキット ブレーカー パターンを使用すると、アプリでイベント プロセスの「サーキットを中断」し、後で再開することができます。

イベント プロセスにサーキット ブレーカーを実装するには、次の 2 つの要素が必要です。

  • すべてのインスタンスで状態を共有し、サーキットの正常性を追跡および監視する
  • サーキットの状態 (開または閉) を管理できるマスター プロセス

実装の詳細は異なる場合がありますが、インスタンス間で状態を共有するには、ストレージ メカニズムが必要です。 Azure Storage、Redis Cache、または一連の機能でアクセスできるその他のアカウントに状態を格納することを選択できます。

Azure Logic Apps または Durable Functions は、ワークフローとサーキットの状態を管理するのに最適です。 他のサービスも同様に機能しますが、この例ではロジック アプリを使用します。 ロジック アプリを使用すると、関数の実行の一時停止や再起動など、サーキット ブレーカー パターンを実装するために必要な制御機能が使用できます。

インスタンス間での失敗のしきい値の定義

イベントを同時に処理する複数のインスタンスを考慮するには、サーキットの正常性を監視するために共有されている外部の状態を保持する必要があります。

実装することができる規則では、次を適用できます。

  • すべてのインスタンスで 30秒間に 100 以上の最終的な失敗が発生した場合は、サーキットを中断し、新しいメッセージのトリガーを停止します。

実装の詳細はニーズによって異なりますが、一般的には次のようなシステムを作成できます。

  1. ストレージ アカウント (Azure Storage、Redis など) に失敗をログに記録します。
  2. 新しい失敗がログに記録されたら、ローリング カウントを調べて、しきい値に達しているかを確認します (たとえば、過去 30 秒間で 100 を超えるなど)。
  3. しきい値に達した場合は、Azure Event Grid にイベントを生成し、システムにサーキットを中断するように指示します。

Azure Logic Apps を使用したサーキット状態の管理

次の説明は、Azure Logic App を作成して Functions アプリの処理を停止する方法の 1 つを示しています。

Azure Logic Apps には、さまざまなサービスへの組み込みコネクタが用意され、ステートフル オーケストレーションを特徴としており、サーキットの状態管理に最適です。 サーキットを中断する必要があることが検出されたら、ロジック アプリを構築し、次のワークフローを実装します。

  1. Event Grid ワークフローをトリガーし、Azure Function を停止する (Azure リソース コネクタを使用)
  2. ワークフローの再開オプションを含む通知用メールを送信する

メールの受信者はサーキットの正常性を調査し、必要に応じて、通知用メールのリンクからサーキットを再起動できます。 ワークフローで関数を再起動すると、メッセージは最後のイベント ハブ チェックポイントから処理されます。

この方法を使用すると、メッセージが失われないだけでなく、すべてのメッセージが順番に処理され、必要に応じてサーキットを中断できます。

リソース

次のステップ

詳細については、次のリソースを参照してください。