キュー トリガー
メッセージ キューは、プロセス、スレッド、またはアプリケーション間のメッセージングを処理するために使用されるソフトウェア コンポーネントです。 キューにはメッセージを格納でき、"ワーカー" は、都合の良いときにメッセージをフェッチできます。
クラウドでは、メッセージ キューはペイロードを持つイベントを生成できます。 Azure Functions のようなサービスは、このようなメッセージをリッスンし、メッセージが発行されたときにコードを実行できます。
メッセージ キューを処理する
Azure 関数がメッセージ キューからのメッセージを使用できるようにするには、トリガーと、場合によってはバインディングが必要です。
Azure 関数では、特定のキューで新しいメッセージが発行されたときにコードがトリガーされるようにするために、そのキューをリッスンする必要があります。 トリガーを設定するには、適切な資格情報を指定して、トリガー コードでメッセージ キューへの接続方法を認識できるようにする必要があります。 キューをリッスンしている関数に対して、function.json ファイル内にエントリを作成します。 bindings
要素で、エントリに関する次のプロパティを指定します。
プロパティ | 値 |
---|---|
name |
コード内で参照できる名前 |
type |
queueTrigger |
direction |
in |
queueName |
キューの名前 |
connection |
local.settings.json 内の構成変数 |
エントリの例は次のように定義できます。
{
"name": "myQueueItem",
"type": "queueTrigger",
"direction": "in",
"queueName": "messages-incoming",
"connection": "AzureWebJobsStorage"
}
このキューがストレージ アカウント上に存在する場合、AzureWebJobsStorage
値は接続文字列の値です。
キューからメッセージを使用する場合、バインディングは絶対に必要というわけではありません。 ただし、キューに書き込む場合は、出力バインディングが必要です。 このようなバインディングを使用すると、目的のキューへの参照が得られます。
Note
現在、キューでは出力バインディングのみがサポートされています。
ローカルでの開発
開発者は短いフィードバック サイクルを必要としています。 また、開発者エクスペリエンスをできるだけ運用環境に近づけるようにする必要があります。 これらの両方の目標を達成する方法として、キュー エミュレーターを使用する方法があります。
キュー エミュレーターを使用すると、Azure 関数が応答する実際のキュー メッセージをシミュレートできます。 エミュレーターを使用するには、次のようにします。
エミュレーターをインストールします。 Visual Studio Code で Azurite を検索するか、Azurite 拡張機能をダウンロードします。
エミュレーター機能を使用するには、コマンド パレットで [Azure: Start Queue Service](Azure: キュー サービスの開始) を選択して開始します。
このコマンドを実行すると、別のアプリケーションから使用できる Azure Storage Explorer という名前のリスナーが開始されます。 Storage Explorer は、クラウド リソースを参照してエミュレーター機能を使用できるようにするクライアント アプリケーションです。
Azure Storage Explorer をダウンロードします。 次に、アプリケーションを開くと、エミュレーターが動作していることが次のように示されます。
エミュレーターにキューを作成します。 このキューは、関数エンドポイントの構成の一部として使用します。 キュー要素を右クリックすると、新しいキューを作成できます。
Functions アプリでエミュレーターが使用されるようにするには、接続文字列を正しく設定する必要があります。 local.settings.js を開き、
AzureWebJobsStorage
要素を見つけて、"UseDevelopmentStorage=true"
の値を指定します。Note
クラウドに移行する場合は、このプロパティを別のものに設定してください。 運用環境では、これは Azure 上の実際のリソースを指す必要があります。
関数を構築する
これでローカル エミュレーターが設定され、そのキューが作成されました。 また、ローカル エミュレーターを指すようにプロジェクトが構成されました。 次に、キュー トリガーを処理する関数を作成する必要があります。
関数エンドポイントを作成する
受信キュー メッセージを処理できる関数を作成する準備ができました。 関数用のフォルダーを作成し、queueTrigger
などの名前を付けます。 次に、function.json ファイルを作成し、内容を次のようにします。
{
"bindings": [{
"name" "queueItem",
"type": "queueTrigger",
"direction": "in",
"queueName" : "items",
"connection": "AzureWebJobsStorage"
}]
}
name
要素の値は重要です。キューからの受信データを解析するために、この値を後でコードで参照するためです。 これは、新しいメッセージがある場合にキューによってトリガーされる queueTrigger
型である必要があります。
queueName
要素では、対話するキューを一意に識別します。 ここに入力する値は、エミュレーターでのキューの名前、または後で Azure で実際のキューに使用する名前と一致している必要があります。
connection
要素は、local.settings.json の AzureWebJobsStorage
要素の値を指定します。
キュー メッセージを処理する
受信キュー メッセージを処理するには、必要なメッセージを解析できるコードを記述する必要があります。 その時点で、次に何を行うかを決定できます。 たとえば、Web 要求を開始したり、そのメッセージを別のキューに配置したり、データベースにメッセージを送信したりできます。
ルートを設定する
受信要求を処理するためのルートが必要です。 Azure Functions では、ルートにあるキューへの要求が処理されます。 次のようにルートを設定すると、要求は http://localhost:<port>/queueTrigger
として呼び出されます。
http.HandleFunc("/queueTrigger", handleQueueTrigger)
要求をデコードする
キュー メッセージは、次の形式で送信されます。
{
"Data": {
"queueItem": "your message"
},
"Metadata": {
"DequeueCount": 1,
"ExpirationTime": "2019-10-16T17:58:31+00:00",
"Id": "800ae4b3-bdd2-4c08-badd-f08e5a34b865",
"InsertionTime": "2019-10-09T17:58:31+00:00",
"NextVisibleTime": "2019-10-09T18:08:32+00:00",
"PopReceipt": "AgAAAAMAAAAAAAAAAgtnj8x+1QE=",
"sys": {
"MethodName": "QueueTrigger",
"UtcNow": "2019-10-09T17:58:32.2205399Z",
"RandGuid": "24ad4c06-24ad-4e5b-8294-3da9714877e9"
}
}
}
この受信要求をデコードする一部として、前のメッセージをモデル化するヘルパー構造が必要です。 内容は次のようになります。
type InvokeRequest {
Data map[string]json.RawMessage
Metadata map[string]interface{}
}
この受信要求を受け取ってデコードするコードの記述を開始します。
func handleQueueTrigger(w http.ResponseWrite, r *http.Request) {
var invokeRequest InvokeRequest
d := json.NewDecoder(r.Body)
d.Decode(&invokeRequest)
}
これで要求がデコードされましたが、キュー メッセージ自体を解析する必要があります。
キュー メッセージを解析する
要求がデコードされると、Data
プロパティで要求からキュー メッセージを取得できます。 また、function.json ファイルで設定した name
プロパティ値でメッセージを参照する必要があります。 メッセージを取得するためのコードは、次のような 1 行です。
invokeRequest.Data["queueItem"]
このメッセージをクリア テキストで読み取れるようにする必要があるため、JSON ライブラリを使用してこれを解析します。 JSON ライブラリでは、解析するメッセージと、解析されたメッセージを配置する変数の 2 つのパラメーターを受け取る Unmarshal()
メソッドを使用します。 したがって、コードは次のようになります。
var parsedMessage string
json.Unmarshal(invokeRequest.Data["queueItem"], &parsedMessage)
この時点では、parsedMessage
にメッセージが含まれています。 コンソールに出力する場合は、次のコードを使用します。
fmt.Println(parsedMessage) // your message
Note
メッセージが文字列よりも高度なものである場合、parsedMessage
は、queueMessage
が差すものの形状に一致する構造を持つ必要があります。
メッセージをトリガーする
アプリケーションをテストするために、Azure Storage Explorer を使用することができます。 ツールの右側のウィンドウで、[メッセージを追加] ボタンを選択して、キューにメッセージを作成します。
Functions アプリが存在し、この時点で実行されている場合は、バインディングがトリガーされ、コードが呼び出されます。