Azure Stream Analytics でクエリの並列処理を使用する
この記事では、Azure Stream Analytics で並列処理を活用する方法を示します。 入力パーティションの構成と分析クエリ定義のチューニングによって Stream Analytics ジョブをスケールする方法について説明します。
前提条件として、「ストリーミング ユニットを効率的に使用できるようにジョブを最適化する」で説明されているストリーミング ユニットの概念について理解しておく必要があります。
Stream Analytics ジョブの構成について教えてください。
Stream Analytics のジョブ定義には少なくとも、入力、クエリ、出力が含まれています。 入力は、ジョブがデータ ストリームを読み取る場所です。 クエリは、データ入力ストリームを変換するために使用されます。出力は、ジョブ結果の送信先です。
入力と出力のパーティション
パーティション分割すると、パーティション キーに基づいてデータをサブセットに分割できます。 入力 (たとえば Event Hubs) がキーによってパーティション分割されている場合、Stream Analytics ジョブに入力を追加するとき、パーティション キーを指定することをお勧めします。 Stream Analytics ジョブのスケーリングでは、入力と出力でパーティションを利用します。 Stream Analytics ジョブでは、さまざまなパーティションを同時に使用し、書き込みを実行できるので、スループットが向上します。
入力
すべての Azure Stream Analytics ストリーミング入力において、パーティション分割 (Event Hubs、IoT Hub、Blob Storage、Data Lake Storage Gen2) を利用できます。
Note
互換性レベル 1.2 以降では、パーティション キーは入力プロパティとして設定され、クエリにおいて PARTITION BY キーワードを用いる必要はありません。 互換性レベル 1.1 以前では、パーティション キーはその代わりにクエリにおいて PARTITION BY キーワードを用いて定義される必要があります。
出力
Azure Stream Analytics を使用するときは、出力でパーティション分割を利用できます。
- Azure Data Lake Storage
- Azure Functions
- Azure テーブル
- Blob Storage (パーティション キーを明示的に設定できます)
- Azure Cosmos DB (パーティション キーを明示的に設定する必要があります)
- Event Hubs (パーティション キーを明示的に設定する必要があります)
- IoT Hub (パーティション キーを明示的に設定する必要があります)
- Service Bus
- オプションのパーティション分割を使用した SQL および Azure Synapse Analytics: 詳細については、Azure SQL Database への出力に関するページを参照してください。
Power BI では、パーティション分割がサポートされていません。 ただし、このセクションの説明に従って入力をパーティション分割することはできます。
パーティションの詳細については、次の記事をご覧ください。
クエリ
ジョブを並列にするために、すべての入力、すべてのクエリ ロジック ステップ、およびすべての出力の間でパーティション キーを調整する必要があります。 クエリ ロジックのパーティション分割は、結合と集約のために使われるキー (GROUP BY) によって決まります。 クエリ ロジックにキーが設定されていない場合 (プロジェクション、フィルター、参照結合...)、この最後の要件は無視できます。
- 入力と出力が
WarehouseId
によってパーティション分割され、クエリ グループがWarehouseId
のないProductId
によってパーティション分割されている場合、ジョブは並列ではありません。 - 結合する 2 つの入力が異なるパーティション キー (
WarehouseId
とProductId
) によってパーティション分割されている場合、ジョブは並列ではありません。 - 2 つ以上の独立したデータ フローが 1 つのジョブに含まれていて、それぞれが独自のパーティション キーを持っているならば、ジョブは並列ではありません。
すべての入力、出力、クエリ ステップで同じキーが使用されている場合にのみ、ジョブは並列です。
驚異的並列ジョブ
驚異的並列ジョブは、Azure Stream Analytics において最もスケーラブルなシナリオです。 入力の 1 つのパーティションを、出力の 1 つのパーティションに対するクエリの 1 つのインスタンスに接続します。 この並列処理には次の要件があります。
クエリ ロジックが同じクエリ インスタンスによって処理される同じキーに依存する場合、イベントが入力の同じパーティションに送信されるようにする必要があります。 Event Hubs または IoT Hub の場合、イベント データに PartitionKey 値が設定されている必要があります。 代わりに、パーティション分割された送信元を使用することもできます。 Blob Storage の場合、イベントが同じパーティション フォルダーに送信される必要があります。 たとえば、クエリ インスタンスで userID 別にデータを集計するとき、パーティション キーとして userID を使用し、入力イベント ハブがパーティション分割されます。 ただし、クエリ ロジックが、同一のクエリ インスタンスによって処理されるために同一のキーを必要としない場合は、この要件を無視してもかまいません。 このロジックの例として、単純な select-project-filter クエリがあります。
次の手順は、クエリをパーティション分割するためのものです。 互換性レベル 1.2 以上 (推奨) のジョブの場合、入力設定でカスタム列をパーティション キーとして指定でき、ジョブは自動的に並列になります。 互換性レベル 1.0 または 1.1 のジョブでは、クエリのあらゆる手順で PartitionId 別のパーティションを使用する必要があります。 複数のステップが許可されますが、すべてのステップが同じキーでパーティション分割されている必要があります。
Stream Analytics でサポートされている出力のほとんどでは、パーティション分割が活用されます。 パーティション分割をサポートしていない種類の出力を使用する場合、ジョブは "驚異的並列" になりません。 Event Hubs の出力については、パーティション キー列が、クエリで使われているものと同じパーティション キーに設定されていることを確認してください。 詳細については、「出力」セクションを参照してください。
入力パーティションの数が出力パーティションの数と同じである必要があります。 Blob Storage 出力では、パーティションをサポートでき、アップストリーム クエリのパーティション構成を継承します。 Blob Storage のパーティション キーを指定すると、データが入力パーティションごとにパーティション分割されるため、結果も完全に並列になります。 完全な並列ジョブを可能にするパーティション値の例を次に示します。
- 8 個の event hub 入力パーティションと 8 個の event hub 出力パーティション
- 8 個の event hub 入力パーティションと blob storage 出力
- 8 個の event hub 入力パーティションと任意のカーディナリティを持つカスタム フィールドによってパーティション分割された blob storage 出力
- 8 個の blob storage 入力パーティションと blob storage 出力
- 8 個の blob storage 入力パーティションと 8 個のイベント ハブ出力パーティション
以下のセクションでは、驚異的並列であるシナリオの例を示します。
単純なクエリ
- 入力: 8 個のパーティションを持つイベント ハブ
- 出力: 8 個のパーティションを持つイベント ハブ ("パーティション キー列" は
PartitionId
を使うように設定する必要があります)
クエリ:
--Using compatibility level 1.2 or above
SELECT TollBoothId
FROM Input1
WHERE TollBoothId > 100
--Using compatibility level 1.0 or 1.1
SELECT TollBoothId
FROM Input1 PARTITION BY PartitionId
WHERE TollBoothId > 100
このクエリは単純なフィルターです。 そのため、イベント ハブに送信される入力のパーティション分割を気にする必要はありません。 互換性レベルが 1.2 より前のジョブには、上記の要件 2 を満たすために、PARTITION BY PartitionId 句を含める必要があることに注意してください。 出力については、パーティション キーが PartitionId に設定されたイベント ハブ出力をジョブで構成する必要があります。 最後に、入力パーティションと出力パーティションの数が同じであることを確認します。
グループ化キーが含まれたクエリ
- 入力: 8 個のパーティションを持つ Event Hub
- 出力:BLOB ストレージ
クエリ:
--Using compatibility level 1.2 or above
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
--Using compatibility level 1.0 or 1.1
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
このクエリにはグループ化キーが含まれています。 そのため、グループ化されたイベントは、同じ Event Hubs パーティションに送信される必要があります。 この例では TollBoothID でグループ化するため、イベントが Event Hubs に送信されるときに、パーティション キーとして TollBoothID
が使用されることを確認する必要があります。 その後、Azure Stream Analytics で、PARTITION BY PartitionId を使用して、このパーティション構成から継承し、完全な並列処理を有効にすることができます。 出力は Blob Storage であるため、要件 4. に記載されているように、パーティション キー値の構成を気にする必要はありません。
驚異的並列ではないシナリオの例
前のセクションでは、驚異的並列のシナリオをいくつか記事で紹介しました。 このセクションでは、驚異的並列の一部の要件を満たしていないシナリオについて説明します。
パーティション数の不一致
- 入力: 8 個のパーティションを持つイベント ハブ
- 出力: 32 個のパーティションを持つイベント ハブ
入力パーティション数と出力パーティション数が一致しない場合、クエリに関係なく、トポロジは驚異的並列ではありません。 ただし、ある程度の並列化は可能です。
パーティション分割されていない出力を使用したクエリ
- 入力: 8 個のパーティションを持つイベント ハブ
- 出力:Power BI
現在、Power BI 出力ではパーティション分割がサポートされていません。 そのため、このシナリオは驚異的並列ではありません。
PARTITION BY 値が異なる複数ステップのクエリ
- 入力: 8 個のパーティションを持つ Event Hub
- 入力: 8 個のパーティションを持つ Event Hub
- 互換性レベル:1.0 または 1.1
クエリ:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1 Partition By TollBoothId
GROUP BY TumblingWindow(minute, 3), TollBoothId
ご覧のように、2 番目のステップは TollBoothId をパーティション キーとして使用しています。 このステップは最初のステップと異なりますので、シャッフルを実行する必要があります。
PARTITION BY 値が異なる複数ステップのクエリ
- 入力: 8 つのパーティションを持つ Event Hub ([パーティション キー列] は設定されていません。既定値の [PartitionId] です)
- 出力: 8 個のパーティションを持つ Event Hub ([パーティション キー列] は [TollBoothId] を使うように設定する必要があります)
- 互換性レベル - 1.2 以上
クエリ:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
互換性レベル 1.2 以上では、既定で並列クエリの実行が可能です。 たとえば、前のセクションのクエリは、"TollBoothId" 列が入力パーティション キーとして設定されている限りパーティション分割されます。 PARTITION BY PartitionId 句は不要です。
ジョブのストリーミング ユニットの最大数を計算する
Stream Analytics ジョブで使用できるストリーミング ユニットの合計数は、ジョブに定義されたクエリのステップ数と各ステップのパーティション数によって異なります。
クエリでのステップ
1 つのクエリに 1 つ以上のステップを含めることができます。 各ステップは、WITH キーワードで定義されたサブクエリです。 次のクエリの SELECT ステートメントのように、WITH キーワードの外にあるクエリ (1 つのクエリのみ) もステップとしてカウントされます。
クエリ:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute,3), TollBoothId
このクエリには 2 つのステップがあります。
注意
このクエリについては、この記事で後ほど詳しく説明します。
ステップをパーティション分割する
ステップをパーティション分割するには、次の条件を満たす必要があります。
- 入力ソースはパーティション分割する。
- クエリの SELECT ステートメントは、パーティション分割された入力ソースから読み取る。
- ステップ内のクエリに PARTITION BY キーワードを含める。
クエリをパーティション分割すると、入力イベントが処理されて個々のパーティション グループに集計され、グループごとに出力イベントが生成されます。 集計を結合する場合は、パーティション分割されていない 2 つ目のステップを集計用に作成する必要があります。
ジョブのストリーミング ユニットの最大数を計算する
パーティション分割されていないすべてのステップを、Stream Analytics ジョブの 1 個のストリーミング ユニット (SU V2) にスケールアップできます。 さらに、パーティション分割されたステップでパーティションごとに 1 つの SU V2 を追加できます。 例をいくつか次の表に示します。
クエリ | ジョブの最大 SU 数 |
---|---|
|
1 SU V2 |
|
16 SU V2 (1 * 16 パーティション) |
|
1 SU V2 |
|
4 SU V2 (パーティション分割されたステップ用に 3 + パーティション分割されていないステップ用に 1) |
スケーリングの例
次のクエリでは、3 つのブースがある料金所を 3 分間に通過する車の台数を計算します。 このクエリは、最大 1 個の SU V2 にスケールできます。
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
クエリに使用する SU を増やすには、入力データ ストリームとクエリの両方をパーティション分割する必要があります。 データ ストリーム パーティションが 3 に設定されているので、変更を加えた次のクエリを最大 3 個の SU V2 にスケールできます。
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
クエリがパーティション分割されている場合、入力イベントは処理されて個々のパーティション グループに集計されます。 出力イベントは、それぞれのグループに対しても生成されます。 GROUP BY フィールドが入力データ ストリームのパーティション キーでない場合、パーティション分割を実行すると予期しない結果を生じることがあります。 たとえば、前のクエリの TollBoothId フィールドは Input1 のパーティション キーではないとします。 そのため、TollBooth 1 のデータが複数のパーティションに分散される可能性があります。
Input1 の各パーティションは、Stream Analytics によって個別に処理されます。 その結果、同じタンブリング ウィンドウで同じ料金所ブースの複数の通過台数レコードが作成されます。 入力パーティション キーを変更できない場合は、次の例のように、パーティション分割されていないステップを追加してパーティション間の値を集計すると、この問題を解決できます。
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
このクエリは 4 個の SU V2 にスケールできます。
Note
2 つのストリームを結合する場合は、結合に使用する列のパーティション キーでストリームがパーティション分割されていることを確認します。 また、両方のストリームに同じ数のパーティションがあることも確認します。
大規模な高いスループットの実現
大規模な高いスループットを維持するために、驚異的並列ジョブが必要ですが十分ではありません。 すべてのストレージ システムおよびそれに対応する Stream Analytics 出力では、考えられる最高の書き込みスループットを実現する方法がそれぞれ異なっています。 他の大規模シナリオと同じように、適切な構成を使用することで解決できる課題がいくつかあります。 このセクションでは、一般的な出力をいくつか取り上げてその構成について説明し、1 秒あたり 1,000、5,000、10,000 のイベントのインジェスト率を維持するためのサンプルを提供します。
以下の観測記録は、ステートレス (パススルー) クエリを使用する Stream Analytics ジョブを使って行ったものです。このジョブは、Event Hubs、Azure SQL、または Azure Cosmos DB への書き込みを行う基本的な JavaScript UDF です。
Event Hubs
取り込み率 (1 秒あたりのイベント数) | ストリーミング ユニット数 | 出力リソース |
---|---|---|
1 K | 1/3 | 2 TU |
5 K | 1 | 6 TU |
10K | 2 | 10 TU |
Event Hubs ソリューションは、ストリーミング ユニット (SU) とスループットの観点から直線的にスケーリングをおこなうため、Stream Analytics からのデータを分析およびストリーム配信するための最も効率的かつパフォーマンスの高い方法となります。 ジョブは最大 66 SU V2 までスケールアップでき、これは概算して最大 400 MB/秒、あるいは 1 日あたり 38 兆個のイベントの処理に換算されます。
Azure SQL
取り込み率 (1 秒あたりのイベント数) | ストリーミング ユニット数 | 出力リソース |
---|---|---|
1 K | 2/3 | S3 |
5 K | 3 | P4 |
10K | 6 | P6 |
Azure SQL では、パーティション分割の継承と呼ばれる並列書き込みがサポートされていますが、既定では有効になっていません。 ただし、パーティション分割の継承と完全な並列クエリを一緒に有効にしても、高いスループットを実現するには不十分である場合があります。 SQL 書き込みスループットは、データベース構成およびテーブル スキーマに大きく依存します。 SQL 出力パフォーマンスに関する記事に、書き込みスループットを最大限に高めることができるパラメーターの詳細が記載されています。 Azure SQL Database への Azure Stream Analytics 出力に関する記事に記載されているように、このソリューションは 8 個のパーティションを超える完全な並列パイプラインとして直線的にスケールせず、SQL 出力の前に再分割が必要な場合があります (INTO に関する記事を参照してください)。 高い IO 率と、数分おきに発生するログ バックアップからのオーバーヘッドを維持するために、Premium SKU が必要です。
Azure Cosmos DB
取り込み率 (1 秒あたりのイベント数) | ストリーミング ユニット数 | 出力リソース |
---|---|---|
1 K | 2/3 | 20 K RU |
5 K | 4 | 60 K RU |
10K | 8 | 120 K RU |
Stream Analytics から出力される Azure Cosmos DB は、互換性レベル 1.2 のネイティブ統合を使用するように更新されました。 互換性レベル 1.2 は、新しいジョブの既定の互換性レベルである 1.1 に比べてスループットが著しく高く、RU 消費量が低下しています。 ソリューションは/deviceId 上にパーティション分割された Azure Cosmos DB コンテナーを使用し、ソリューションの残りの部分は全く同じように構成されます。
Azure の大規模なストリーミングのサンプルはすべて、負荷をシミュレートするテスト クライアントによってデータが取り込まれる Event Hubs を入力として使用します。 各入力イベントは 1 KB の JSON ドキュメントであり、構成されたインジェスト率からスループット レート (1 MB/秒、5 MB/秒、および 10 MB/秒) に容易に換算されます。 イベントは、最大 1,000 台のデバイス向けに以下の JSON データ (短縮された形式) を送信する IoT デバイスをシミュレートします。
{
"eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
"complexData": {
"moreData0": 51.3068118685458,
"moreData22": 45.34076957651598
},
"value": 49.02278128887753,
"deviceId": "contoso://device-id-1554",
"type": "CO2",
"createdAt": "2019-05-16T17:16:40.000003Z"
}
Note
構成は、ソリューションで使用されるさまざまなコンポーネントによって変更される可能性があります。 見積もりの精度を高めるには、ご使用のシナリオに合わせてサンプルをカスタマイズしてください。
ボトルネックの特定
Azure Stream Analytics ジョブの [メトリックス] ウィンドウを使用して、パイプラインのボトルネックを特定します。 スループットについての [Input/Output Events](入出力イベント) および [透かしの遅延] または [Backlogged Events](バックログされたイベント) を確認して、ジョブが入力速度に対応しているかどうかを確認します。 Event Hubs のメトリックスについては、スロットルされた要求 を検索し、その結果に基づいてしきい値ユニットを調整します。 Azure Cosmos DB メトリックスについては、スループットの下の [パーティション キーの範囲ごとの使用された最大 RU/秒] を確認して、パーティション キーの範囲が均一に消費されていることを確認します。 Azure SQL DB については、 [ログ IO] および [CPU] を監視します。
ヘルプの参照
詳細については、Azure Stream Analytics に関する Microsoft Q&A 質問ページを参照してください。