Azure IoT Operations でデータフローを構成する
重要
このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれます。 この機能はいくつかの制限を設けて提供されており、運用環境のワークロードには使用しないでください。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
データフローとは、データがソースから宛先までたどるパスであり、必要に応じて変換が行われます。 データフローを構成するには、Dataflow カスタム リソースを作成するか、Azure IoT Operations Studio ポータルを使用します。 データフローは次の 3 つの部分で構成されます: ソース、変換、宛先
ソースと宛先を定義するには、データフロー エンドポイントを構成する必要があります。 変換は省略可能であり、データのエンリッチメント、データのフィルター処理、データの別のフィールドへのマッピングなどの操作を含めることができます。
重要
各データフローでは、ソースまたは宛先の "いずれか" として、Azure IoT Operations ローカル MQTT ブローカーの既定のエンドポイントを使用する必要があります。
Azure IoT Operations の操作エクスペリエンスを使用して、データフローを作成できます。 操作エクスペリエンスには、データフローを構成するためのビジュアル インターフェイスが用意されています。 Bicep を使って Bicep テンプレート ファイルを使用するデータフローを作成したり、Kubernetes を使って YAML ファイルを使用するデータフローを作成したりすることもできます。
ソース、変換、宛先を構成する方法については、引き続きお読みください。
前提条件
既定のデータフロー プロファイルとエンドポイントを使用して Azure IoT Operations のインスタンスが作成されたら、すぐにデータフローを展開できます。 ただし、データフロー プロファイルとエンドポイントを構成してデータフローをカスタマイズしたい場合があります。
データフロー プロファイル
データフローに複数の異なるスケーリング設定が必要ない場合は、Azure IoT Operations が提供する既定のデータフロー プロファイルを使用します。 データフロー プロファイルを構成する方法については、「データフロー プロファイルを構成する」を参照してください。
データフロー エンドポイント
データフローのソースと宛先を構成するには、データフロー エンドポイントが必要です。 すぐに開始するには、ローカル MQTT ブローカー の既定のデータフロー エンドポイントを使用できます。 Kafka、Event Hubs、Azure Data Lake Storage などの他の種類のデータフロー エンドポイントを作成することもできます。 各種のデータフロー エンドポイントを構成する方法については、「データフロー エンドポイントを構成する」を参照してください。
作業の開始
前提条件を満たしたら、データフローの作成を開始できます。
操作エクスペリエンスでデータフローを作成するには、[データフロー]>[データフローを作成する] を選択します。 次に、データフローのソース、変換、宛先を構成できるページが表示されます。
データフローの操作の種類を構成する方法については、以下のセクションを参照してください。
ソース
データフローのソースを構成するには、エンドポイント参照とエンドポイントのデータ ソースの一覧を指定します。 データフローのソースとして、次のいずれかのオプションを選択します。
既定のエンドポイントがソースとして使用されていない場合は、宛先として使用する必要があります。 詳細については、「データフローではローカル MQTT ブローカー エンドポイントの使用が必須」を参照してください。
オプション 1: 既定の MQTT エンドポイントをソースとして使用する
[ソースの詳細] で、[MQTT] を選択します。
MQTT ソースの次の設定を入力します。
設定 説明 MQTT トピック 受信メッセージをサブスクライブする MQTT トピック フィルター。 MQTT または Kafka の構成に関するトピックを参照してください。 メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。 適用を選択します。
オプション 2: 資産をソースとして使用する
資産をデータフローのソースとして使用できます。 資産をソースとして使用することは、操作エクスペリエンスでのみ使用できます。
[ソースの詳細] で、[資産] を選択します。
ソース エンドポイントとして使用する資産を選択します。
[続行] を選択します。
選択した資産のデータポイントの一覧が表示されます。
[適用] を選択して、資産をソース エンドポイントとして使用します。
資産をソースとして使用する場合、資産定義はデータフローのスキーマを推論するために使用されます。 資産定義には、資産のデータ ポイントのスキーマが含まれます。 詳細については、「資産の構成をリモートで管理する」を参照してください。
構成後、資産からのデータは、ローカル MQTT ブローカーを介してデータフローに到達しました。 そのため、ソースとして資産を使用する場合、データフローはローカル MQTT ブローカーの既定のエンドポイントを実際のソースとして使用します。
オプション 3: カスタム MQTT または Kafka データフロー エンドポイントをソースとして使用する
カスタム MQTT または Kafka データフロー エンドポイントを作成した場合 (たとえば、Event Grid または Event Hubs で使用する場合)、データフローのソースとして使用できます。 Data Lake や Fabric OneLake などのストレージ タイプのエンドポイントは、ソースとして使用できないことに注意してください。
構成するには、Kubernetes YAML または Bicep を使用します。 プレースホルダーの値をカスタム エンドポイントの名前とトピックに置き換えます。
カスタムの MQTT または Kafka エンドポイントのソースとしての使用は、現在、操作エクスペリエンスではサポートされていません。
データ ソースを構成する (MQTT または Kafka のトピック)
データフロー エンドポイントの構成を変更しなくても、ソースに複数の MQTT または Kafka トピックを指定できます。 この柔軟性により、トピックが異なる場合でも、複数のデータフロー間で同じエンドポイントを再利用できます。 詳細については、データフロー エンドポイントの再利用に関するセクションを参照してください。
MQTT のトピック
ソースが MQTT (Event Grid を含む) エンドポイントの場合は、MQTT トピック フィルターを使用して受信メッセージをサブスクライブできます。 トピック フィルターには、複数のトピックをサブスクライブするためのワイルドカードを含めることができます。 たとえば、thermostats/+/telemetry/temperature/#
はサーモスタットからのすべての温度テレメトリ メッセージをサブスクライブします。 MQTT トピック フィルターを構成するには:
操作エクスペリエンスのデータフローの [ソースの詳細] で [MQTT] を選択し、[MQTT トピック フィールド] を使用して、受信メッセージをサブスクライブする MQTT トピック フィルターを指定します。
Note
操作エクスペリエンスで指定できる MQTT トピック フィルターは 1 つだけです。 複数の MQTT トピック フィルターを使用するには、Bicep または Kubernetes を使用してください。
共有サブスクリプション
MQTT ソースで共有サブスクリプションを使用するには、$shared/<GROUP_NAME>/<TOPIC_FILTER>
の形式で共有サブスクリプション トピックを指定します。
操作エクスペリエンスのデータフローの [ソースの詳細] で、[MQTT] を選択し、[MQTT トピック] フィールドを使用して共有サブスクリプション グループとトピックを指定します。
データフロー プロファイルのインスタンス数が 1 より大きい場合、共有サブスクリプションが MQTT ソースを使用するすべてのデータフローに対して自動で有効になります。 この場合、$shared
プレフィックスが追加され、共有サブスクリプション グループ名が自動的に生成されます。 たとえば、インスタンス数が 3 のデータフロー プロファイルがあり、データフローがトピック topic1
と topic2
で構成されたソースとして MQTT エンドポイントを使用している場合、それらは自動的に共有サブスクリプションに $shared/<GENERATED_GROUP_NAME>/topic1
および $shared/<GENERATED_GROUP_NAME>/topic2
として変換されます。
構成内に $shared/mygroup/topic
という名前のトピックを明示的に作成できます。 ただし、$shared
プレフィックスは必要に応じて自動的に追加されるため、$shared
トピックを明示的に追加することはお勧めしません。 データフローが設定されていない場合は、グループ名を使用して最適化を行うことができます。 たとえば、$share
は設定されておらず、データフローはトピック名に対してのみ動作する必要がある場合があります。
重要
インスタンス数が 1 つ以上のときに共有サブスクリプションを必要とするデータフローは、Event Grid MQTT ブローカーをソースとして使用する場合に、共有サブスクリプションをサポートしていないため重要となります。 メッセージの欠落を回避するため、Event Grid MQTT ブローカーをソースとして使用するときは、データフロー プロファイル インスタンス数を 1 に設定します。 これは、データフローがサブスクライバーで、クラウドからメッセージを受信する場合です。
Kafka トピック
ソースが Kafka (Event Hubs を含む) エンドポイントの場合は、受信メッセージをサブスクライブする個々の Kafka トピックを指定します。 ワイルドカードはサポートされていないため、各トピックを静的に指定する必要があります。
Note
Kafka エンドポイント経由で Event Hubs を使用する場合、名前空間内の個々のイベント ハブは Kafka トピックです。 たとえば、thermostats
と humidifiers
の 2 つのイベント ハブを含む Event Hubs 名前空間がある場合、各イベント ハブを Kafka トピックとして指定できます。
Kafka トピックを構成するには:
Kafka エンドポイントのソースとしての使用は、現在、操作エクスペリエンスではサポートされていません。
ソース スキーマを指定する
MQTT または Kafka をソースとして使用する場合は、スキーマを指定して、操作エクスペリエンス ポータルにデータ ポイントの一覧を表示できます。 受信メッセージの逆シリアル化および検証のためのスキーマの使用は、現在サポートされていないことに注意してください。
ソースが資産の場合、スキーマは資産定義から自動的に推論されます。
ヒント
サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。
ソースからの受信メッセージを逆シリアル化するために使用するスキーマを構成するには:
操作エクスペリエンスのデータフローの [ソースの詳細] で [MQTT] を選択し、[メッセージ スキーマ] フィールドを使用してスキーマを指定します。 [アップロード] ボタンを使用して、最初にスキーマ ファイルをアップロードできます。 詳細については、「メッセージ スキーマを理解する」を参照してください。
詳細については、「メッセージ スキーマを理解する」を参照してください。
変換
変換操作では、宛先に送信する前にソースからのデータを変換できます。 変換は省略可能です。 データを変更する必要がない場合は、データフロー構成に変換操作を含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。 ステージの順序は常に次のようになります。
- エンリッチ: 一致するデータセットと条件を指定して、ソース データにデータを追加します。
- フィルター: 条件に基づいてデータをフィルター処理します。
- [マップ]、[コンピューティング]、[名前の変更]、または [新しいプロパティ] の追加: 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します。
このセクションでは、データフロー変換の概要について説明します。 詳細については、「データフローを使用してデータをマップする」、「データフロー変換を使用してデータを変換する」、「データフローを使用してデータをエンリッチする」を参照してください。
操作エクスペリエンスで、[データフロー]>[変換の追加 (省略可能)] を選びます。
エンリッチ: 参照データを追加する
データをエンリッチするには、まず Azure IoT Operations の状態ストアに参照データセットを追加します。 データセットは、条件に基づいてソース データにさらにデータを追加するために使用されます。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。
状態ストア CLI を使用して、状態ストアにサンプル データを読み込むことができます。 状態ストアのキー名は、データフロー構成のデータセットに対応します。
現在、"エンリッチ" ステージは操作エクスペリエンスではサポートされていません。
データセットに asset
フィールドを持つレコードがある場合は、次のようになります。
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
thermostat1
と一致する deviceId
フィールドを持つソースのデータには、フィルターとマップのステージで使用できる location
と manufacturer
のフィールドがあります。
条件構文の詳細については、「データフローを使用してデータをエンリッチする」と「データフローを使用したデータの変換」に関する記事を参照してください。
フィルター: 条件に基づいてデータをフィルター処理する
条件に基づいてデータをフィルター処理するには、filter
ステージを使用できます。 条件は、値と一致するソース データ内のフィールドとして指定されます。
[変換 (省略可能)] で、[フィルター]>[追加] を選びます。
必要な設定を入力します。
設定 説明 フィルターの条件 ソース データのフィールドに基づいてデータをフィルター処理する条件。 説明 フィルター条件の説明を入力します。 フィルター条件フィールドに「
@
」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。条件では、ソース データのフィールドを使用できます。 たとえば、
@temperature > 20
のようなフィルター条件を使用して、温度フィールドに基づいて 20 以下のデータをフィルター処理できます。適用を選択します。
マップ: あるフィールドから別のフィールドにデータを移動する
省略可能な変換を使用してデータを別のフィールドにマップするには、map
操作を使用できます。 変換は、ソース データのフィールドを使用する数式として指定されます。
操作エクスペリエンスでは、マッピングは現在 [コンピューティング]、[名前変更]、[新しいプロパティ] の各変換を使用してサポートされています。
Compute
[コンピューティング] 変換を使用して、ソース データに数式を適用できます。 この操作は、ソース データに数式を適用し、結果フィールドを格納するために使用されます。
[変換 (省略可能)] で、[コンピューティング]>[追加] を選びます。
必要な設定を入力します。
設定 説明 数式を選択する ドロップダウンから既存の数式を選択するか、[カスタム] を選択して手動で数式を入力します。 出力 結果の出力表示名を指定します。 式 ソース データに適用する数式を入力します。 説明 変換の説明を入力します。 最後の既知の値 必要に応じて、現在の値が使用できない場合は、最後の既知の値を使用します。 [数式] フィールドに数式を入力するか、既存の数式を編集できます。 数式には、ソース データ内のフィールドを使用できます。 「
@
」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。数式には、ソース データ内のフィールドを使用できます。 たとえば、ソース データの
temperature
フィールドを使用して温度を摂氏に変換し、それをtemperatureCelsius
出力フィールドに格納できます。適用を選択します。
名前の変更
[名前の変更] 変換を使用して、データポイントの名前を変更できます。 この操作は、ソース データ内のデータポイントの名前を新しい名前に変更するために使用されます。 新しい名前は、データフローの後続のステージで使用できます。
[変換 (省略可能)] で、[名前の変更]>[追加] を選びます。
必要な設定を入力します。
設定 説明 データポイント ドロップダウンからデータポイントを選択するか、$metadata ヘッダーを入力します。 新しいデータポイント名 データポイントの新しい名前を入力します。 説明 変換の説明を入力します。 「
@
」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。MQTT メタデータ プロパティは、形式
@$metadata.user_properties.<property>
または@$metadata.topic
を使用して入力できます。@$metadata.<header>
形式を使用して、$metadata ヘッダーを入力することもできます。$metadata
構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。適用を選択します。
新しいプロパティ
[新しいプロパティ] 変換を使用して、ソース データに新しいプロパティを追加できます。 この操作は、ソース データに新しいプロパティを追加するために使用されます。 新しいプロパティは、データフローの後続のステージで使用できます。
[変換 (省略可能)] で、[新しいプロパティ]>[追加] を選びます。
必要な設定を入力します。
設定 説明 プロパティキー 新しいプロパティのキーを入力します。 プロパティ値 新しいプロパティの値を入力します。 説明 新しいプロパティの説明を入力します。 適用を選択します。
詳細については、「データフローを使用してデータをマッピングする」と「データフローを使用したデータの変換」に関する記事を参照してください。
スキーマに従ってデータをシリアル化する
データを宛先に送信する前にシリアル化する場合は、スキーマとシリアル化形式を指定する必要があります。 それ以外の場合は、データは推論された型を使用して JSON でシリアル化されます。 Microsoft Fabric や Azure Data Lake などのストレージ エンドポイントには、データの一貫性を確保するためにスキーマが必要です。 サポートされているシリアル化形式は、Parquet と Delta です。
ヒント
サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。
操作エクスペリエンスの場合は、データフロー エンドポイントの詳細でスキーマとシリアル化の形式を指定します。 シリアル化形式をサポートするエンドポイントは、Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure Data Explorer です。 たとえば、差分形式でデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、データフロー変換先エンドポイント構成で参照する必要があります。
スキーマ レジストリの詳細については、「メッセージ スキーマについて」を参照してください。
宛先
データフローの宛先を構成するには、エンドポイント参照とデータ宛先を指定します。 エンドポイントのデータの宛先の一覧を指定できます。
ローカル MQTT ブローカー以外の宛先にデータを送信するには、データフロー エンドポイントを作成します。 その方法については、データフロー エンドポイントの構成に関するページを参照してください。 宛先がローカル MQTT ブローカーでない場合は、ソースとして使用する必要があります。 詳細については、「データフローではローカル MQTT ブローカー エンドポイントの使用が必須」を参照してください。
重要
ストレージ エンドポイントには、シリアル化にスキーマが必要です。 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル記憶域でデータフローを使用するには、スキーマ参照を指定する必要があります。
宛先として使用するデータフロー エンドポイントを選択します。
ストレージ エンドポイントには、シリアル化にスキーマが必要です。 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル ストレージの宛先エンドポイントを選択する場合は、スキーマ参照を指定する必要があります。 たとえば、データを Delta 形式で Microsoft Fabric エンドポイントにシリアル化するには、スキーマをスキーマ レジストリにアップロードし、データフローの宛先エンドポイントの構成内でそれを参照する必要があります。
[続行] を選択して、宛先を構成します。
データの送信先となるトピックやテーブルなど、宛先に必要な設定を入力します。 詳細については、「データの宛先の構成 (トピック、コンテナー、またはテーブル)」を参照してください。
データの宛先 (トピック、コンテナー、またはテーブル) を構成する
データ ソースと同様に、データの宛先は、複数のデータフロー間でデータフロー エンドポイントを再利用可能に保つために使用される概念です。 基本的には、データフロー エンドポイント構成のサブディレクトリを表します。 たとえば、データフロー エンドポイントがストレージ エンドポイントの場合、データの宛先はストレージ アカウント内のテーブルです。 データフロー エンドポイントが Kafka エンドポイントの場合、データの宛先は Kafka トピックです。
エンドポイントの種類 | データの宛先の意味 | 説明 |
---|---|---|
MQTT (または Event Grid) | トピック | データが送信される MQTT トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 |
Kafka (または Event Hubs) | トピック | データが送信される Kafka トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 エンドポイントが Event Hubs 名前空間の場合、データの宛先は名前空間内の個々のイベント ハブです。 |
Azure Data Lake Storage | コンテナー | ストレージ アカウントのコンテナー。 テーブルではありません。 |
Microsoft Fabric OneLake | ファイルまたはフォルダー | 構成済みのエンドポイントのパスの種類に対応します。 |
Azure Data Explorer | テーブル | Azure Data Explorer データベース内のテーブル。 |
ローカル ストレージ | フォルダー | ローカル ストレージの永続ボリューム マウント内のフォルダーまたはディレクトリ名。 Azure Arc クラウド取り込みエッジ ボリュームで有効な Azure コンテナー ストレージを使用する場合、これは、作成したサブボリュームの spec.path パラメーターと一致する必要があります。 |
データの宛先を構成するには:
操作エクスペリエンスを使用する場合、データの宛先フィールドはエンドポイントの種類に基づいて自動的に解釈されます。 たとえば、データフロー エンドポイントがストレージ エンドポイントの場合、宛先の詳細ページでコンテナー名の入力が求められます。 データフロー エンドポイントが MQTT エンドポイントである場合、宛先の詳細ページでトピックの入力などが求められます。
例
次の例は、ソースと宛先に MQTT エンドポイントを使用したデータフロー構成です。 ソースは、MQTT トピック azure-iot-operations/data/thermostat
からのデータをフィルターします。 変換によって温度が華氏に変換され、温度に湿度を掛けた値が 100,000 未満のデータにフィルターされます。 宛先が MQTT トピック factory
にデータを送信します。
構成例については、[Bicep] タブまたは [Kubernetes] タブを参照してください。
データフロー構成のその他の例については、Azure REST API (データフロー) と Bicep のクイックスタートに関するページを参照してください。
データフローが機能していることを確認する
「チュートリアル: Azure Event Grid への双方向 MQTT ブリッジ」に従って、データフローが機能していることを確認します。
データフロー構成のエクスポート
データフロー構成をエクスポートするには、操作エクスペリエンスを使用するか、Dataflow カスタム リソースをエクスポートします。
エクスポートするデータフローを選択し、ツール バーから [エクスポート] を選びます。
適切なデータフロー構成
データフローが期待どおりに動作していることを確認するには、次のことを確かめします。
- 既定の MQTT データフロー エンドポイントは、ソースまたは宛先のいずれかとして使用される必要があります。
- データフロー プロファイルが存在し、データフロー構成で参照されます。
- ソースは、MQTT エンドポイント、Kafka エンドポイント、資産のいずれかです。 ストレージの種類のエンドポイントは、ソースとして使用できません。
- Event Grid をソースとして使用する場合、Event Grid MQTT ブローカーは共有サブスクリプションをサポートしていないため、データフロー プロファイル インスタンス数は 1 に設定されます。
- Event Hubs をソースとして使用する場合、名前空間内の各イベント ハブは個別の Kafka トピックであり、データ ソースとして指定する必要があります。
- 変換を使用する場合は、特殊文字の適切なエスケープを含む、適切な構文で構成します。
- ストレージの種類のエンドポイントを宛先として使用する場合は、スキーマを指定します。