データフローを使用してデータをマップする
重要
このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれます。 この機能は いくつかの制限を設けて提供されており、運用環境のワークロードに使用すべきものではありません。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
データフロー マッピング言語を使用して、Azure IoT Operations のデータを変換します。 そのための構文は、データをある形式から別の形式に変換するマッピングを定義する、シンプルでありながら強力な方法です。 この記事では、データフロー マッピング言語と主要な概念の概要について説明します。
マッピングを使用すると、ある形式から別の形式にデータを変換できます。 次の入力レコードについて考えてみましょう。
{
"Name": "Grace Owens",
"Place of birth": "London, TX",
"Birth Date": "19840202",
"Start Date": "20180812",
"Position": "Analyst",
"Office": "Kent, WA"
}
それを出力レコードと比較します。
{
"Employee": {
"Name": "Grace Owens",
"Date of Birth": "19840202"
},
"Employment": {
"Start Date": "20180812",
"Position": "Analyst, Kent, WA",
"Base Salary": 78000
}
}
出力レコードでは、入力レコード データに対して次の変更が行われます。
- フィールド名の変更:
Birth Date
フィールドがDate of Birth
になりました。 - フィールドの再構築:
Name
とDate of Birth
は両方とも新しいEmployee
カテゴリの下にグループ化されています。 - フィールドの削除:
Place of birth
フィールドは出力内に存在しないため削除されています。 - フィールドの追加:
Base Salary
フィールドはEmployment
カテゴリ内の新しいフィールドです。 - フィールド値の変更またはマージ: 出力内の
Position
フィールドは、入力のPosition
およびOffice
フィールドを結合します。
変換は "マッピング" によって実現され、これには通常以下が関係します。
- 入力定義: 使用される入力レコード内のフィールドを識別します。
- 出力定義: 出力レコード内の入力フィールドを編成する場所と方法を指定します。
- 変換 (省略可能): 出力フィールドに収まるように入力フィールドを変更します。
expression
は、複数の入力フィールドを 1 つの出力フィールドに結合する場合に必要です。
マッピングの例を次に示します。
{
inputs: [
'BirthDate'
]
output: 'Employee.DateOfBirth'
}
{
inputs: [
'Position' // - - - - $1
'Office' // - - - - $2
]
output: 'Employment.Position'
expression: '$1 + ", " + $2'
}
{
inputs: [
'$context(position).BaseSalary'
]
output: 'Employment.BaseSalary'
}
マップ例は次のとおりです。
- 一対一マッピング:
BirthDate
は変換なしでEmployee.DateOfBirth
に直接マップされます。 - 多対一のマッピング:
Position
とOffice
を 1 つのEmployment.Position
フィールドに結合します。 変換式 ($1 + ", " + $2
) は、これらのフィールドを書式設定された文字列にマージします。 - コンテキスト データ:
BaseSalary
はposition
という名前のコンテキスト データセットから追加されます。
フィールド参照
フィールド参照は、入力と出力におけるパスの指定を Employee.DateOfBirth
のようなドット表記を使用するか、$context(position)
を介してコンテキスト データセットからデータにアクセスする方法のどちらで行うかを表します。
MQTT および Kafka のメタデータ プロパティ
MQTT または Kafka をソースまたは宛先として使用する場合は、マッピング言語でさまざまなメタデータ プロパティにアクセスできます。 これらのプロパティは、入力または出力でマップできます。
メタデータのプロパティ
- トピック: MQTT と Kafka の両方に使用できます。 メッセージが公開された文字列が含まれます。 例:
$metadata.topic
。 - ユーザー プロパティ: MQTT では、これは MQTT メッセージが伝送できる自由形式のキー/値のペアを指します。 たとえば、MQTT メッセージがキー "priority" と値 "high" のユーザー プロパティで発行された場合、
$metadata.user_property.priority
参照で値 "high" が保持されます。 ユーザー プロパティ キーは任意の文字列で、エスケープが必要な場合があります:$metadata.user_property."weird key"
はキー "weird key" (スペースを含む) を使います。 - システム プロパティ: この用語は、ユーザー プロパティでないすべてのプロパティに使用されます。 現在、1 つのシステム プロパティのみがサポートされています: MQTT メッセージのコンテンツ タイプ プロパティを読み取る
$metadata.system_property.content_type
(設定されている場合)。 - ヘッダー: これは、MQTT ユーザー プロパティと同等の Kafka です。 Kafka はキーに任意のバイナリ値を使用できますが、dataflow は UTF-8 文字列キーのみをサポートしています。 例:
$metadata.header.priority
。 この機能はユーザー プロパティに似ています。
メタデータ プロパティのマッピング
入力マッピング
次の例では、MQTT の topic
プロパティが出力の origin_topic
フィールドにマップされています。
inputs: [
'$metadata.topic'
]
output: 'origin_topic'
ユーザー プロパティ priority
が MQTT メッセージに存在する場合、それを出力フィールドにマッピングする方法を次の例で示します。
inputs: [
'$metadata.user_property.priority'
]
output: 'priority'
出力マッピング
メタデータ プロパティを出力ヘッダーやユーザー プロパティにマッピングすることもできます。 次の例では、MQTT の topic
が出力のユーザー プロパティの origin_topic
フィールドにマップされています。
inputs: [
'$metadata.topic'
]
output: '$metadata.user_property.origin_topic'
受信ペイロードに priority
フィールドが含まれている場合、次の例でそれを MQTT ユーザー プロパティにマッピングする方法を示します。
inputs: [
'priority'
]
output: '$metadata.user_property.priority'
Kafka の同じ例:
inputs: [
'priority'
]
output: '$metadata.header.priority'
コンテキスト化データセット セレクター
これらのセレクターによって、"コンテキスト化データセット" と呼ばれる外部データベースからの追加データを統合するためのマッピングが可能になります。
レコードのフィルター処理
レコードのフィルター処理では、処理または削除するレコードを選択する条件を設定します。
ドット表記
ドット表記は、フィールドを参照するためにコンピューター サイエンスで広く使用されており、再帰的に使用されることもあります。 通常プログラミングでは、フィールド名は文字と数字で構成されます。 標準的なドット表記のサンプルは、次の例のようになります。
inputs: [
'Person.Address.Street.Number'
]
データフローでは、ドット表記で記述されたパスは、以下のように文字列とエスケープを必要としない特殊文字を含んでいる場合があります。
inputs: [
'Person.Date of Birth'
]
他の場合はエスケープが必要です。
inputs: [
'Person."Tag.10".Value'
]
前の例は、フィールド名内に他の特殊文字だけでなくドットを含んでいます。 エスケープなしでは、このフィールド名はドット表記自体の中の区切りとして機能します。
データフローはパスを解析しますが、次の 2 つの文字のみを特殊として扱います。
- ドット (
.
) はフィールド区切りとして機能します。 - セグメントの先頭または末尾に配置された単一引用符は、ドットがフィールド区切りとして扱われないエスケープされたセクションを開始します。
その他の文字は、フィールド名の一部として扱われます。 この柔軟性は、フィールド名を任意の文字列にできる JSON などの形式で役立ちます。
Bicep では、すべての文字列は単一引用符 ('
) で囲まれます。 Kubernetes 用の YAML での適切な引用符の使用に関する例は適用されません。
エスケープ
ドット表記のパス内でエスケープする主な役目は、区切り記号でなく、フィールド名の一部であるドットの使用に対処することです。
inputs: [
'Payload."Tag.10".Value'
]
この例では、パスは Payload
、Tag.10
、Value
の 3 つのセグメントで構成されています。
ドット表記でのエスケープ ルール
各セグメントの個別エスケープ: 複数のセグメントにドットが含まれている場合は、それらのセグメントを二重引用符で囲む必要があります。 他のセグメントも引用符で囲めますが、パスの解釈には影響しません。
inputs: [ 'Payload."Tag.10".Measurements."Vibration.$12".Value' ]
二重引用符の適切な使用: 二重引用符は、エスケープされるセグメントの前後に付ける必要があります。 以下のようにセグメントの途中にある引用符はすべて、フィールド名の一部と見なされます。
inputs: [ 'Payload.He said: "Hello", and waved' ]
この例では、Payload
と He said: "Hello", and waved
という 2 つのフィールドを定義します。 以下のようにこのような状況でドットが表れる場合でも、ドットは区切りとして機能します。
inputs: [
'Payload.He said: "No. It is done"'
]
この場合、パスはセグメント Payload
、He said: "No
、It is done"
(スペースから始まる) に分割されます。
分割アルゴリズム
- セグメントの最初の文字が引用符の場合、パーサーは次の引用符を探します。 これらの引用符によって囲まれた文字列は、1 つのセグメントと見なされます。
- セグメントが引用符で始まらない場合、パーサーは次のドットまたはパスの末尾を見つけることでセグメントを特定します。
ワイルドカード
多くのシナリオで、出力レコードはわずかな変更が必要なだけで、入力レコードによく似ています。 多数のフィールドを含むレコードを処理する場合は、各フィールドのマッピングを手動で指定することは大変な作業になる可能性があります。 ワイルドカードを使用すると、複数のフィールドに自動的に適用できる一般化されたマッピングが可能になるため、このプロセスが簡略化されます。
マッピングでのアスタリスクの使用を理解するための基本的なシナリオを考えてみましょう。
inputs: [
'*'
]
output: '*'
この構成は基本的なマッピングを示しています。ここでは、入力のすべてのフィールドが、変更なしに出力の同じフィールドに直接マッピングされます。 アスタリスク (*
) は、入力レコードのどのフィールドにも一致するワイルドカードとして機能します。
このコンテキストでのアスタリスク (*
) の動作を次に示します。
- パターン マッチング: アスタリスクは、パスの 1 つまたは複数のセグメントとマッチすることができます。 パス内の任意のセグメントのプレースホルダーとして機能します。
- フィールド マッチング: マッピング プロセス中に、アルゴリズムは入力レコード内の各フィールドを、
inputs
内で指定されたパターンと照らし合わせて評価します。 前の例のアスタリスクは、可能なすべてのパスと一致し、入力内のすべての個々のフィールドに有効に適合します。 - キャプチャされたセグメント: アスタリスクがマッチするパスの一部は
captured segment
と呼ばれます。 - 出力マッピング: 出力構成では、アスタリスクが表れる場所に
captured segment
が配置されます。 つまり、入力の構造は出力に保持され、captured segment
によってアスタリスクが提供するプレースホルダーが埋められます。
別の例では、ワイルドカードを使用してサブセクションを照合し、それらを一緒に移動する方法を示します。 この例では、JSON オブジェクト内の入れ子になった構造体を有効にフラット化します。
元の JSON:
{
"ColorProperties": {
"Hue": "blue",
"Saturation": "90%",
"Brightness": "50%",
"Opacity": "0.8"
},
"TextureProperties": {
"type": "fabric",
"SurfaceFeel": "soft",
"SurfaceAppearance": "matte",
"Pattern": "knitted"
}
}
ワイルドカードを使用するマッピング構成:
{
inputs: [
'ColorProperties.*'
]
output: '*'
}
{
inputs: [
'TextureProperties.*'
]
output: '*'
}
結果の JSON:
{
"Hue": "blue",
"Saturation": "90%",
"Brightness": "50%",
"Opacity": "0.8",
"type": "fabric",
"SurfaceFeel": "soft",
"SurfaceAppearance": "matte",
"Pattern": "knitted"
}
ワイルドカードの配置
ワイルドカードを配置するときは、以下のルールに従う必要があります。
- データ参照ごとに 1 つのアスタリスク: 1 つのデータ参照内で使用できるアスタリスク (
*
) は 1 つだけです。 - 完全セグメント マッチング: アスタリスクは、常にパスのセグメント全体とマッチする必要があります。
path1.partial*.path3
など、セグメントの一部のみを照合するために使用することはできません。 - 配置: アスタリスクは以下のようにデータ参照のさまざまな部分に配置できます。
- 先頭:
*.path2.path3
- この場所では、アスタリスクはpath2.path3
が後に来る任意のセグメントとマッチします。 - 中間:
path1.*.path3
- この構成では、アスタリスクはpath1
とpath3
に挟まれる任意のセグメントにマッチします。 - 末尾:
path1.path2.*
- 末尾のアスタリスクはpath1.path2
の後に続く任意のセグメントとマッチします。
- 先頭:
- アスタリスクを含むパスは、単一引用符 (
'
) で囲む必要があります。
複数入力ワイルドカード
元の JSON:
{
"Saturation": {
"Max": 0.42,
"Min": 0.67,
},
"Brightness": {
"Max": 0.78,
"Min": 0.93,
},
"Opacity": {
"Max": 0.88,
"Min": 0.91,
}
}
ワイルドカードを使用するマッピング構成:
inputs: [
'*.Max' // - $1
'*.Min' // - $2
]
output: 'ColorProperties.*'
expression: '($1 + $2) / 2'
結果の JSON:
{
"ColorProperties" : {
"Saturation": 0.54,
"Brightness": 0.85,
"Opacity": 0.89
}
}
複数入力ワイルドカードを使用する場合、アスタリスク (*
) はすべての入力において一貫して同じ Captured Segment
を表す必要があります。 たとえば、*
がパターン *.Max
内の Saturation
をキャプチャする場合、対応する Saturation.Min
がパターン *.Min
と一致することが、マッピング アルゴリズムによって想定されます。 ここで、*
は最初の入力から Captured Segment
に置き換わり、後続の入力の照合を誘導します。
この詳細な例を考えてみましょう。
元の JSON:
{
"Saturation": {
"Max": 0.42,
"Min": 0.67,
"Mid": {
"Avg": 0.51,
"Mean": 0.56
}
},
"Brightness": {
"Max": 0.78,
"Min": 0.93,
"Mid": {
"Avg": 0.81,
"Mean": 0.82
}
},
"Opacity": {
"Max": 0.88,
"Min": 0.91,
"Mid": {
"Avg": 0.89,
"Mean": 0.89
}
}
}
ワイルドカードを使用する初期マッピング構成:
inputs: [
'*.Max' // - $1
'*.Min' // - $2
'*.Avg' // - $3
'*.Mean' // - $4
]
この初期マッピングは配列を構築しようとします (たとえば、Opacity
: [0.88, 0.91, 0.89, 0.89]
)。 この構成は以下の理由で失敗します。
- 最初の入力
*.Max
は、Saturation
などのセグメントをキャプチャします。 - このマッピングでは、後続の入力が同じレベルに存在することが想定されます。
Saturation.Max
Saturation.Min
Saturation.Avg
Saturation.Mean
Avg
と Mean
は Mid
内に入れ子になっているため、初期マッピング内のアスタリスクはこれらのパスを正しくキャプチャしません。
修正したマッピング構成:
inputs: [
'*.Max' // - $1
'*.Min' // - $2
'*.Mid.Avg' // - $3
'*.Mid.Mean' // - $4
]
この修正されたマッピングは、必要なフィールドを正確にキャプチャします。 これは入れ子になった Mid
オブジェクトを含むパスを正しく指定します。これにより、アスタリスクがさまざまなレベルの JSON 構造で効果的に機能するようになります。
2 番目のルールと特殊化
複数入力ワイルドカードの前の例を使用する場合は、プロパティごとに 2 つの派生値を生成する次のマッピングを検討してください。
{
inputs: [
'*.Max' // - $1
'*.Min' // - $2
]
output: 'ColorProperties.*.Avg'
expression: '($1 + $2) / 2'
}
{
inputs: [
'*.Max' // - $1
'*.Min' // - $2
]
output: 'ColorProperties.*.Diff'
expression: '$1 - $2'
}
このマッピングは、ColorProperties
のプロパティごとに 2 つの個別の計算 (Avg
と Diff
) を作成することを目的としています。 結果は次の例のようになります。
{
"ColorProperties": {
"Saturation": {
"Avg": 0.54,
"Diff": 0.25
},
"Brightness": {
"Avg": 0.85,
"Diff": 0.15
},
"Opacity": {
"Avg": 0.89,
"Diff": 0.03
}
}
}
ここで、同じ入力に対する 2 番目のマッピング定義は、マッピングの 2 番目のルールとして機能します。
次に、特定のフィールドに別の計算が必要なシナリオを考えてみましょう。
{
inputs: [
'*.Max' // - $1
'*.Min' // - $2
]
output: 'ColorProperties.*'
expression: '($1 + $2) / 2'
}
{
inputs: [
'Opacity.Max' // - $1
'Opacity.Min' // - $2
]
output: 'ColorProperties.OpacityAdjusted'
expression: '($1 + $2 + 1.32) / 2'
}
この場合、Opacity
フィールドには一意の計算があります。 この重複するシナリオを処理する 2 つのオプションは次のとおりです。
Opacity
に対して両方のマッピングを含めます。 この例では出力フィールドがそれぞれ異なるため、それらがお互いをオーバーライドすることはありません。Opacity
に対してより具体的なルールを使用し、より汎用的なルールを削除します。
以下のような適切なアクションを決定するのに役立つ、同じフィールドの特殊なケースを考えてみましょう。
{
inputs: [
'*.Max' // - $1
'*.Min' // - $2
]
output: 'ColorProperties.*'
expression: '($1 + $2) / 2'
}
{
inputs: [
'Opacity.Max' // - $1
'Opacity.Min' // - $2
]
output: ''
}
2 番目の定義の空の output
フィールドは、出力レコードにフィールドを書き込まない (実質的に Opacity
を削除する) 意味します。 この設定は、Second Rule
というよりも Specialization
です。
データフローによる重複マッピングの解決:
- 評価は、マッピング定義の一番上のルールから進行します。
- 新しいマッピングが前のルールと同じフィールドに解決される場合は、以下の条件が適用されます。
Rank
は、ワイルドカード キャプチャのセグメント数に基づいて、解決された各入力に対して計算されます。 たとえば、Captured Segments
がProperties.Opacity
の場合、Rank
は 2 になります。 それがOpacity
だけである場合、Rank
は 1 になります。 ワイルドカードを使用しないマッピングのRank
は 0 です。- 後者のルールの
Rank
が、前のルール以上の場合、データフローはそれをSecond Rule
として扱います。 - それ以外の場合、データフローは構成を
Specialization
として扱います。
たとえば、Opacity.Max
と Opacity.Min
を空の出力に対応させるマッピングの Rank
は 0 です。 2 番目のルールは、前のルールよりも Rank
が低いため、特殊化と見なされ、前のルールをオーバーライドし、これによって Opacity
の値が計算されます。
コンテキスト化データセットのワイルドカード
次に、例を通してコンテキスト化データセットをワイルドカードと共に使用する方法を見てみましょう。 次のレコードを含む position
という名前のデータセットを考えます。
{
"Position": "Analyst",
"BaseSalary": 70000,
"WorkingHours": "Regular"
}
前の例では、このデータセットの特定のフィールドを使用しました。
inputs: [
'$context(position).BaseSalary'
]
output: 'Employment.BaseSalary'
このマッピングは、コンテキスト データセットの BaseSalary
を出力レコードの Employment
セクションに直接コピーします。 このプロセスを自動化し、position
データセットからのすべてのフィールドを Employment
セクションに含めたい場合は、以下のようにワイルドカードを使用できます。
inputs: [
'$context(position).*'
]
output: 'Employment.*'
この構成により、position
データセット内のすべてのフィールドが出力レコードの Employment
セクションにコピーされる動的マッピングが可能になります。
{
"Employment": {
"Position": "Analyst",
"BaseSalary": 70000,
"WorkingHours": "Regular"
}
}
最後の既知の値
プロパティの最後の既知の値を追跡できます。 入力フィールドの最後の既知の値を取得するには、フィールドに ? $last
のサフィックスを付けます。 後続の入力ペイロードのプロパティに値がない場合、最後の既知の値が出力ペイロードにマップされます。
たとえば、次のマッピングがあるとします。
inputs: [
'Temperature ? $last'
]
output: 'Thermostat.Temperature'
この例では、Temperature
の最後の既知の値が追跡されます。 後続の入力ペイロードに Temperature
値が含まれていない場合は、最後の既知の値が出力で使用されます。