Azure Stream Analytics での一般的なクエリ パターン
Azure Stream Analytics のクエリは SQL に類似したクエリ言語で表現されます。 言語の構造については、「Stream Analytics query language reference」(Stream Analytics クエリ言語リファレンス) ガイドで確認できます。
クエリのデザインでは、イベント データを 1 つの入力ストリームから出力データ ストアに移動する、単純なパススルー ロジックを表すことができます。また、「Stream Analytics を使って IoT ソリューションを構築する」のガイドで説明しているように、豊富なパターン マッチングとテンポラル解析を行って、さまざまな時間枠にわたって集計を計算することもできます。 複数の入力からのデータを結合してストリーミング イベントを結合し、静的な参照データに対する参照を行ってイベントの値を多様化することができます。 複数の出力にデータを書き込むこともできます。
この記事では、実際のシナリオに基づいて、いくつかの一般的なクエリ パターンの対処方法について説明します。
サポートされるデータ形式
Azure Stream Analytics では、CSV、JSON、および Avro データ形式のイベントの処理をサポートしています。 JSON と Avro のどちらの形式も、入れ子にしたオブジェクト (レコード) や配列などの複合型を含めることができます。 これらの複合データ型の操作について詳しくは、JSON および AVRO データの解析に関する記事を参照してください。
複数の出力にデータを送信する
複数の SELECT ステートメントを使用して、異なる出力シンクにデータを出力できます。 たとえば、一方の SELECT ステートメントでしきい値に基づくアラートを出力し、もう一方の SELECT ステートメントで BLOB ストレージにイベントを出力することができます。
次の入力について考えてみましょう。
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
また、クエリから得る必要がある出力は、次の 2 つです。
ArchiveOutput:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
AlertOutput:
| Make | Time | Count |
| --- | --- | --- |
| Make2 |2023-01-01T00:00:10.0000000Z |3 |
2 つの SELECT ステートメントを使用し、出力として Archive 出力と Alert 出力を使用するクエリ:
SELECT
*
INTO
ArchiveOutput
FROM
Input TIMESTAMP BY Time
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO
AlertOutput
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING
[Count] >= 3
INTO 句を使用して、Stream Analytics サービスに対して、データを書き込む出力先を指定します。 最初の SELECT では、入力からデータを受信して ArchiveOutput という名前の出力に送信するパススルー クエリを定義します。 2 番目のクエリでは、データを集計してフィルター処理し、その結果を、AlertOutput という名前のダウンストリームのアラート システム出力に送信します。
WITH 句を使用すると、複数のサブクエリ ブロックを定義できます。 このオプションには、入力ソースに対して開くリーダーが少なくて済むという利点があります。
Query:
WITH ReaderQuery AS (
SELECT
*
FROM
Input TIMESTAMP BY Time
)
SELECT * INTO ArchiveOutput FROM ReaderQuery
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO AlertOutput
FROM ReaderQuery
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING [Count] >= 3
詳細については、「WITH句」を参照してください。
単純なパススルー クエリ
単純なパススルー クエリを使用すると、入力ストリーム データを出力にコピーできます。 たとえば、後で分析を行うために、リアルタイムの車両情報を含むデータ ストリームを SQL データベースに保存する必要がある場合、単純なパススルー クエリを使用してこのジョブを実行します。
次の入力について考えてみましょう。
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
output が入力と同じになるようにする必要があります。
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
クエリは次のとおりです。
SELECT
*
INTO Output
FROM Input
この SELECT * クエリを実行すると、受信イベントのすべてのフィールドが投影され、出力に送信されます。 代わりに、SELECT ステートメントで必要なフィールドのみを投影することもできます。 次の例では、SELECT ステートメントで、入力データからMake フィールドと Time フィールドのみを投影します。
次の入力について考えてみましょう。
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
出力 には、Make フィールドと Time フィールドのみがふくまれるようにする必要があります。
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:04.0000000Z |
必要なフィールドのみを投影するクエリを次に示します。
SELECT
Make, Time
INTO Output
FROM Input
LIKE と NOT LIKE を使用した文字列の照合
LIKE や NOT LIKE を使用して、フィールドが特定のパターンに一致するかどうかを検証することができます。 たとえば、フィルター処理を使用して、文字 A
で始まり、数字 9
で終了するナンバー プレートのみを返すことができます。
次の入力について考えてみましょう。
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
出力には、文字 A
で始まり、数字 9
で終わるナンバー プレートを含める必要があります。
| Make | License_plate | Time |
| --- | --- | --- |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
LLIKE 演算子を使用するクエリを次に示します。
SELECT
*
FROM
Input TIMESTAMP BY Time
WHERE
License_plate LIKE 'A%9'
LIKE ステートメントを使用して License_plate フィールドの値を検査します。 値は、文字 A
で始まり、0 文字以上の文字列が続き、数字 9 で終わる必要があります。
過去のイベントに対する計算
LAG 関数を使用すると、時間枠内の過去のイベントを調べて、現在のイベントと比較することができます。 たとえば、現在の自動車の Make が、料金所を最後に通過した自動車の Make と異なる場合、現在の自動車の Make を出力できます。
サンプル入力:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
サンプル出力:
| Make | Time |
| --- | --- |
| Make2 |2023-01-01T00:00:02.0000000Z |
サンプルクエリ:
SELECT
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(minute, 1)) <> Make
LAG を使用して入力ストリームで 1 つ前のイベントを調べて、Make の値を取得し、その値を現在のイベントの Make と比較し、イベントを出力します。
詳細については、LAG に関するページを参照してください。
期間内の最後のイベントを返す
イベントはシステムによってリアルタイムで使用されるため、イベントが、その時間枠で最後の到着イベントであるかどうかを判断できる関数はありません。 これを実現するには、イベントの時間がそのウィンドウでのすべてのイベントの最大時間となるように、入力ストリームを別のストリームと結合する必要があります。
サンプル入力:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
10 分間の 2 つの時間枠内の最後の自動車に関する情報を含むサンプル出力:
| License_plate | Make | Time |
| --- | --- | --- |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
サンプルクエリ:
WITH LastInWindow AS
(
SELECT
MAX(Time) AS LastEventTime
FROM
Input TIMESTAMP BY Time
GROUP BY
TumblingWindow(minute, 10)
)
SELECT
Input.License_plate,
Input.Make,
Input.Time
FROM
Input TIMESTAMP BY Time
INNER JOIN LastInWindow
ON DATEDIFF(minute, Input, LastInWindow) BETWEEN 0 AND 10
AND Input.Time = LastInWindow.LastEventTime
クエリの最初のステップでは、10 分間の時間枠内で最大のタイム スタンプ、つまり、その時間枠の最後のイベントのタイム スタンプを検索します。 2 番目の手順では、最初のクエリの結果と元のストリームを結合し、各期間で最後のタイム スタンプに一致するイベントを検索します。
DATEDIFF は、2 つの DateTime フィールド間の時差を比較して返す、日付固有の関数です。詳細については、日付関数に関するページを参照してください。
ストリームの結合の詳細については、JOIN に関するページを参照してください。
時間ごとのデータ集計
時間枠にわたって情報を計算するには、データを集計することができます。 この例では、自動車の特定の Make ごとに過去 10 秒間のカウントが計算されます。
サンプル入力:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
サンプル出力:
| Make | Count |
| --- | --- |
| Make1 | 2 |
| Make2 | 1 |
Query:
SELECT
Make,
COUNT(*) AS Count
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
この集計では、自動車を Make でグループ化して、10 秒ごとにカウントします。 出力には、料金所を通過した自動車の Make と Count が含まれます。
TumblingWindow は、イベントをグループ化するために使用されるウィンドウ関数です。 集計は、グループ化されたすべてのイベントに適用できます。 詳細については、ウィンドウ関数に関するページを参照してください。
集計の詳細については、集計関数に関するページを参照してください。
定期的な値の出力
イベントが見つからない、または不規則な場合、よりスパースなデータ入力から一定の間隔の出力を生成できます。 たとえば、最近検出されたデータ ポイントを報告する 5 秒ごとのイベントを生成します。
サンプル入力:
| Time | Value |
| --- | --- |
| "2014-01-01T06:01:00" |1 |
| "2014-01-01T06:01:05" |2 |
| "2014-01-01T06:01:10" |3 |
| "2014-01-01T06:01:15" |4 |
| "2014-01-01T06:01:30" |5 |
| "2014-01-01T06:01:35" |6 |
サンプル出力 (最初の 10 行):
| Window_end | Last_event.Time | Last_event.Value |
| --- | --- | --- |
| 2014-01-01T14:01:00.000Z |2014-01-01T14:01:00.000Z |1 |
| 2014-01-01T14:01:05.000Z |2014-01-01T14:01:05.000Z |2 |
| 2014-01-01T14:01:10.000Z |2014-01-01T14:01:10.000Z |3 |
| 2014-01-01T14:01:15.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:20.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:25.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:30.000Z |2014-01-01T14:01:30.000Z |5 |
| 2014-01-01T14:01:35.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:40.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:45.000Z |2014-01-01T14:01:35.000Z |6 |
サンプルクエリ:
SELECT
System.Timestamp() AS Window_end,
TopOne() OVER (ORDER BY Time DESC) AS Last_event
FROM
Input TIMESTAMP BY Time
GROUP BY
HOPPINGWINDOW(second, 300, 5)
このクエリは、5 秒ごとにイベントを生成し、それまでに受信した最後のイベントを出力します。 HOPPINGWINDOW 期間で、クエリが最新のイベントを検出するためにさかのぼる期間を指定します。
詳細については、ホッピング ウィンドウに関するページを参照してください。
ストリームでのイベントの関連付け
LAG 関数を使用して、過去のイベントを調べることにより、同じストリーム内のイベントを関連付けることができます。 たとえば、過去 90 秒間に同じ Make の 2 台の自動車が連続して料金所を通過するたびに出力を生成できます。
サンプル入力:
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make1 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make2 |DEF-987 |2023-01-01T00:00:03.0000000Z |
| Make1 |GHI-345 |2023-01-01T00:00:04.0000000Z |
サンプル出力:
| Make | Time | Current_car_license_plate | First_car_license_plate | First_car_time |
| --- | --- | --- | --- | --- |
| Make1 |2023-01-01T00:00:02.0000000Z |AAA-999 |ABC-123 |2023-01-01T00:00:01.0000000Z |
サンプルクエリ:
SELECT
Make,
Time,
License_plate AS Current_car_license_plate,
LAG(License_plate, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_license_plate,
LAG(Time, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(second, 90)) = Make
LAG 関数を使用して、入力ストリームで 1 つ前のイベントを調べて、Make の値を取得し、現在のイベントの Make の値と比較することができます。 条件が満たされると、SELECT ステートメントで LAG を使用して、前のイベントのデータを射影できます。
詳細については、LAG に関するページを参照してください。
イベントの間隔を検出する
イベントの間隔は、終了イベントを受信したときに最後の開始イベントを調べることによって計算できます。 このクエリは、ユーザーがページまたは機能に費やした時間を判断するのに役立ちます。
サンプル入力:
| User | Feature | Event | Time |
| --- | --- | --- | --- |
| user@location.com |RightMenu |Start |2023-01-01T00:00:01.0000000Z |
| user@location.com |RightMenu |End |2023-01-01T00:00:08.0000000Z |
サンプル出力:
| User | Feature | Duration |
| --- | --- | --- |
| user@location.com |RightMenu |7 |
サンプルクエリ:
SELECT
[user],
feature,
DATEDIFF(
second,
LAST(Time) OVER (PARTITION BY [user], feature LIMIT DURATION(hour, 1) WHEN Event = 'start'),
Time) as duration
FROM input TIMESTAMP BY Time
WHERE
Event = 'end'
LAST 関数を使用して、特定の条件内で最後のイベントを取得できます。 この例では、条件は Start 型のイベントで、ユーザーと機能に対して PARTITION BY を実行することで検索をパーティション分割します。 このようにして、すべてのユーザーと機能が、Start イベントを検索するときに個別に処理されます。 LIMIT DURATION を使用して、終了イベントと開始イベントの間の過去 1 時間に検索を制限します。
一意の値をカウントする
COUNT と DISTINCT を使用すると、ストリームに出現するフィールドの、値ごとの件数を一定間隔でカウントできます。 2 秒間の時間枠内で料金所を通過した自動車の Make ごとの台数を計算するクエリを作成できます。
サンプル入力:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
サンプル出力:
| Count_make | Time |
| --- | --- |
| 2 |2023-01-01T00:00:02.000Z |
| 1 |2023-01-01T00:00:04.000Z |
サンプル クエリ:
SELECT
COUNT(DISTINCT Make) AS Count_make,
System.TIMESTAMP() AS Time
FROM Input TIMESTAMP BY TIME
GROUP BY
TumblingWindow(second, 2)
COUNT(DISTINCT Make) は、ある時間枠内で、Make 列の個別の値の数を返します。 詳細については、COUNT 集計関数に関するページをご覧ください。
期間内の最初のイベントを取得する
IsFirst
を使用すると、時間枠内の最初のイベントを取得できます。 たとえば、10 分間隔で最初の自動車情報を出力します。
サンプル入力:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
サンプル出力:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
サンプルクエリ:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) = 1
IsFirst を使用して、データをパーティション分割し、10 分間隔で検出された自動車の Make ごとの最初のイベントを計算することもできます。
サンプル出力:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
サンプルクエリ:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) OVER (PARTITION BY Make) = 1
詳しくは、IsFirst に関するページをご覧ください。
期間内の重複するイベントを削除する
特定の時間枠内のイベントの平均を計算するなどの操作を実行する場合、重複するイベントをフィルター処理する必要があります。 次の例では、2 番目のイベントは最初のイベントの重複です。
サンプル入力:
| DeviceId | Time | Attribute | Value |
| --- | --- | --- | --- |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 2 |2018-07-27T00:00:01.0000000Z |Temperature |40 |
| 1 |2018-07-27T00:00:05.0000000Z |Temperature |60 |
| 2 |2018-07-27T00:00:05.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:10.0000000Z |Temperature |100 |
サンプル出力:
| AverageValue | DeviceId |
| --- | --- |
| 70 | 1 |
|45 | 2 |
サンプルクエリ:
WITH Temp AS (
SELECT Value, DeviceId
FROM Input TIMESTAMP BY Time
GROUP BY Value, DeviceId, System.Timestamp()
)
SELECT
AVG(Value) AS AverageValue, DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
group by 句のフィールドがすべて同じであるため、最初のステートメントが実行されると、重複するレコードが 1 つに結合されます。 したがって、重複は削除されます。
異なるケース/値に異なるロジックを指定する (CASE ステートメント)
CASE ステートメントでは、特定の条件に基づいてさまざまなフィールドに対して異なる計算を行うことができます。 たとえば、レーン A
を Make1
の自動車に割り当て、レーン B
を他のメーカーに割り当てます。
サンプル入力:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
サンプル出力:
| Make |Dispatch_to_lane | Time |
| --- | --- | --- |
| Make1 |"A" |2023-01-01T00:00:01.0000000Z |
| Make2 |"B" |2023-01-01T00:00:02.0000000Z |
サンプルクエリ:
SELECT
Make
CASE
WHEN Make = "Make1" THEN "A"
ELSE "B"
END AS Dispatch_to_lane,
System.TimeStamp() AS Time
FROM
Input TIMESTAMP BY Time
CASE 式を使用して、式を一連の単純な式と比較して結果を決定します。 この例では、Make1
の車両がレーン A
にディスパッチされ、他のメーカーの車両にレーン B
が割り当てられます。
詳細については、「CASE 式」を参照してください。
データ変換
CAST メソッドを使用して、データをリアルタイムでキャストできます。 たとえば、自動車の重量を型 nvarchar(max) から型 bigint に変換し、数値計算で使用することができます。
サンプル入力:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
サンプル出力:
| Make | Weight |
| --- | --- |
| Make1 |3000 |
サンプルクエリ:
SELECT
Make,
SUM(CAST(Weight AS BIGINT)) AS Weight
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
CAST ステートメントを使用してデータ型を指定します。 サポートされるデータ型の一覧については、「データ型 (Azure Stream Analytics)」をご覧ください。
詳細については、データ変換関数に関するページを参照してください。
条件の期間を検出する
複数のイベントが対象となる条件の場合は、LAG 関数を使用して、その条件の期間を特定できます。 たとえば、バグのためにすべての自動車の重量が正しくない (20,000 ポンドを超過) 結果になった場合、バグが継続した期間を計算する必要があります。
サンプル入力:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |2000 |
| Make2 |2023-01-01T00:00:02.0000000Z |25000 |
| Make1 |2023-01-01T00:00:03.0000000Z |26000 |
| Make2 |2023-01-01T00:00:04.0000000Z |25000 |
| Make1 |2023-01-01T00:00:05.0000000Z |26000 |
| Make2 |2023-01-01T00:00:06.0000000Z |25000 |
| Make1 |2023-01-01T00:00:07.0000000Z |26000 |
| Make2 |2023-01-01T00:00:08.0000000Z |2000 |
サンプル出力:
| Start_fault | End_fault |
| --- | --- |
| 2023-01-01T00:00:02.000Z |2023-01-01T00:00:07.000Z |
サンプルクエリ:
WITH SelectPreviousEvent AS
(
SELECT
*,
LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previous_time,
LAG([weight]) OVER (LIMIT DURATION(hour, 24)) as previous_weight
FROM input TIMESTAMP BY [time]
)
SELECT
LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN previous_weight < 20000 ) [Start_fault],
previous_time [End_fault]
FROM SelectPreviousEvent
WHERE
[weight] < 20000
AND previous_weight > 20000
最初の SELECT ステートメントでは、現在の重量測定値を前の測定値と関連付け、現在の測定値と共に射影します。 2 番目の SELECT では、previous_weight が 20000 未満であり、現在の重量が 20000 未満で、現在のイベントの previous_weight が 20000 を超えていた最後のイベントを探します。
End_fault は、以前は問題があったが、現在は問題がないイベントです。Start_fault は、その前の問題がない最後のイベントです。
個別の時間でイベントを処理する (サブストリーム)
イベント プロデューサー間またはパーティション間のクロックのずれや、ネットワーク待機時間が原因でイベントが遅れて、あるいは順序がずれて到着することがあります。 たとえば、TollID 2 のデバイス クロックは TollID 1 より 5 秒遅れており、TollID 3 のデバイス クロックは TollID 1 より 10 秒遅れています。 計算は料金所ごとに個別に実行され、各料金所のクロック データだけがタイムスタンプとして考慮されます。
サンプル入力:
| LicensePlate | Make | Time | TollID |
| --- | --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:01.0000000Z | 1 |
| YHN 6970 |Make2 |2023-07-27T00:00:05.0000000Z | 1 |
| QYF 9358 |Make1 |2023-07-27T00:00:01.0000000Z | 2 |
| GXF 9462 |Make3 |2023-07-27T00:00:04.0000000Z | 2 |
| VFE 1616 |Make2 |2023-07-27T00:00:10.0000000Z | 1 |
| RMV 8282 |Make1 |2023-07-27T00:00:03.0000000Z | 3 |
| MDR 6128 |Make3 |2023-07-27T00:00:11.0000000Z | 2 |
| YZK 5704 |Make4 |2023-07-27T00:00:07.0000000Z | 3 |
サンプル出力:
| TollID | Count |
| --- | --- |
| 1 | 2 |
| 2 | 2 |
| 1 | 1 |
| 3 | 1 |
| 2 | 1 |
| 3 | 1 |
サンプルクエリ:
SELECT
TollId,
COUNT(*) AS Count
FROM input
TIMESTAMP BY Time OVER TollId
GROUP BY TUMBLINGWINDOW(second, 5), TollId
TIMESTAMP OVER BY 句では、サブストリームを使用して各デバイスのタイムラインで個別に検索します。 各 TollID の出力イベントは、計算されるときに生成されます。つまり、すべてのデバイスが同じクロックを参照しているように順序が変更されるのではなく、各 TollID を基準にしてイベントが順序付けられます。
詳細については、TIMESTAMP BY OVER に関するページを参照してください。
セッション ウィンドウ
セッション ウィンドウは、イベントが発生している間、拡張し続ける時間枠であり、一定の時間イベントが受信されない場合、または時間枠が最大継続時間に達したときに閉じられ、計算が行われます。 この時間枠は、ユーザー操作データを計算する場合に特に役立ちます。 時間枠は、ユーザーがシステムを操作し始めると開始し、イベントがそれ以上観測されなくなると (つまり、ユーザーが操作をやめると) 終了します。 たとえば、ユーザーが Web ページを操作していて、クリック回数がログに記録される場合、セッション ウィンドウを使用して、ユーザーがサイトを操作した時間を調べることができます。
サンプル入力:
| User_id | Time | URL |
| --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | "www.example.com/a.html" |
| 0 | 2017-01-26T00:00:20.0000000Z | "www.example.com/b.html" |
| 1 | 2017-01-26T00:00:55.0000000Z | "www.example.com/c.html" |
| 0 | 2017-01-26T00:01:10.0000000Z | "www.example.com/d.html" |
| 1 | 2017-01-26T00:01:15.0000000Z | "www.example.com/e.html" |
サンプル出力:
| User_id | StartTime | EndTime | Duration_in_seconds |
| --- | --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | 2017-01-26T00:01:10.0000000Z | 70 |
| 1 | 2017-01-26T00:00:55.0000000Z | 2017-01-26T00:01:15.0000000Z | 20 |
サンプルクエリ:
SELECT
user_id,
MIN(time) as StartTime,
MAX(time) as EndTime,
DATEDIFF(second, MIN(time), MAX(time)) AS duration_in_seconds
FROM input TIMESTAMP BY time
GROUP BY
user_id,
SessionWindow(minute, 1, 60) OVER (PARTITION BY user_id)
SELECT を使用して、ユーザー操作に関連するデータを操作時間と共に射影します。 ユーザーと SessionWindow でデータをグループ化します。このセッション ウィンドウでは 1 分間操作が発生しないと終了し、最大ウィンドウ サイズは 60 分です。
SessionWindow の詳細については、セッション ウィンドウに関するページを参照してください。
JavaScript および C# のユーザー定義関数
Azure Stream Analytics のクエリ言語は、JavaScript または C# 言語で記述されたカスタム関数を使用して拡張できます。 ユーザー定義関数 (UDF) とは、SQL 言語を使用して簡単に表現することができないカスタムまたは複雑な計算です。 これらの UDF は一度定義すると、クエリ内で複数回使用できます。 たとえば、UDF を使用して、16 進数の nvarchar(max) 値を bigint 値に変換できます。
サンプル入力:
| Device_id | HexValue |
| --- | --- |
| 1 | "B4" |
| 2 | "11B" |
| 3 | "121" |
サンプル出力:
| Device_id | Decimal |
| --- | --- |
| 1 | 180 |
| 2 | 283 |
| 3 | 289 |
function hex2Int(hexValue){
return parseInt(hexValue, 16);
}
public static class MyUdfClass {
public static long Hex2Int(string hexValue){
return int.Parse(hexValue, System.Globalization.NumberStyles.HexNumber);
}
}
SELECT
Device_id,
udf.Hex2Int(HexValue) AS Decimal
From
Input
ユーザー定義関数は、使用されるすべてのイベントに対して HexValue の bigint 値を計算します。
詳細については、JavaScript と C# に関するページを参照してください。
MATCH_RECOGNIZE を使用した高度なパターン マッチング
MATCH_RECOGNIZE は、イベントのシーケンスを適切に定義された正規表現パターンに一致させるために使用できる高度なパターン マッチング メカニズムです。 たとえば、ATM の障害をリアルタイムで監視しており、ATM の操作時に管理者への通知が必要な警告メッセージが 2 つ連続して発生したとします。
入力:
| ATM_id | Operation_id | Return_Code | Time |
| --- | --- | --- | --- |
| 1 | "Entering Pin" | "Success" | 2017-01-26T00:10:00.0000000Z |
| 2 | "Opening Money Slot" | "Success" | 2017-01-26T00:10:07.0000000Z |
| 2 | "Closing Money Slot" | "Success" | 2017-01-26T00:10:11.0000000Z |
| 1 | "Entering Withdraw Quantity" | "Success" | 2017-01-26T00:10:08.0000000Z |
| 1 | "Opening Money Slot" | "Warning" | 2017-01-26T00:10:14.0000000Z |
| 1 | "Printing Bank Balance" | "Warning" | 2017-01-26T00:10:19.0000000Z |
出力:
| ATM_id | First_Warning_Operation_id | Warning_Time |
| --- | --- | --- |
| 1 | "Opening Money Slot" | 2017-01-26T00:10:14.0000000Z |
SELECT *
FROM input TIMESTAMP BY time OVER ATM_id
MATCH_RECOGNIZE (
LIMIT DURATION(minute, 1)
PARTITION BY ATM_id
MEASURES
First(Warning.ATM_id) AS ATM_id,
First(Warning.Operation_Id) AS First_Warning_Operation_id,
First(Warning.Time) AS Warning_Time
AFTER MATCH SKIP TO NEXT ROW
PATTERN (Success+ Warning{2,})
DEFINE
Success AS Success.Return_Code = 'Success',
Warning AS Warning.Return_Code <> 'Success'
) AS patternMatch
このクエリは、少なくとも 2 つの連続するエラー イベントに一致し、条件が満たされたときにアラームを生成します。 PATTERN では、照合に使用される正規表現を定義します。このケースでは、少なくとも 1 つの成功した操作の後、少なくとも 2 つの連続した警告が続きます。 成功と警告は、Return_Code 値を使用して定義します。条件が満たされると、ATM_id、最初の警告操作、最初の警告時刻を使用して MEASURES が射影されます。
詳細については、MATCH_RECOGNIZE に関するページを参照してください。
ジオフェンシングおよび地理空間クエリ
Azure Stream Analytics には、フリート管理、ライド シェア、コネクテッド カー、および資産追跡などのシナリオを実装するために使用できる、組み込みの地理空間関数が用意されています。 地理空間データは、イベント ストリームまたは参照データの一部として、GeoJSON または WKT 形式で取り込むことができます。 たとえば、パスポート印刷機械の製造を専門とする会社が、政府機関や領事館に機械をリースします。 その機械を置き間違えてパスポートの偽造に使われることがないようにするため、そのような機械の場所は厳重に管理されます。 各機械には GPS トラッカーが搭載され、その情報は Azure Stream Analytics ジョブにリレーされます。 製造元では、これらの機械の場所を追跡し、そのうちの 1 つが承認済みのエリアを離れたときにアラートを受け取るようにします。こうすれば、製造元は装置をリモートで使用不可能にし、関係機関に注意喚起し、装置を取り返すことができます。
入力:
| Equipment_id | Equipment_current_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13288797982818 47.64082002051315)" | 2017-01-26T00:10:00.0000000Z |
| 1 | "POINT(-122.13307252987875 47.64081350934929)" | 2017-01-26T00:11:00.0000000Z |
| 1 | "POINT(-122.13308862313283 47.6406508603241)" | 2017-01-26T00:12:00.0000000Z |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
参照データ入力:
| Equipment_id | Equipment_lease_location |
| --- | --- |
| 1 | "POLYGON((-122.13326028450979 47.6409833866794,-122.13261655434621 47.6409833866794,-122.13261655434621 47.64061471602751,-122.13326028450979 47.64061471602751,-122.13326028450979 47.6409833866794))" |
出力:
| Equipment_id | Equipment_alert_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
SELECT
input.Equipment_id AS Equipment_id,
input.Equipment_current_location AS Equipment_current_location,
input.Time AS Time
FROM input TIMESTAMP BY time
JOIN
referenceInput
ON input.Equipment_id = referenceInput.Equipment_id
WHERE
ST_WITHIN(input.Equipment_current_location, referenceInput.Equipment_lease_location) = 1
このクエリを使用すると、製造元では機械の場所を自動的に監視し、機械が許可されたジオフェンスから離れたときにアラートを受け取ることができます。 組み込みの地理空間関数を活用すれば、サードパーティのライブラリを使用せずに、クエリ内で GPS データを使用できます。
詳細については、「Azure Stream Analytics を使用したジオフェンシングおよび地理空間集計のシナリオ」を参照してください。
ヘルプの参照
詳細については、Azure Stream Analytics に関する Microsoft Q&A 質問ページを参照してください。