Azure IoT Operations でデータ フローを構成する
重要
このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれます。 この機能はいくつかの制限を設けて提供されており、運用環境のワークロードには使用しないでください。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
データ フローとは、データがソースから宛先までたどるパスであり、必要に応じて変換が行われます。 データ フローを構成するには、Dataflow カスタム リソースを作成するか、Azure IoT Operations Studio ポータルを使用します。 データ フローは次の 3 つの部分で構成されます: ソース、変換、宛先。
ソースと宛先を定義するには、データ フロー エンドポイントを構成する必要があります。 変換は省略可能であり、データのエンリッチメント、データのフィルター処理、データの別のフィールドへのマッピングなどの操作を含めることができます。
重要
各データ フローでは、ソースまたは宛先の "いずれか" として、Azure IoT Operations ローカル MQTT ブローカーの既定のエンドポイントを使用する必要があります。
Azure IoT Operations の操作エクスペリエンスを使用して、データ フローを作成できます。 操作エクスペリエンスには、データ フローを構成するためのビジュアル インターフェイスが用意されています。 Bicep を使って Bicep テンプレート ファイルを使用するデータ フローを作成したり、Kubernetes を使って YAML ファイルを使用するデータ フローを作成したりすることもできます。
ソース、変換、宛先を構成する方法については、引き続きお読みください。
前提条件
既定のデータ フロー プロファイルとエンドポイントを使用して Azure IoT Operations のインスタンスが作成されたら、すぐにデータ フローを展開できます。 ただし、データ フロー プロファイルとエンドポイントを構成してデータ フローをカスタマイズしたい場合があります。
データ フロー プロファイル
データ フローに複数の異なるスケーリング設定が必要ない場合は、Azure IoT Operations が提供する既定のデータ フロー プロファイルを使用します。 データ フロー プロファイルを構成する方法については、「データ フロー プロファイルを構成する」を参照してください。
データ フロー エンドポイント
データ フローのソースと宛先を構成するには、データ フロー エンドポイントが必要です。 すぐに開始するには、ローカル MQTT ブローカー の既定のデータ フロー エンドポイントを使用できます。 Kafka、Event Hubs、Azure Data Lake Storage などの他の種類のデータ フロー エンドポイントを作成することもできます。 各種のデータ フロー エンドポイントを構成する方法については、「データ フロー エンドポイントを構成する」を参照してください。
作業の開始
前提条件を満たしたら、データ フローの作成を開始できます。
操作エクスペリエンスでデータ フローを作成するには、[データ フロー]>[データ フローを作成する] を選択します。 次に、データ フローのソース、変換、宛先を構成できるページが表示されます。
データ フローの操作の種類を構成する方法については、以下のセクションを参照してください。
ソース
データ フローのソースを構成するには、エンドポイント参照とエンドポイントのデータ ソースの一覧を指定します。 データ フローのソースとして、次のいずれかのオプションを選択します。
既定のエンドポイントがソースとして使用されていない場合は、宛先として使用する必要があります。 詳細については、「データ フローではローカル MQTT ブローカー エンドポイントの使用が必須」を参照してください。
オプション 1: 既定のメッセージ ブローカー エンドポイントをソースとして使用する
[ソースの詳細] で、[メッセージ ブローカー] を選択します。
メッセージ ブローカー ソースについて次の設定を入力します。
設定 説明 データ フロー エンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。 トピック 受信メッセージをサブスクライブするための MQTT トピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 トピックの詳細については、「MQTT または Kafka トピックの構成」を参照してください。 メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。 適用を選択します。
オプション 2: 資産をソースとして使用する
資産をデータ フローのソースとして使用できます。 資産をソースとして使用することは、操作エクスペリエンスでのみ使用できます。
[ソースの詳細] で、[資産] を選択します。
ソース エンドポイントとして使用する資産を選択します。
[続行] を選択します。
選択した資産のデータポイントの一覧が表示されます。
[適用] を選択して、資産をソース エンドポイントとして使用します。
資産をソースとして使用する場合、資産定義はデータ フローのスキーマを推論するために使用されます。 資産定義には、資産のデータ ポイントのスキーマが含まれます。 詳細については、「資産の構成をリモートで管理する」を参照してください。
構成が完了すると、資産からのデータは、ローカル MQTT ブローカーを介してデータ フローに到達します。 そのため、ソースとして資産を使用する場合、データ フローはローカル MQTT ブローカーの既定のエンドポイントを実際のソースとして使用します。
オプション 3: カスタム MQTT または Kafka データ フロー エンドポイントをソースとして使用する
カスタム MQTT または Kafka データ フロー エンドポイントを作成した場合 (たとえば、Event Grid または Event Hubs で使用する場合)、データ フローのソースとして使用できます。 Data Lake や Fabric OneLake などのストレージ タイプのエンドポイントは、ソースとして使用できないことに注意してください。
[ソースの詳細] で、[メッセージ ブローカー] を選択します。
メッセージ ブローカー ソースについて次の設定を入力します。
設定 説明 データ フロー エンドポイント カスタムの MQTT または Kafka データ フロー エンドポイントを選択するには、[再選択] ボタンを使用します。 詳細については、「MQTT データ フロー エンドポイントを構成する」または「Azure Event Hubs と Kafka データ フロー エンドポイントを構成する」を参照してください。 トピック 受信メッセージをサブスクライブするための MQTT トピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 トピックの詳細については、「MQTT または Kafka トピックの構成」を参照してください。 メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。 適用を選択します。
データ ソースを構成する (MQTT または Kafka のトピック)
データ フロー エンドポイントの構成を変更しなくても、ソースに複数の MQTT または Kafka トピックを指定できます。 この柔軟性により、トピックが異なる場合でも、複数のデータ フロー間で同じエンドポイントを再利用できます。 詳細については、データ フロー エンドポイントの再利用に関するセクションを参照してください。
MQTT のトピック
ソースが MQTT (Event Grid を含む) エンドポイントの場合は、MQTT トピック フィルターを使用して受信メッセージをサブスクライブできます。 トピック フィルターには、複数のトピックをサブスクライブするためのワイルドカードを含めることができます。 たとえば、thermostats/+/telemetry/temperature/#
はサーモスタットからのすべての温度テレメトリ メッセージをサブスクライブします。 MQTT トピック フィルターを構成するには:
操作エクスペリエンスのデータ フローの [ソースの詳細] で [メッセージ ブローカー] を選択してから、[トピック] フィールドを使用して、受信メッセージをサブスクライブするための MQTT トピック フィルターを指定します。 [行の追加] を選択し、新しいトピックを入力することで、複数の MQTT トピックを追加できます。
共有サブスクリプション
メッセージ ブローカー ソースで共有サブスクリプションを使用するには、$shared/<GROUP_NAME>/<TOPIC_FILTER>
の形式で共有サブスクリプション トピックを指定します。
操作エクスペリエンスのデータ フローの [ソースの詳細] で、[メッセージ ブローカー] を選択し、[トピック] フィールドを使用して共有サブスクリプション グループとトピックを指定します。
データ フロー プロファイルのインスタンス数が 1 より大きい場合は、メッセージ ブローカー ソースを使用するすべてのデータ フローに対して共有サブスクリプションが自動で有効になります。 この場合、$shared
プレフィックスが追加され、共有サブスクリプション グループ名が自動的に生成されます。 たとえば、インスタンス数が 3 のデータ フロー プロファイルがあり、データ フローがトピック topic1
と topic2
で構成されたソースとしてメッセージ ブローカー エンドポイントを使用している場合、それらは自動的に共有サブスクリプションに $shared/<GENERATED_GROUP_NAME>/topic1
および $shared/<GENERATED_GROUP_NAME>/topic2
として変換されます。
構成内に $shared/mygroup/topic
という名前のトピックを明示的に作成できます。 ただし、$shared
プレフィックスは必要に応じて自動的に追加されるため、$shared
トピックを明示的に追加することはお勧めしません。 データ フローが設定されていない場合は、グループ名を使用して最適化を行うことができます。 たとえば、$share
は設定されておらず、データ フローはトピック名に対してのみ動作する必要がある場合があります。
重要
インスタンス数が 1 つ以上のときに共有サブスクリプションを必要とするデータ フローは、Event Grid MQTT ブローカーをソースとして使用する場合に、共有サブスクリプションをサポートしていないため重要となります。 メッセージの欠落を回避するため、Event Grid MQTT ブローカーをソースとして使用するときは、データ フロー プロファイル インスタンス数を 1 に設定します。 これは、データ フローがサブスクライバーで、クラウドからメッセージを受信する場合です。
Kafka トピック
ソースが Kafka (Event Hubs を含む) エンドポイントの場合は、受信メッセージをサブスクライブする個々の Kafka トピックを指定します。 ワイルドカードはサポートされていないため、各トピックを静的に指定する必要があります。
Note
Kafka エンドポイント経由で Event Hubs を使用する場合、名前空間内の個々のイベント ハブは Kafka トピックです。 たとえば、thermostats
と humidifiers
の 2 つのイベント ハブを含む Event Hubs 名前空間がある場合、各イベント ハブを Kafka トピックとして指定できます。
Kafka トピックを構成するには:
操作エクスペリエンスのデータ フローの [ソースの詳細] で [メッセージ ブローカー] を選択してから、[トピック] フィールドを使用して、受信メッセージをサブスクライブするための Kafka トピック フィルターを指定します。
Note
操作エクスペリエンスで指定できるトピック フィルターは 1 つだけです。 複数のトピック フィルターを使用するには、Bicep または Kubernetes を使用してください。
ソース スキーマを指定する
MQTT または Kafka をソースとして使用する場合は、スキーマを指定して、操作エクスペリエンス ポータルにデータ ポイントの一覧を表示できます。 受信メッセージの逆シリアル化および検証のためのスキーマの使用は、現在サポートされていません。
ソースが資産の場合、スキーマは資産定義から自動的に推論されます。
ヒント
サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。
ソースからの受信メッセージを逆シリアル化するために使用するスキーマを構成するには:
操作エクスペリエンスのデータ フローの [ソースの詳細] で [メッセージ ブローカー] を選択し、[メッセージ スキーマ] フィールドを使用してスキーマを指定します。 [アップロード] ボタンを使用して、最初にスキーマ ファイルをアップロードできます。 詳細については、「メッセージ スキーマを理解する」を参照してください。
詳細については、「メッセージ スキーマを理解する」を参照してください。
変換
変換操作では、宛先に送信する前にソースからのデータを変換できます。 変換は省略可能です。 データを変更する必要がない場合は、データ フロー構成に変換操作を含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。 ステージの順序は常に次のようになります。
- エンリッチ: 一致するデータセットと条件を指定して、ソース データにデータを追加します。
- フィルター: 条件に基づいてデータをフィルター処理します。
- [マップ]、[コンピューティング]、[名前の変更]、または [新しいプロパティ] の追加: 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します。
このセクションでは、データ フロー変換の概要について説明します。 詳細については、「データ フローを使用してデータをマップする」、「データ フロー変換を使用してデータを変換する」、「データ フローを使用してデータをエンリッチする」を参照してください。
操作エクスペリエンスで、[データ フロー]>[変換の追加 (省略可能)] を選びます。
エンリッチ: 参照データを追加する
データをエンリッチするには、まず Azure IoT Operations の状態ストアに参照データセットを追加します。 データセットは、条件に基づいてソース データにさらにデータを追加するために使用されます。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。
状態ストア CLI を使用して、状態ストアにサンプル データを読み込むことができます。 状態ストア内のキー名は、データ フロー構成のデータセットに対応します。
現在、"エンリッチ" ステージは操作エクスペリエンスではサポートされていません。
データセットに asset
フィールドを持つレコードがある場合は、次のようになります。
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
thermostat1
と一致する deviceId
フィールドを持つソースのデータには、フィルターとマップのステージで使用できる location
と manufacturer
のフィールドがあります。
条件構文の詳細については、「データ フローを使用してデータをエンリッチする」と「データ フローを使用したデータの変換」に関する記事を参照してください。
フィルター: 条件に基づいてデータをフィルター処理する
条件に基づいてデータをフィルター処理するには、filter
ステージを使用できます。 条件は、値と一致するソース データ内のフィールドとして指定されます。
[変換 (省略可能)] で、[フィルター]>[追加] を選びます。
必要な設定を入力します。
設定 説明 フィルターの条件 ソース データのフィールドに基づいてデータをフィルター処理する条件。 説明 フィルター条件の説明を入力します。 フィルター条件フィールドに「
@
」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。条件では、ソース データのフィールドを使用できます。 たとえば、
@temperature > 20
のようなフィルター条件を使用して、温度フィールドに基づいて 20 以下のデータをフィルター処理できます。適用を選択します。
マップ: あるフィールドから別のフィールドにデータを移動する
省略可能な変換を使用してデータを別のフィールドにマップするには、map
操作を使用できます。 変換は、ソース データのフィールドを使用する数式として指定されます。
操作エクスペリエンスでは、マッピングは現在 [コンピューティング]、[名前変更]、[新しいプロパティ] の各変換を使用してサポートされています。
Compute
[コンピューティング] 変換を使用して、ソース データに数式を適用できます。 この操作は、ソース データに数式を適用し、結果フィールドを格納するために使用されます。
[変換 (省略可能)] で、[コンピューティング]>[追加] を選びます。
必要な設定を入力します。
設定 説明 数式を選択する ドロップダウンから既存の数式を選択するか、[カスタム] を選択して手動で数式を入力します。 出力 結果の出力表示名を指定します。 式 ソース データに適用する数式を入力します。 説明 変換の説明を入力します。 最後の既知の値 必要に応じて、現在の値が使用できない場合は、最後の既知の値を使用します。 [数式] フィールドに数式を入力するか、既存の数式を編集できます。 数式には、ソース データ内のフィールドを使用できます。 「
@
」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。 組み込みの数式の場合は、<dataflow>
プレースホルダーを選択して、使用可能なデータ ポイントの一覧を表示します。MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。数式には、ソース データ内のフィールドを使用できます。 たとえば、ソース データの
temperature
フィールドを使用して温度を摂氏に変換し、それをtemperatureCelsius
出力フィールドに格納できます。適用を選択します。
名前の変更
[名前の変更] 変換を使用して、データポイントの名前を変更できます。 この操作は、ソース データ内のデータポイントの名前を新しい名前に変更するために使用されます。 新しい名前は、データ フローの後続のステージで使用できます。
[変換 (省略可能)] で、[名前の変更]>[追加] を選びます。
必要な設定を入力します。
設定 説明 データポイント ドロップダウンからデータポイントを選択するか、$metadata ヘッダーを入力します。 新しいデータポイント名 データポイントの新しい名前を入力します。 説明 変換の説明を入力します。 MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。適用を選択します。
新しいプロパティ
[新しいプロパティ] 変換を使用して、ソース データに新しいプロパティを追加できます。 この操作は、ソース データに新しいプロパティを追加するために使用されます。 新しいプロパティは、データ フローの後続のステージで使用できます。
[変換 (省略可能)] で、[新しいプロパティ]>[追加] を選びます。
必要な設定を入力します。
設定 説明 プロパティキー 新しいプロパティのキーを入力します。 プロパティ値 新しいプロパティの値を入力します。 説明 新しいプロパティの説明を入力します。 適用を選択します。
詳細については、「データ フローを使用してデータをマッピングする」と「データ フローを使用したデータの変換」に関する記事を参照してください。
スキーマに従ってデータをシリアル化する
データを宛先に送信する前にシリアル化する場合は、スキーマとシリアル化形式を指定する必要があります。 それ以外の場合は、データは推論された型を使用して JSON でシリアル化されます。 Microsoft Fabric や Azure Data Lake などのストレージ エンドポイントには、データの一貫性を確保するためにスキーマが必要です。 サポートされているシリアル化形式は、Parquet と Delta です。
ヒント
サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。
操作エクスペリエンスの場合は、データ フロー エンドポイントの詳細でスキーマとシリアル化の形式を指定します。 シリアル化形式をサポートするエンドポイントは、Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure Data Explorer、およびローカル ストレージです。 たとえば、差分形式でデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、データ フロー変換先エンドポイント構成で参照する必要があります。
スキーマ レジストリの詳細については、「メッセージ スキーマについて」を参照してください。
宛先
データ フローの宛先を構成するには、エンドポイント参照とデータ宛先を指定します。 エンドポイントのデータの宛先の一覧を指定できます。
ローカル MQTT ブローカー以外の宛先にデータを送信するには、データ フロー エンドポイントを作成します。 その方法については、データ フロー エンドポイントの構成に関するページを参照してください。 宛先がローカル MQTT ブローカーでない場合は、ソースとして使用する必要があります。 詳細については、「データ フローではローカル MQTT ブローカー エンドポイントの使用が必須」を参照してください。
重要
ストレージ エンドポイントには、シリアル化にスキーマが必要です。 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル記憶域でデータ フローを使用するには、スキーマ参照を指定する必要があります。
宛先として使用するデータ フロー エンドポイントを選択します。
ストレージ エンドポイントには、シリアル化にスキーマが必要です。 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル ストレージの宛先エンドポイントを選択する場合は、スキーマ参照を指定する必要があります。 たとえば、データを Delta 形式で Microsoft Fabric エンドポイントにシリアル化するには、スキーマをスキーマ レジストリにアップロードし、データ フローの宛先エンドポイントの構成内でそれを参照する必要があります。
[続行] を選択して、宛先を構成します。
データの送信先となるトピックやテーブルなど、宛先に必要な設定を入力します。 詳細については、「データの宛先の構成 (トピック、コンテナー、またはテーブル)」を参照してください。
データの宛先 (トピック、コンテナー、またはテーブル) を構成する
データ ソースと同様に、データの宛先は、複数のデータ フロー間でデータ フロー エンドポイントを再利用可能に保つために使用される概念です。 基本的には、データ フロー エンドポイント構成のサブディレクトリを表します。 たとえば、データ フロー エンドポイントがストレージ エンドポイントの場合、データの宛先はストレージ アカウント内のテーブルです。 データ フロー エンドポイントが Kafka エンドポイントの場合、データの宛先は Kafka トピックです。
エンドポイントの種類 | データの宛先の意味 | 説明 |
---|---|---|
MQTT (または Event Grid) | トピック | データが送信される MQTT トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 |
Kafka (または Event Hubs) | トピック | データが送信される Kafka トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 エンドポイントが Event Hubs 名前空間の場合、データの宛先は名前空間内の個々のイベント ハブです。 |
Azure Data Lake Storage | コンテナー | ストレージ アカウントのコンテナー。 テーブルではありません。 |
Microsoft Fabric OneLake | ファイルまたはフォルダー | 構成済みのエンドポイントのパスの種類に対応します。 |
Azure Data Explorer | テーブル | Azure Data Explorer データベース内のテーブル。 |
ローカル ストレージ | フォルダー | ローカル ストレージの永続ボリューム マウント内のフォルダーまたはディレクトリ名。
Azure Arc クラウド取り込みエッジ ボリュームで有効な Azure コンテナー ストレージを使用する場合、これは、作成したサブボリュームの spec.path パラメーターと一致する必要があります。 |
データの宛先を構成するには:
操作エクスペリエンスを使用する場合、データの宛先フィールドはエンドポイントの種類に基づいて自動的に解釈されます。 たとえば、データ フロー エンドポイントがストレージ エンドポイントの場合、宛先の詳細ページでコンテナー名の入力が求められます。 データ フロー エンドポイントが MQTT エンドポイントである場合、宛先の詳細ページでトピックの入力などが求められます。
例
次の例は、ソースと宛先に MQTT エンドポイントを使用したデータ フロー構成です。 ソースは、MQTT トピック azure-iot-operations/data/thermostat
からのデータをフィルターします。 変換によって温度が華氏に変換され、温度に湿度を掛けた値が 100,000 未満のデータにフィルターされます。 宛先が MQTT トピック factory
にデータを送信します。
データ フロー構成のその他の例については、Azure REST API (データ フロー) と Bicep のクイックスタートに関するページを参照してください。
データ フローが機能していることを確認する
「チュートリアル: Azure Event Grid への双方向 MQTT ブリッジ」に従って、データ フローが機能していることを確認します。
データ フロー構成のエクスポート
データ フロー構成をエクスポートするには、操作エクスペリエンスを使用するか、Dataflow カスタム リソースをエクスポートします。
エクスポートするデータ フローを選択し、ツール バーから [エクスポート] を選びます。
適切なデータ フロー構成
データ フローが期待どおりに動作していることを確認するには、次のことを確かめします:
- 既定の MQTT データ フロー エンドポイントは、ソースまたは宛先のいずれかとして使用される必要があります。
- データ フロー プロファイルが存在し、データ フロー構成で参照されます。
- ソースは、MQTT エンドポイント、Kafka エンドポイント、資産のいずれかです。 ストレージの種類のエンドポイントは、ソースとして使用できません。
- Event Grid をソースとして使用する場合、Event Grid MQTT ブローカーは共有サブスクリプションをサポートしていないため、データフロー プロファイル インスタンス数は 1 に設定されます。
- Event Hubs をソースとして使用する場合、名前空間内の各イベント ハブは個別の Kafka トピックであり、データ ソースとして指定する必要があります。
- 変換を使用する場合は、特殊文字の適切なエスケープを含む、適切な構文で構成します。
- ストレージの種類のエンドポイントを宛先として使用する場合は、スキーマを指定します。