チュートリアル:Azure Stream Analytics ジョブから Azure Functions を実行する
このチュートリアルでは、Azure Event Hubs からイベントを読み取り、イベント データに対してクエリを実行し、Azure Cache for Redis インスタンスに書き込みを行う Azure 関数を呼び出す Azure Stream Analytics ジョブを作成します。
Note
- Azure Stream Analytics から Azure Functions を実行するには、Stream Analytics ジョブへのシンク (出力) の 1 つとして Functions を構成します。 Functions はイベント ドリブン型コンピューティング オンデマンド エクスペリエンスであり、これにより、Azure またはサード パーティのサービスで発生するイベントによってトリガーされるコードを実装できます。 トリガーに応答する Azure Functions の機能によって、それは Azure Stream Analytics への自然な出力になります。
- Stream Analytics では、HTTP トリガーを使用して Functions を呼び出します。 Functions の出力アダプターにより、ユーザーは Functions を Stream Analytics に接続し、Stream Analytics クエリに基づいてイベントをトリガーできるようになります。
- マルチテナント クラスターで実行されている Stream Analytics ジョブから仮想ネットワーク (VNet) 内の Azure Functions への接続はサポートされていません。
このチュートリアルでは、次の作業を行う方法について説明します。
- Azure Event Hubs インスタンスを作成する
- Azure Cache for Redis インスタンスを作成する
- Azure Function の作成
- Stream Analytics のジョブの作成
- イベント ハブを入力として、関数を出力として構成する
- Stream Analytics ジョブの実行
- Azure Cache for Redis の結果を確認する
Azure サブスクリプションがない場合は、開始する前に無料アカウントを作成してください。
前提条件
始める前に、次の手順が完了していることを確認してください。
- Azure サブスクリプションをお持ちでない場合は、無料アカウントを作成してください。
- Microsoft ダウンロード センターから通話イベント ジェネレーター アプリTelcoGenerator.zip をダウンロードします。または、GitHub からソース コードを入手します。
Azure へのサインイン
Azure portal にサインインします。
イベント ハブの作成
Stream Analytics でデータ ストリームにおける不正な呼び出しを分析する前に、サンプル データをイベント ハブに送信する必要があります。 このチュートリアルでは、Azure Event Hubs を使用して Azure にデータを送信します。
イベント ハブを作成して呼び出しデータをそのイベント ハブに送信するには、次の手順を使用します。
Azure portal にサインインします。
左側のメニュー [すべてのサービス] を選択し、[モノのインターネット] を選択して、[Event Hubs] 上にマウス ポインターを置いて [+ (追加)] ボタンを選択します。
[名前空間の作成] ページで、これらの手順を実行します。
イベント ハブを作成する Azure サブスクリプションを選びます。
[リソース グループ] で、[新規作成] を選び、リソース グループの名前を入力します。 Event Hubs 名前空間は、このリソース グループに作成されます。
[名前空間名] に、Event Hubs 名前空間の一意の名前を入力します。
[場所] で、名前空間を作成するリージョンを選びます。
[価格レベル] で、[Standard] を選びます。
ページ下部にある [確認と作成] を選択します。
名前空間の作成ウィザードの [確認と作成] ページで、すべての設定を確認した後、ページの下部にある [作成] を選びます。
名前空間が正常にデプロイされたら、[リソースに移動] を選択して [Event Hubs 名前空間] ページに移動します。
[Event Hubs 名前空間] ページで、コマンド バーの [+ イベント ハブ] を選択します。
[イベント ハブの作成] ページで、イベント ハブの名前を入力します。 [パーティション数] を 2 に設定します。 残りの設定で既定のオプションを使用し、[確認と作成] を選びます。
[確認と作成] ページで、ページの下部にある [作成] ボタンをクリックします。 デプロイが成功するまで待ちます。
イベント ハブへのアクセスを許可し、接続文字列を取得する
アプリケーションから Azure Event Hubs にデータを送信できるようにするには、アクセスを許可するポリシーがイベント ハブに必要です。 アクセス ポリシーにより、承認情報を含む接続文字列が生成されます。
[Event Hubs 名前空間] ページで、左側のメニューの [共有アクセス ポリシー] を選択します。
ポリシーの一覧から RootManageSharedAccessKey を選択します。
次に、[接続文字列 - 主キー] の隣にあるコピー ボタンを選択します。
接続文字列をテキスト エディターに貼り付けます。 この接続文字列は、次のセクションで必要になります。
接続文字列は次のようになります。
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
接続文字列には、Endpoint、SharedAccessKeyName、SharedAccessKey という複数のキーと値のペアが含まれ、セミコロンで区切られていることに注目してください。
イベント ジェネレーター アプリケーションを起動する
TelcoGenerator アプリを起動する前に、以前に作成した Azure Event Hubs にデータを送信するよう構成する必要があります。
TelcoGenerator.zip ファイルの内容を抽出します。
任意のテキスト エディターで
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
ファイルを開きます。複数の.config
ファイルがあるので、正しいものを開くように注意してください。構成ファイル内の
<appSettings>
要素を次の詳細で更新します。- EventHubName キーの値を、接続文字列の末尾にある EntityPath の値に設定します。
- Microsoft.ServiceBus.ConnectionString キーの値を、名前空間への接続文字列に設定します。 名前空間ではなくイベント ハブへの接続文字列を使用する場合は、末尾の
EntityPath
値 (;EntityPath=myeventhub
) を削除します。 EntityPath 値の前にあるセミコロンを必ず削除してください。
ファイルを保存します。
次に、コマンド ウィンドウを開き、TelcoGenerator アプリケーションを解凍したフォルダーに変更します。 次のコマンドを入力します。
.\telcodatagen.exe 1000 0.2 2
このコマンドは、次のパラメーターを受け取ります。
- 1 時間あたりの呼び出しデータ レコードの数。
- 不正の確率のパーセンテージ。これは、アプリが不正な呼び出しをシミュレートする頻度です。 値 0.2 は、呼び出しレコードの約 20% が不正に見えることを意味します。
- 継続時間。これはアプリを実行する時間数です。 また、コマンド ラインでプロセスを終了する (Ctrl + C) ことで、いつでもアプリを停止できます。
数秒後に、アプリはイベント ハブに送信する呼び出しレコードを画面に表示し始めます。 通話データには、次のフィールドが含まれています。
レコード 定義 CallrecTime 通話開始時刻のタイムスタンプ。 SwitchNum 通話の接続に使われた電話交換機。 この例では、交換機は発信国/地域を表す文字列です (US、China、UK、Germany、Australia)。 CallingNum 発信元の電話番号。 CallingIMSI IMSI (International Mobile Subscriber Identity: 国際携帯機器加入者識別番号)。 発信元の一意識別子。 CalledNum 通話受信者の電話番号。 CalledIMSI IMSI (International Mobile Subscriber Identity: 国際携帯機器加入者識別番号)。 通話受信者の一意識別子。
Stream Analytics のジョブの作成
通話イベントのストリームを準備できたら、イベント ハブからデータを読み取る Stream Analytics ジョブを作成できます。
- Stream Analytics ジョブを作成するには、Azure portal に移動します。
- [リソースの作成] を選択し、 [Stream Analytics ジョブ] を検索します。 [Stream Analytics ジョブ] タイルを選択し、 [作成] を選択します。
- [新しい Stream Analytics ジョブ] ページで、次の手順のようにします。
[サブスクリプション] で、Event Hubs 名前空間を含むサブスクリプションを選択します。
[リソース グループ] で、前に作成したリソース グループを選びます。
[インスタンスの詳細] セクションの [名前] に、Stream Analytics ジョブの一意の名前を入力します。
[リージョン] で、Stream Analytics ジョブを作成するリージョンを選びます。 最適なパフォーマンスを実現し、リージョン間でのデータ転送の料金がかからないように、ジョブとイベント ハブを同じリージョンに配置することをお勧めします。
[ホスティング環境] で、まだ選んでいない場合は [クラウド] を選びます。 Stream Analytics ジョブは、クラウドまたはエッジにデプロイすることができます。 クラウドでは Azure Cloud にデプロイすることができ、エッジでは IoT Edge デバイスにデプロイすることができます。
[ストリーミング ユニット] で、[1] を選びます。 ストリーミング ユニットとは、ジョブの実行に必要なコンピューティング リソースのことです。 既定では、この値は 1 に設定されています。 ストリーミング ユニットのスケーリングについては、ストリーミング ユニットの理解と調整に関する記事を参照してください。
ページ下部にある [確認と作成] を選択します。
- [確認と作成] ページで設定を確認し、[作成] を選んで Stream Analytics ページを作成します。
- ジョブがデプロイされたら、[リソースに移動] を選んで [Stream Analytics ジョブ] ページに移動します。
ジョブの入力を構成する
次の手順では、前のセクションで作成したイベント ハブを使用してデータを読み取るためにジョブの入力ソースを定義します。
[Stream Analytics ジョブ] ページの左側のメニューの [ジョブ トポロジ] セクションで、[入力] を選択します。
[入力] ページで、[+ 入力の追加] と [イベント ハブ] を選択します。
[イベント ハブ] ページで、次の手順のようにします。
[入力のエイリアス] に「CallStream」と入力します。 [入力のエイリアス] には、入力を識別するためのわかりやすい名前を入力します。 入力のエイリアスに含めることのできる文字は、英数字、ハイフン、アンダースコアのみであり、長さは 3 文字以上 63 文字以下でなければなりません。
[Subscription] で、イベント ハブを作成した Azure サブスクリプションを選択します。 イベント ハブは、Stream Analytics ジョブと同じサブスクリプションにも別のサブスクリプションにも含めることができます。
[イベント ハブの名前空間] には、前のセクションで作成したイベント ハブの名前空間を選択します。 現在のサブスクリプションで利用可能な名前空間がすべて、ドロップダウンに表示されます。
[イベント ハブの名前] には、前のセクションで作成したイベント ハブを選びます。 選択した名前空間で利用可能なイベント ハブがすべて、ドロップダウンに表示されます。
[イベント ハブ コンシューマー グループ] で、[新規作成] オプションをオンのままにして、イベント ハブに新しいコンシューマー グループが作成されるようにします。 Stream Analytics ジョブごとに個別のコンシューマー グループを使用することをお勧めします。 コンシューマー グループが指定されていない場合、Stream Analytics ジョブは
$Default
コンシューマー グループを使用します。 ジョブに自己結合または複数の入力が含まれる場合、一部の入力は複数の閲覧者ダウンストリームによって読み取られる可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。[認証モード] で、[接続文字列] を選びます。 このオプションを使用してチュートリアルをテストした方が簡単です。
[イベント ハブ ポリシー名] で、[既存のものを使用] を選び、以前に作成したポリシーを選択します。
ページの下部にある [保存] を選択します。
Azure Cache for Redis インスタンスを作成する
「Azure Cache for Redis インスタンスを作成する」で説明されている手順を使用して、Azure Cache for Redis でキャッシュを作成します。
キャッシュを作成したら、 [設定] にある [アクセス キー] を選択します。 プライマリ接続文字列をメモします。
Azure Cache for Redis にデータを書き込むことができる関数を Azure Functions で作成する
Functions ドキュメントの関数アプリの作成に関するセクションを参照してください。 このサンプルは、次のものに基づいて構築されました。
このチュートリアルに従って、既定の HttpTrigger 関数アプリを Visual Studio Code に作成します。 言語:
C#
、ランタイム:.NET 6
(関数 v4 の下)、テンプレート:HTTP trigger
の情報が使用されます。プロジェクト フォルダーにあるターミナルで次のコマンドを実行して、Redis クライアント ライブラリをインストールします。
dotnet add package StackExchange.Redis --version 2.2.88
RedisConnectionString
項目とRedisDatabaseIndex
項目をValues
local.settings.json
のセクションに追加し、宛先サーバーの接続文字列を入力します。{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
Redis Database Index は、インスタンス上のデータベースを識別する 0 から 15 の数値です。
次のコード スニペットを使って、関数 (プロジェクト内の .cs ファイル) 全体を置き換えます。 名前空間、クラス名、関数名を独自に更新します。
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Stream Analytics では、関数から "HTTP 要求エンティティが大きすぎる" という例外を受け取ると、関数に送信するバッチのサイズを削減します。 次のコードでは、Stream Analytics からサイズ超過のバッチが送信されていないことを確認します。 関数で使用する最大バッチ カウントおよび最大バッチ サイズの値が Stream Analytics ポータルに入力した値と矛盾しないことを確認します。
これで、関数アプリが Azure に発行されました。
Azure portal で関数を開き、
RedisConnectionString
とRedisDatabaseIndex
のアプリケーション設定を設定します。
出力としての関数で Stream Analytics ジョブを更新する
Azure Portal で Stream Analytics ジョブを開きます。
目的の関数を参照し、 [概要]>[出力]>[追加] の順に選択します。 新しい出力を追加するには、シンク オプションとして Azure Function を選択します。 Functions の出力アダプターには次のプロパティがあります。
プロパティ名 説明 出力エイリアス 入力を参照するジョブのクエリで使用するわかりやすい名前です。 インポート オプション 現在のサブスクリプションから関数を使用できます。あるいは関数が別のサブスクリプションにある場合は、設定を手動で指定できます。 Function App Function App の名前です。 Function Function App にある関数の名前です (run.csx 関数の名前)。 最大バッチ サイズ 関数に送信される、各出力バッチの最大サイズをバイト単位で設定します。 既定では、この値は 262,144 バイト (256 KB) に設定されます。 最大バッチ カウント 関数に送信される各バッチ内の最大イベント数を指定します。 既定値は 100 です。 このプロパティは省略可能です。 Key 別のサブスクリプションから関数を使用できるようにします。 関数にアクセスするキー値を指定します。 このプロパティは省略可能です。 出力エイリアスの名前を指定します。 このチュートリアルでは saop1 という名前ですが、任意の名前を使用できます。 その他の詳細を入力します。
Stream Analytics ジョブを開き、クエリを次の内容に更新します
重要
次のサンプル スクリプトでは、入力名に CallStream、出力名に saop1 を使用していることを前提としています。 別の名前を使用した場合は、忘れずにクエリを更新してください。
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
コマンドラインで次のコマンドを実行して telcodatagen.exe アプリケーションを起動します。 コマンドには形式
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
が使用されます。telcodatagen.exe 1000 0.2 2
Stream Analytics ジョブを開始します。
Azure 関数の [監視] ページでは、関数が呼び出されていることがわかります。
キャッシュの [Azure Cache for Redis] ページで、左側のメニューで [メトリック] を選択し、[キャッシュ書き込み] メトリックを追加して、期間を [過去 1 時間] に設定します。 次の図のようなグラフが表示されます。
Azure Cache for Redis の結果を確認する
Azure Functions ログからキーを取得する
まず、Azure Cache for Redis に挿入されたレコードのキーを取得します。 コードでは、次のコード スニペットに示すように、キーは Azure 関数で計算されます。
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Azure portal を参照して Azure Functions アプリを見つけます。
左側のメニューの [関数] を選択します。
関数の一覧から [HTTPTrigger1] を選びます。
左側のメニューで [モニター] を選択します。
[ログ] タブに切り替えます。
次のスクリーンショットに示すように、情報メッセージからキーをメモします。 このキーを使用して、Azure Cache for Redis 内で値を見つけます。
このキーを使用して、Azure Cache for Redis 内でレコードを見つけます
Azure portal を参照し、Azure Cache for Redis を見つけます。 [コンソール] を説明します。
Azure Cache for Redis コマンドを使用して、Azure Cache for Redis にデータがあることを確認します (コマンドは Get {key} という形式になります)。(前のセクションで) Azure 関数の [モニター ログ] からコピーしたキーを使用します。
Get "KEY-FROM-THE-PREVIOUS-SECTION"
このコマンドにより、指定したキーの値が出力されるはずです。
エラー処理と再試行
Azure Functions へのイベントの送信中にエラーが発生した場合、Stream Analytics はほとんどの操作を再試行します。 http エラー 413 (エンティティが大きすぎます) を除き、すべての http 例外は、成功するまで再試行されます。 "エンティティが大きすぎます" エラーは、再試行またはドロップ ポリシーの対象となるデータ エラーとして扱われます。
Note
Stream Analytics から Azure Functions への HTTP 要求のタイムアウトは、100 秒に設定されています。 Azure Functions アプリが、バッチの処理に 100 秒以上を要する場合、Stream Analytics はエラーを出力し、そのバッチの再処理を試みます。
タイムアウトを再試行すると、重複するイベントが出力シンクに書き込まれる可能性があります。 Stream Analytics では、失敗したバッチを再試行するときに、バッチ内のすべてのイベントが再試行されます。 たとえば、Stream Analytics から Azure Functions に送信される 20 個のイベントからなるバッチを考えてみます。 Azure Functions によってこのバッチの最初の 10 個のイベントが処理されるのに 100 秒かかるとします。 Stream Analytics は、100 秒後、Azure Functions から肯定応答を受信していないため、要求を中断します。さらに、同じバッチに対して別の要求が送信されます。 バッチ内の最初の 10 個のイベントが Azure Functions によって再度処理され、これによって重複が発生します。
既知の問題
Azure Portal では、最大バッチ サイズ/最大バッチ カウントの値を空 (既定値) にリセットしようとしても、保存時には以前に入力した値に戻ります。 この場合は、それらのフィールドに既定値を手動で入力します。
Azure Functions での HTTP ルーティングの使用は、現在、Stream Analytics ではサポートされていません。
仮想ネットワークでホストされている Azure Functions に接続するためのサポートが、有効になっていません。
リソースをクリーンアップする
リソース グループ、ストリーミング ジョブ、および関連するすべてのリソースは、不要になったら削除します。 ジョブを削除すると、ジョブによって消費されるストリーミング ユニットに対する課金を回避することができます。 ジョブを後で使用する計画がある場合は、ジョブを停止し、必要なときに再起動することができます。 このジョブを使い続けない場合は、以下の手順のようにして、このクイックスタートで作成したすべてのリソースを削除します。
- Azure Portal の左側のメニューで [リソース グループ] を選択し、作成したリソースの名前を選択します。
- リソース グループのページで [削除] を選択し、削除するリソースの名前をテキスト ボックスに入力してから [削除] を選択します。
次のステップ
このチュートリアルでは、Azure 関数を実行する単純な Stream Analytics ジョブを作成しました。 Stream Analytics ジョブの詳細については、次のチュートリアルに進んでください。
Azure Functions 「Azure Stream Analytics ジョブで JavaScript ユーザー定義関数を実行する」を使用して、Azure SQL Database のレコードを更新またはマージします