次の方法で共有


Azure Event Hubs と Kafka データ フロー エンドポイントを構成する

重要

このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれます。 この機能はいくつかの制限を設けて提供されており、運用環境のワークロードには使用しないでください。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

Azure IoT Operations と Apache Kafka ブローカー間の双方向通信を設定するには、データ フロー エンドポイントを構成します。 この構成では、エンドポイント、トランスポート層セキュリティ (TLS)、認証、およびその他の設定を指定できます。

前提条件

Azure Event Hubs

Azure Event Hubs は Kafka プロトコルと互換であり、いくつかの制限付きでデータ フローとともに使用できます。

Azure Event Hubs 名前空間とイベント ハブを作成する

まず、Kafka 対応の Azure Event Hubs 名前空間を作成します

次に、名前空間にイベント ハブを作成します。 個々のイベント ハブは、Kafka トピックに対応します。 同じ名前空間に複数のイベント ハブを作成して、複数の Kafka トピックを表すことができます。

マネージド ID にアクセス許可を割り当てる

Azure Event Hubs のデータ フロー エンドポイントを構成するには、ユーザー割り当てまたはシステム割り当ての、いずれかのマネージド ID を使用することをお勧めします。 この方法は安全であり、認証情報を手動で管理する必要がありません。

Azure Event Hubs 名前空間とイベント ハブが作成されたら、イベント ハブのメッセージを送受信するためのアクセス許可を付与するロールを、Azure IoT Operations マネージド ID に割り当てる必要があります。

システム割り当てマネージド ID を使用する場合は、Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。 Azure IoT Operations Arc 拡張機能の後一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7。 システム割り当てマネージド ID を見つけるには、Azure IoT Operations Arc 拡張機能の同じ名前を使用します。

次に、Event Hubs 名前空間 >[アクセス制御 (IAM)]>[ロールの割り当ての追加] に移動します。

  1. [ロール] タブで、Azure Event Hubs Data SenderAzure Event Hubs Data Receiver などの適切なロールを選択します。 これにより、名前空間内のすべてのイベント ハブのメッセージを送受信するために必要なアクセス許可がマネージド ID に付与されます。 詳細については、「Event Hubs リソースにアクセスするための Microsoft Entra ID によりアプリケーションを認証する」を参照してください。
  2. [メンバー] タブで次の操作を行います。
    1. システム割り当てマネージド ID を使用している場合、[アクセスの割り当て先][ユーザー、グループ、またはサービス プリンシパル] オプションを選択し、[+ メンバーの選択] を選択して、Azure IoT Operations Arc 拡張機能の名前を検索します。
    2. ユーザー割り当てマネージド ID を使用している場合、[アクセスの割り当て先][マネージド ID] オプションを選択し、[+ メンバーの選択] を選択して、クラウド接続用に設定されたユーザー割り当てマネージド ID を検索します。

Azure Event Hubs のデータ フロー エンドポイントを作成する

Azure Event Hubs の名前空間とイベント ハブが構成されたら、Kafka 対応 Azure Event Hubs 名前空間のデータ フロー エンドポイントを作成できます。

  1. 操作エクスペリエンスで、[データ フロー エンドポイント] タブを選択します。

  2. [新しいデータ フロー エンドポイントの作成] で、[Azure Event Hubs]>[新規] を選択します。

    操作エクスペリエンスを使用して Azure Event Hubs データ フロー エンドポイントを作成しているスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 内容
    Name データ フロー エンドポイントの名前。
    Host 形式 <NAMESPACE>.servicebus.windows.net:9093 の Kafka ブローカーのホスト名。 Event Hubs のホスト設定にポート番号 9093 を含めます。
    認証方法 認証に使用する方式。 "システム割り当てマネージド ID" または "ユーザー割り当てマネージド ID" を選択することをお勧めします。
  4. [適用] を選択してエンドポイントをプロビジョニングします。

Note

Kafka トピックまたは個々のイベント ハブは、後でデータ フローを作成する際に構成されます。 Kafka トピックは、データ フロー メッセージの宛先です。

Event Hubs への認証に接続文字列を使用する

重要

操作エクスペリエンス ポータルを使用してシークレットを管理するには、まず、安全な設定で Azure IoT Operations を有効にする必要があります。それには Azure Key Vault を構成し、ワークロード ID を有効にします。 詳細については、Azure IoT Operations デプロイでの安全な設定の有効化に関する記事を参照してください。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[SASL] を選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 Plain を選択します。
同期されたシークレット名 接続文字列を含む Kubernetes シークレットの名前を入力します。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。 Key Vault の一覧から選択するか、新しく作成します。 値は $ConnectionString である必要があります。
パスワード参照またはトークン シークレット SASL 認証に使用されるパスワードへの参照またはトークン シークレット。 Key Vault の一覧から選択するか、新しく作成します。 値は Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> の形式でなければなりません。

[参照の追加] を選択した後、[新規作成] を選択した場合は、次の設定を入力します。

設定 説明
シークレット名 Azure Key Vault 内のシークレットの名前。 覚えやすい名前にして、後でリストからそのシークレットを選択できるようにしてください。
シークレットの値 ユーザーには「$ConnectionString」を入力します。 パスワードには接続文字列を Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> 形式で入力します。
アクティブ化する日を設定する オンにした場合、シークレットがアクティブになる日付。
有効期限を設定する オンにした場合、シークレットの有効期限が切れる日付。

シークレットの詳細については、Azure IoT Operations でのシークレットの作成と管理に関する記事を参照してください。

制限事項

Azure Event Hubs は、Kafka がサポートするすべての圧縮の種類をサポートするわけではありません。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。 他の圧縮の種類を使用すると、エラーが発生する可能性があります。

カスタム Kafka ブローカー

イベント ハブ以外の Kafka ブローカーのデータ フロー エンドポイントを構成するには、必要に応じてホスト、TLS、認証、およびその他の設定を設定します。

  1. 操作エクスペリエンスで、[データ フロー エンドポイント] タブを選択します。

  2. [新しいデータ フロー エンドポイントの作成] で、[カスタム Kafka ブローカー]>[新規] を選択します。

    操作エクスペリエンスを使用して Kafka データ フロー エンドポイントを作成しているスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 内容
    Name データ フロー エンドポイントの名前。
    Host 形式 <Kafka-broker-host>:xxxx の Kafka ブローカーのホスト名。 ホスト設定にポート番号を含めます。
    認証方法 認証に使用する方式。 [SASL] を選択します。
    SASL の種類 SASL 認証の種類。 [Plain]、[ScramSha256]、または [ScramSha512] を選びます。 [SASL] を使用する場合は必須です。
    同期されたシークレット名 シークレットの名前。 [SASL] を使用する場合は必須です。
    Username reference of token secret (トークン シークレットのユーザー名参照) SASL トークン シークレット内のユーザー名への参照。 [SASL] を使用する場合は必須です。
  4. [適用] を選択してエンドポイントをプロビジョニングします。

Note

現在、操作エクスペリエンスは、ソースとしての Kafka データ フロー エンドポイントの使用をサポートしていません。 Kafka データ フロー エンドポイントをソースに使用するデータ フローは、Kubernetes または Bicep を使用して作成できます。

エンドポイント設定をカスタマイズする場合は、次のセクションを使用して詳細を確認してください。

使用可能な認証方法

Kafka データ フロー エンドポイントで使用できる認証方法は次のとおりです。

システム割り当てマネージド ID

データ フロー エンドポイントを構成する前に、Kafka ブローカーに接続するアクセス許可を付与するロールを、Azure IoT Operations マネージド ID に割り当てます。

  1. Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。
  2. Azure IoT Operations Arc 拡張機能の後一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7
  3. アクセス許可を付与する必要があるクラウド リソースに移動します。 たとえば、Event Hubs 名前空間 >[アクセス制御 (IAM)]>[ロールの割り当ての追加] に移動します。
  4. [ロール] タブで、適切なロールを選択します。
  5. [メンバー] タブの [アクセスの割り当て先][ユーザー、グループ、またはサービス プリンシパル] オプションを選択し、[+ メンバーの選択] を選択して、Azure IoT Operations マネージド ID を検索します。 たとえば、azure-iot-operations-xxxx7

次に、システム割り当てマネージド ID の設定を使用してデータ フロー エンドポイントを構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[システム割り当てマネージド ID] を選択します。

この構成では、既定の対象ユーザーを使用してマネージド ID を作成します。これは、https://<NAMESPACE>.servicebus.windows.net という形式の Event Hubs 名前空間のホスト値と同じです。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、audience フィールドを目的の値に設定できます。

Operations Experience ではサポートされていません。

ユーザー割り当てマネージド ID

認証にユーザー割り当てマネージド ID を使用するには、まず、セキュリティで保護された設定を有効にして Azure IoT Operations をデプロイする必要があります。 次に、クラウド接続用にユーザー割り当てマネージド ID を設定する必要があります。 詳細については、Azure IoT Operations デプロイでの安全な設定の有効化に関する記事を参照してください。

データ フロー エンドポイントを構成する前に、Kafka ブローカーに接続するアクセス許可を付与するロールを、ユーザー割り当てマネージド ID に割り当てます。

  1. Azure portal で、アクセス許可を付与する必要があるクラウド リソースに移動します。 たとえば、Event Grid 名前空間 >[アクセス制御 (IAM)]>[ロールの割り当ての追加] に移動します。
  2. [ロール] タブで、適切なロールを選択します。
  3. [メンバー] タブの [アクセスの割り当て先][マネージド ID] オプションを選択し、[+ メンバーの選択] を選択して、ユーザー割り当てマネージド ID を検索します。

次に、ユーザー割り当てマネージド ID の設定を使用してデータ フロー エンドポイントを構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[ユーザー割り当てマネージド ID] を選択します。

ここでは、スコープはマネージド ID の対象ユーザーです。 既定値は、Event Hubs 名前空間のホスト値と同じで、形式は https://<NAMESPACE>.servicebus.windows.net です。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、Bicep または Kubernetes を使用してスコープ フィールドを目的の値に設定できます。

SASL

認証に SASL を使用するには、SASL 認証方法を指定し、SASL の種類と、SASL トークンを含むシークレットの名前を持つシークレット参照を構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[SASL] を選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 使用する SASL 認証の種類。 サポートされている型は、PlainScramSha256ScramSha512 です。
同期されたシークレット名 SASL トークンを含む Kubernetes シークレットの名前。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。
パスワード参照またはトークン シークレット SASL 認証に使用されるパスワードへの参照またはトークン シークレット。

サポートされている SASL の種類は次のとおりです。

  • Plain
  • ScramSha256
  • ScramSha512

このシークレットは、Kafka データ フロー エンドポイントと同じ名前空間に存在する必要があります。 シークレットには、SASL トークンが、キーと値のペアの形式で存在する必要があります。

匿名

匿名認証を使用するには、Anonymous メソッドを使用するように Kafka 設定の認証セクションを更新します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[なし] を選択します。

詳細設定

TLS、信頼されたCA 証明書、Kafka メッセージング設定、バッチ処理、CloudEvents など、Kafka データ フロー エンドポイントの詳細設定を設定できます。 これらの設定は、データ フロー エンドポイントの [詳細] ポータル タブ、またはデータ フロー エンドポイント リソース内で設定できます。

操作エクスペリエンスで、データ フロー エンドポイントの [詳細] タブを選択します。

操作エクスペリエンスを使って Kafka データ フロー エンドポイントの詳細設定を行っているスクリーンショット。

TLS の設定

TLS モード

Kafka エンドポイントの TLS を有効または無効にするには、TLS 設定の mode 設定を更新します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[TLS モードが有効] の横にあるチェックボックスを使用します。

TLS モードは、Enabled または Disabled に設定できます。 モードが Enabled に設定されている場合、データ フローは Kafka ブローカーに対して安全な接続を使用しています。 モードが Disabled に設定されている場合は、データ フローは Kafka ブローカーに対して安全ではない接続を使用しています。

信頼された証明機関証明書

Kafka ブローカーへのセキュリティで保護された接続を確立するように、Kafka エンドポイントの信頼された CA 証明書を構成します。 この設定は、Kafka ブローカーが自己署名証明書または既定では信頼されていないカスタム証明機関によって署名された証明書を使用する場合に重要です。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[信頼された CA 証明書構成マップ] フィールドを使用して、信頼された CA 証明書を含む ConfigMap を指定します。

この ConfigMap には、PEM 形式の証明機関証明書が含まれている必要があります。 この ConfigMap は、Kafka データ フロー リソースと同じ名前空間に存在する必要があります。 次に例を示します。

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

ヒント

Azure Event Hubs に接続する場合、Event Hubs サービスは既定で信頼されているパブリック CA によって署名された証明書を使用するため、CA 証明書は必要ありません。

コンシューマー グループ ID

コンシューマー グループ ID は、Kafka トピックからのメッセージの読み取りにデータ フローが使用する、コンシューマー グループを識別するために使用されます。 コンシューマー グループ ID は、Kafka ブローカー内で一意である必要があります。

重要

Kafka エンドポイントをソースとして使用する場合は、コンシューマー グループ ID が必要です。 それ以外の場合、データ フローでは Kafka トピックからメッセージを読み取ることができず、"Kafka 型のソース エンドポイントには consumerGroupId が定義されている必要があります" というエラーが表示されます。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[コンシューマー グループ ID] フィールドを使用してコンシューマー グループ ID を指定します。

この設定は、エンドポイントがソース (つまり、データ フローがコンシューマー) として使用される場合にのみ有効になります。

圧縮

圧縮フィールドを使用すると、Kafka トピックに送信されるメッセージの圧縮が可能になります。 圧縮は、データ転送に必要なネットワーク帯域幅とストレージ領域を減らすのに役立ちます。 ただし、圧縮により、オーバーヘッドと待機時間もプロセスに追加されます。 サポートされている圧縮の種類を次の表に示します。

説明
None 圧縮またはバッチ処理は適用されません。 compression が指定されていない場合、既定値は None です。
Gzip GZIP 圧縮とバッチ処理が適用されます。 GZIP は、圧縮率と速度のバランスが良い汎用圧縮アルゴリズムです。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。
Snappy すばやい圧縮とバッチ処理が適用されます。 Snappy は、中程度の圧縮率と速度を提供する高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。
Lz4 LZ4 圧縮とバッチ処理が適用されます。 LZ4は、低圧縮比と高速を提供する高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。

圧縮を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[圧縮] フィールドを使用して圧縮の種類を指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

バッチ処理

圧縮とは別に、Kafka トピックに送信する前にメッセージのバッチ処理を構成することもできます。 バッチ処理を使用すると、複数のメッセージをグループ化して 1 つの単位として圧縮できるため、圧縮効率が向上し、ネットワークのオーバーヘッドが削減されます。

フィールド Description 必須
mode Enabled または Disabled を指定できます。 Kafka には "バッチ処理されない" メッセージングという概念がないため、既定値は Enabled です。 Disabled に設定すると、バッチ処理が最小化され、毎回 1 つのメッセージでバッチが作成されます。 いいえ
latencyMs メッセージをバッファー処理してから送信できる最大時間間隔 (ミリ秒単位)。 この間隔に達した場合、バッファー内のすべてのメッセージは、メッセージの数や大きさに関係なく、バッチとして送信されます。 設定しない場合、既定値は 5 です。 いいえ
maxMessages 送信前にバッファーに格納できるメッセージの最大数。 この数に達すると、バッファーに格納されているメッセージの大きさやバッファーの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 設定しない場合、既定値は 100000 です。 いいえ
maxBytes 送信前にバッファーに格納できる最大サイズ (バイト単位)。 このサイズに達すると、バッファーに格納されているメッセージの数やバッファーの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 既定値は 1000000 (1 MB) です。 いいえ

たとえば、latencyMs を 1000 に、maxMessages を 100 に、maxBytes を 1024 に設定した場合、バッファーのメッセージが 100 件になるか、バッファーのバイト数が 1,024 になるか、最後の送信から 1,000 ミリ秒が経過するかのいずれかが最初に発生したときにメッセージが送信されます。

バッチ処理を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[バッチ処理が有効] フィールドを使用してバッチ処理を有効にします。 [バッチ処理の待機時間][最大バイト数][メッセージ数] フィールドを使用してバッチ処理の設定を指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

パーティション処理戦略

パーティション処理戦略は、Kafka トピックに送信するときにメッセージを Kafka パーティションに割り当てる方法を制御します。 Kafka パーティションは、並列処理とフォールト トレランスを可能にする Kafka トピックの論理セグメントです。 Kafka トピック内の各メッセージには、メッセージの識別と順序付けに使用されるパーティションとオフセットがあります。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

既定では、データ フローはラウンド ロビン アルゴリズムを使用して、ランダムなパーティションにメッセージを割り当てます。 ただし、MQTT トピック名や MQTT メッセージ プロパティなど、いくつかの条件に基づいてパーティションにメッセージを割り当てるには、さまざまな方法を使用できます。 これは、負荷分散、データの局所性、またはメッセージの順序付けを改善するのに役立ちます。

説明
Default ラウンド ロビン アルゴリズムを使用して、ランダム パーティションにメッセージを割り当てます。 これは、戦略が指定されていない場合の既定値です。
Static データ フローのインスタンス ID から派生した固定のパーティション番号にメッセージを割り当てます。 これは、各データ フロー インスタンスが異なるパーティションにメッセージを送信することを意味します。 これにより、負荷分散とデータの局所性が向上します。
Topic パーティション分割のキーとして、データ フロー ソースからの MQTT トピック名を使用します。 これは、同じ MQTT トピック名のメッセージが同じパーティションに送信されることを意味します。 これは、メッセージの順序付けとデータの局所性の向上に役立ちます。
Property パーティション分割のキーとして、データ フロー ソースからの MQTT メッセージ プロパティを使用します。 partitionKeyProperty フィールドにプロパティの名前を指定します。 これは、同じプロパティ値を持つメッセージが同じパーティションに送信されることを意味します。 これにより、カスタム条件に基づいてメッセージの順序付けとデータの局所性を向上させることができます。

たとえば、パーティション処理戦略を Property に設定し、パーティション キー プロパティを device-id に設定した場合、同じ device-id プロパティを持つメッセージは同じパーティションに送信されます。

パーティション処理戦略を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[パーティション処理戦略] フィールドを使用してパーティション処理戦略を指定します。 戦略が Property に設定されている場合は、パーティション キー プロパティ フィールドを使用して、パーティション化に使用するプロパティを指定します。

Kafka の受信確認

Kafka 受信確認 (ACK) は、Kafka トピックに送信されるメッセージの持続性と一貫性を制御するために使用されます。 プロデューサーは、Kafka トピックにメッセージを送信するときに、メッセージがトピックに正常に書き込まれ、Kafka クラスター全体にレプリケートされたことを確認するために、Kafka ブローカーにさまざまなレベルの受信確認を要求できます。

この設定は、エンドポイントが宛先 (つまり、データ フローがプロデューサー) として使用される場合にのみ有効になります。

説明
None データ フローは、Kafka ブローカーからの受信確認を待機しません。 この設定は最も高速ですが、最も持続性の低いオプションです。
All データ フローは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるのを待機します。 この設定は最も低速ですが、最も持続性の高いオプションです。 この設定は既定のオプションでもあります
One データ フローは、メッセージがリーダー パーティションと少なくとも 1 個のフォロワー パーティションに書き込まれるのを待機します。
Zero データ フローは、メッセージがリーダー パーティションに書き込まれるのを待機しますが、フォロワーからの受信確認を待機しません。 これは One よりも高速ですが、持続性は低くなります。

たとえば、Kafka の受信確認を All に設定した場合、データ フローでは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるまで待機してから、次のメッセージを送信します。

Kafka の受信確認を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[Kafka 受信確認] フィールドを使用して Kafka 受信確認レベルを指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

MQTT プロパティをコピー

既定では、MQTT プロパティのコピー設定は有効になっています。 これらのユーザー プロパティには、メッセージを送信する資産の名前を格納する subject などの値が含まれます。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[MQTT プロパティのコピー] フィールドの横にあるチェックボックスを使用して、MQTT プロパティのコピーを有効または無効にします。

以降のセクションでは、設定が有効になっているときに MQTT プロパティを Kafka ユーザー ヘッダーに変換する方法と、その逆を行う方法について説明します。

Kafka エンドポイントが宛先である

Kafka エンドポイントがデータ フローの宛先である場合には、MQTT v5 仕様に定義されているすべてのプロパティが、Kafka ユーザー ヘッダーに変換されます。 たとえば、Kafka に転送される MQTT v5 メッセージに "Content Type" が指定されている場合、Kafka のユーザー ヘッダー"Content Type":{specifiedValue} に変換されます。 次の表に定義されている他の組み込みの MQTT プロパティにも同様の規則が適用されます。

MQTT プロパティ 変換動作
Payload Format Indicator キー: "Payload Format Indicator"
値: "0" (ペイロードはバイト) または "1" (ペイロードは UTF-8)
Response Topic キー: "Response Topic"
値: 元のメッセージからの応答トピックのコピー。
Message Expiry Interval キー: "Message Expiry Interval"
値: メッセージの有効期限が切れるまでの秒数の UTF-8 表現。 詳細については、「Message Expiry Interval プロパティ」を参照してください。
Correlation Data: キー: "Correlation Data"
値: 元のメッセージからの相関関係データのコピー。 UTF-8 でエンコードされた多くの MQTT v5 プロパティとは異なり、相関関係データは無作為のデータになる可能性があります。
コンテンツの種類: キー: "Content Type"
値: 元のメッセージからの Content Type のコピー。

MQTT v5 ユーザー プロパティのキーと値のペアは、Kafka のユーザー ヘッダーに直接変換されます。 メッセージ内のユーザー ヘッダーに、組み込みの MQTT プロパティと同じ名前 (たとえば、"Correlation Data" という名前のユーザー ヘッダー) がある場合、MQTT v5 仕様プロパティ値やユーザー プロパティを転送するかどうかは未定義になります。

データ フローが、MQTT ブローカーからこれらのプロパティを受け取ることはありません。 このため、データ フローで次の項目は転送されません。

  • トピックの別名
  • サブスクリプション識別子
Message Expiry Interval プロパティ

Message Expiry Interval は、メッセージが破棄されるまで MQTT ブローカー内に滞在できる長さを指定します。

Message Expiry Interval が指定された MQTT メッセージをデータ フローが受信すると、次のようになります。

  • メッセージが受信された日時を記録します。
  • メッセージが宛先に出力される前に、元の有効期限間隔の時間から、メッセージがキューに入れられてからの時間が減算されます。
  • メッセージの有効期限が切れていない (上記の演算結果が > 0 である) 場合、メッセージは宛先に出力され、更新された Message Expiry Time が格納されます。
  • メッセージの有効期限が切れている (上記の演算結果が 0 以下である) 場合、メッセージはターゲットによって出力されません。

例 :

  • データ フローは、Message Expiry Interval = 3,600 秒の MQTT メッセージをを受信します。 対応する宛先は一時的に切断されますが、再接続できます。 この MQTT メッセージがターゲットに送信されるまでに 1,000 秒が経過します。 この場合、宛先のメッセージの Message Expiry Interval は 2,600 (3,600 - 1,000) 秒に設定されます。
  • データ フローは、Message Expiry Interval = 3,600 秒の MQTT メッセージをを受信します。 対応する宛先は一時的に切断されますが、再接続できます。 ただし、この場合、再接続には 4,000 秒かかります。 メッセージの有効期限が切れ、データ フローはこのメッセージを宛先に転送しません。

Kafka エンドポイントがデータ フロー ソースである

Note

Event Hubs エンドポイントをデータ フロー ソースとして使用する場合に、MQTT に変換された Kafka ヘッダーが破損するという既知の問題があります。 これは、内部で AMQP を使用するイベント ハブ クライアントでイベント ハブを使用する場合にのみ発生します。 たとえば、"foo"="bar" の場合、"foo" は変換されますが、値は "\xa1\x03bar" になります。

Kafka エンドポイントがデータ フロー ソースであるときは、Kafka のユーザー ヘッダーが MQTT v5 プロパティに変換されます。 次の表で、Kafka のユーザー ヘッダーを MQTT v5 プロパティに変換する方法について説明します。

Kafka ヘッダー 変換動作
キー キー: "Key"
値: 元のメッセージからの Key のコピー。
タイムスタンプ キー: "Timestamp"
値: Kafka タイムスタンプの UTF-8 エンコード。これは、Unix エポック以降のミリ秒数です。

Kafka のユーザー ヘッダーのキーと値のペア (すべて UTF-8 でエンコードされている場合) は、MQTT のユーザー キーと値のプロパティに直接変換されます。

UTF-8/バイナリの不一致

MQTT v5 は、UTF-8 ベースのプロパティのみをサポートできます。 データ フローが UTF-8 以外のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、データ フローは次を行います。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。

Kafka のソース ヘッダーを MQTT ターゲット プロパティにバイナリ転送する必要があるアプリケーションは、まずそれらを UTF-8 で (たとえば、Base64 経由で) エンコードする必要があります。

64KB 以上のプロパティという不一致

MQTT v5 プロパティは 64 KB 未満である必要があります。 データ フローが> = 64KB のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、データ フローは次を行います。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。
AMQP を使用する Event Hubs およびプロデューサーを使用するときのプロパティ変換

次のいずれかのアクションを実行している Kafka データ フロー ソース エンドポイントに、クライアントがメッセージを転送している場合。

  • Azure.Messaging.EventHubs などのクライアント ライブラリを使用して Event Hubs にメッセージを送信
  • AMQP を直接使用

注意する必要があるプロパティ変換の微妙な違いがあります。

次のいずれかを実行する必要があります。

  • プロパティの送信を避ける。
  • プロパティを送信する必要がある場合は、UTF-8 としてエンコードされた値を送信する。

Event Hubs は、プロパティを AMQP から Kafka に変換するときに、基盤の AMQP エンコード型をメッセージに含めます。 動作の詳細については、「異なるプロトコルを使用してコンシューマーとプロデューサー間でイベントを交換する (英語)」を参照してください。

次のコード例の中で、データ フロー エンドポイントは値 "foo":"bar" を受け取る際に、<0xA1 0x03 "bar"> としてプロパティを受け取っています。

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

データが UTF-8 ではないため、データ フロー エンドポイントは、ペイロード プロパティ <0xA1 0x03 "bar"> を MQTT メッセージに転送できません。 ただし、UTF-8 文字列を指定していれば、データ フロー エンドポイントは MQTT に送信する前に文字列を変換します。 UTF-8 文字列を使用すると、MQTT メッセージにユーザー プロパティとして "foo":"bar" が含められます。

UTF-8 ヘッダーのみが変換されます。 たとえば、プロパティが float として設定されている次のシナリオを考えます。

Properties = 
{
  {"float-value", 11.9 },
}

データ フロー エンドポイントは、"float-value" フィールドを含むパケットを破棄します。

propertyEventData.correlationId を含むすべてのイベント データ プロパティが転送されないわけではありません。 詳細については、「イベントのユーザー プロパティ (英語)」を参照してください。

CloudEvents

CloudEvents は、イベント データを一般的な方法で記述する方法です。 CloudEvents の設定は、CloudEvents 形式でメッセージを送受信するために使われます。 CloudEvents は、同じ、または異なるクラウド プロバイダー内にある異なるサービスが相互に通信する必要があるイベント駆動型アーキテクチャに使用できます。

CloudEventAttributes オプションは、Propagate または CreateOrRemap です。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択し、[クラウド イベント属性] フィールドを使用して CloudEvents 設定を指定します。

以下のセクションでは、CloudEvent プロパティがどのように伝達または作成され、再マップされるかについて説明します。

Propagate の設定

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合、メッセージはそのまま通過します。 必要なプロパティが存在する場合は、ce_ プレフィックスが CloudEvent プロパティ名に追加されます。

名前 必須 サンプルの値 出力名 出力値
specversion はい 1.0 ce-specversion そのままパススルーされます
type はい ms.aio.telemetry ce-type そのままパススルーされます
source はい aio://mycluster/myoven ce-source そのままパススルーされます
id はい A234-1234-1234 ce-id そのままパススルーされます
subject いいえ aio/myoven/telemetry/temperature ce-subject そのままパススルーされます
time いいえ 2018-04-05T17:31:00Z ce-time そのままパススルーされます。 タイムスタンプは更新されません。
datacontenttype いいえ application/json ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更されます。
dataschema いいえ sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 変換構成に出力データ変換スキーマが指定されている場合、dataschema は出力スキーマに変更されます。

CreateOrRemap の設定

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合は、プロパティが生成されます。

名前 必須 出力名 不足している場合に生成される値
specversion はい ce-specversion 1.0
type はい ce-type ms.aio-dataflow.telemetry
source はい ce-source aio://<target-name>
id はい ce-id ターゲット クライアントで生成された UUID
subject いいえ ce-subject メッセージが送信される出力トピック
time いいえ ce-time ターゲット クライアントで RFC 3339 として生成
datacontenttype いいえ ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更
dataschema いいえ ce-dataschema スキーマ レジストリで定義されているスキーマ

次のステップ

データ フローの詳細については、「データ フローを作成する」を参照してください。