次の方法で共有


Azure Event Hubs と Kafka データフロー エンドポイントを構成する

重要

Azure Arc によって実現されている Azure IoT Operations プレビューは、現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。

Azure IoT Operations の一般公開リリースが提供されたときには、新規インストールをデプロイすることが必要になります。 プレビュー インストールからのアップグレードはできません。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

Azure IoT Operations プレビューと Apache Kafka ブローカー間の双方向通信を設定するために、データフロー エンドポイントを構成できます。 この構成では、エンドポイント、トランスポート層セキュリティ (TLS)、認証、およびその他の設定を指定できます。

前提条件

Azure Event Hubs

Azure Event Hubs は Kafka プロトコルと互換性があり、いくつかの制限付きでデータフローとともに使用できます。

Azure Event Hubs 名前空間を作成し、そこにイベント ハブを作成する

まず、Kafka 対応の Azure Event Hubs 名前空間を作成します

次に、名前空間にイベント ハブを作成します。 個々のイベント ハブは、Kafka トピックに対応します。 同じ名前空間に複数のイベント ハブを作成して、複数の Kafka トピックを表すことができます。

マネージド ID を Event Hubs 名前空間に割り当てる

Kafka エンドポイントのデータフロー エンドポイントを構成するには、Azure Arc 対応 Kubernetes クラスターのマネージド ID を使用することをお勧めします。 この方法は安全であり、シークレット管理の必要がなくなります。

  1. Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。
  2. Azure IoT Operations Arc 拡張機能の後一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7
  3. 拡張機能の名前を使用して、Azure portal でマネージド ID を検索します。 たとえば、azure-iot-operations-xxxx7 を検索します。
  4. Azure Event Hubs Data Sender または Azure Event Hubs Data Receiver ロールを使用して、Azure IoT Operations Arc 拡張機能のマネージド ID を Event Hubs 名前空間に割り当てます。

データフロー エンドポイントを作成する

最後に、DataflowEndpoint リソースを作成します。 実際の値を使用して、<ENDPOINT_NAME> などのプレースホルダー値を置き換えます。

  1. Operations Experience で、[データフロー エンドポイント] タブを選択します。

  2. [新しいデータフロー エンドポイントの作成] で、[Azure Event Hubs]>[新規] を選びます。

    Operations Experience を使用して Azure Event Hubs データフロー エンドポイントを作成するスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 内容
    Name データフロー エンドポイントの名前。
    Host 形式 <NAMEPSACE>.servicebus.windows.net:9093 の Kafka ブローカーのホスト名。 Event Hubs のホスト設定にポート番号 9093 を含めます。
    認証方法 認証に使用する方式。 システム割り当てマネージド ID を選択する
  4. [適用] を選択してエンドポイントをプロビジョニングします。

Note

Kafka トピックまたは個々のイベント ハブは、後でデータフローを作成するときに構成されます。 Kafka トピックは、データフロー メッセージの宛先です。

Event Hubs への認証に接続文字列を使用する

Operations Experience のデータフロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[SASL] を選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 Plain を選択します。
同期されたシークレット名 接続文字列を含む Kubernetes シークレットの名前。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。
パスワード参照またはトークン シークレット SASL 認証に使用されるパスワードへの参照またはトークン シークレット。

制限事項

Azure Event Hubs は、Kafka がサポートするすべての圧縮の種類をサポートするわけではありません。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。 他の圧縮の種類を使用すると、エラーが発生する可能性があります。

カスタム Kafka ブローカー

Event-Hub 以外の Kafka ブローカーのデータフロー エンドポイントを構成するには、必要に応じてホスト、TLS、認証、およびその他の設定を設定します。

  1. Operations Experience で、[データフロー エンドポイント] タブを選択します。

  2. [新しいデータフロー エンドポイントの作成] で、[Custom Kafka Broker] (カスタム Kafka ブローカー)>[新規] を選びます。

    Operations Experience を使用して Kafka データフロー エンドポイントを作成するスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 内容
    Name データフロー エンドポイントの名前。
    Host 形式 <Kafa-broker-host>:xxxx の Kafka ブローカーのホスト名。 ホスト設定にポート番号を含めます。
    認証方法 認証に使用する方式。 [SASL] または [X509 証明書] を選択します。
    SASL の種類 SASL 認証の種類。 [Plain]、[ScramSha256]、または [ScramSha512] を選びます。 [SASL] を使用する場合は必須です。
    同期されたシークレット名 シークレットの名前。 [SASL] または [X509] を使用する場合は必須です。
    Username reference of token secret (トークン シークレットのユーザー名参照) SASL トークン シークレット内のユーザー名への参照。 [SASL] を使用する場合は必須です。
    X509 クライアント証明書 認証に使用する X.509 クライアント証明書。 [X509] を使用する場合は必須です。
    X509 中間証明書 X.509 クライアント証明書チェーンの中間証明書。 [X509] を使用する場合は必須です。
    X509 クライアント キー X.509 クライアント証明書に対応する秘密キー。 [X509] を使用する場合は必須です。
  4. [適用] を選択してエンドポイントをプロビジョニングします。

Note

現在、Operations Experience は、ソースとしての Kafka データフロー エンドポイントの使用をサポートしていません。 ソース Kafka データフロー エンドポイントを使用したデータフローの作成は、Kubernetes または Bicep を使用して行うことができます。

エンドポイント設定をカスタマイズする場合は、次のセクションを使用して詳細を確認してください。

使用可能な認証方法

Kafka データ フロー エンドポイントで使用できる認証方法は次のとおりです。 Azure Key Vault を構成し、ワークロード ID を有効にすることで、安全な設定を有効にする方法の詳細については、「Azure IoT Operations Preview 展開で安全な設定を有効にする」を参照してください。

SASL

認証に SASL を使用するには、SASL 認証方法を指定し、SASL の種類と、SASL トークンを含むシークレットの名前を持つシークレット参照を構成します。

Operations Experience のデータフロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[SASL] を選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 使用する SASL 認証の種類。 サポートされている型は、PlainScramSha256ScramSha512 です。
同期されたシークレット名 SASL トークンを含む Kubernetes シークレットの名前。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。
パスワード参照またはトークン シークレット SASL 認証に使用されるパスワードへの参照またはトークン シークレット。

サポートされている SASL の種類は次のとおりです。

  • Plain
  • ScramSha256
  • ScramSha512

このシークレットは、Kafka データフロー エンドポイントと同じ名前空間に存在する必要があります。 シークレットには、SASL トークンが、キーと値のペアの形式で存在する必要があります。 次に例を示します。

X.509

認証に X.509 を使用するには、X509Certificate メソッドを使用するように Kafka 設定の認証セクションを更新し、X.509 証明書を保持するシークレットへの参照を指定します。

Operations Experience のデータフロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[X509 証明書] を選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
同期されたシークレット名 シークレットの名前。
X509 クライアント証明書 認証に使用する X.509 クライアント証明書。
X509 中間証明書 X.509 クライアント証明書チェーンの中間証明書。
X509 クライアント キー X.509 クライアント証明書に対応する秘密キー。

システム割り当てマネージド ID

認証にシステム割り当てマネージド ID を使用するには、Event Hubs からメッセージを送受信するアクセス許可を付与するロールを Azure IoT Operation マネージド ID に割り当てます。

  1. Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。
  2. Azure IoT Operations Arc 拡張機能の後一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7
  3. 拡張機能の名前を使用して、Azure portal でマネージド ID を検索します。 たとえば、azure-iot-operations-xxxx7 を検索します。
  4. Azure Event Hubs データ所有者Azure Event Hubs データ送信者Azure Event Hubs データ受信者 などのメッセージを送受信するアクセス許可を付与するロールを Azure IoT Operations Arc 拡張機能マネージド ID に割り当てます。 詳細については、「Event Hubs リソースにアクセスするための Microsoft Entra ID によりアプリケーションを認証する」を参照してください。
  5. Kafka 設定でマネージド ID 認証方法を指定します。 ほとんどの場合、他の設定を指定する必要はありません。

Operations Experience のデータフロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[システム割り当てマネージド ID] を選択します。

この構成では、既定の対象ユーザーを使用してマネージド ID を作成します。これは、https://<NAMESPACE>.servicebus.windows.net という形式の Event Hubs 名前空間のホスト値と同じです。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、audience フィールドを目的の値に設定できます。

Operations Experience ではサポートされていません。

ユーザー割り当てマネージド ID

認証にユーザー マネージド ID を使用するには、まず、セキュリティで保護された設定を有効にして Azure IoT Operations をデプロイする必要があります。 詳細については、「Azure IoT Operations プレビューのデプロイでセキュリティで保護された設定を有効にする」を参照してください。

その後、Kafka 設定で、ユーザー割り当てマネージド ID 認証方法を、マネージド ID のクライアント ID とテナント ID と共に指定します。

Operations Experience のデータフロー エンドポイント設定ページで、[基本] タブを選択し、[認証方法]>[ユーザー割り当てマネージド ID] を選択します。

ここでは、スコープはマネージド ID の対象ユーザーです。 既定値は、Event Hubs 名前空間のホスト値と同じで、形式は https://<NAMESPACE>.servicebus.windows.net です。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、Bicep または Kubernetes を使用してスコープ フィールドを目的の値に設定できます。

匿名

匿名認証を使用するには、Anonymous メソッドを使用するように Kafka 設定の認証セクションを更新します。

Operations Experience ではまだサポートされていません。 既知の問題を参照してください。

詳細設定

TLS、信頼された証明機関証明書、Kafka メッセージング設定、バッチ処理、CloudEvents などの、Kafka データフロー エンドポイントの詳細設定を設定できます。 これらの設定は、データフロー エンドポイントの [詳細設定] ポータル タブ、またはデータフロー エンドポイントのリソース内で設定できます。

Operations Experience で、データフロー エンドポイントの [詳細設定] タブを選択します。

Operations Experience を使って Kafka データフロー エンドポイントの詳細設定を行っているスクリーンショット。

TLS の設定

TLS モード

Kafka エンドポイントの TLS を有効または無効にするには、TLS 設定の mode 設定を更新します。

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[TLS モードが有効] の横にあるチェックボックス使用します。

TLS モードは、Enabled または Disabled に設定できます。 モードが Enabled に設定されている場合、データフローは Kafka ブローカーに対して安全な接続を使用します。 モードが Disabled に設定されている場合、データフローは Kafka ブローカーに対して安全ではない接続を使用します。

信頼された証明機関証明書

Kafka ブローカーへのセキュリティで保護された接続を確立するように、Kafka エンドポイントの信頼された CA 証明書を構成します。 この設定は、Kafka ブローカーが自己署名証明書または既定では信頼されていないカスタム証明機関によって署名された証明書を使用する場合に重要です。

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[信頼された CA 証明書構成マップ] フィールドを使用して、信頼された CA 証明書を含む ConfigMap を指定します。

この ConfigMap には、PEM 形式の証明機関証明書が含まれている必要があります。 この ConfigMap は、Kafka データフロー リソースと同じ名前空間に存在する必要があります。 次に例を示します。

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

ヒント

Azure Event Hubs に接続する場合、Event Hubs サービスは既定で信頼されているパブリック CA によって署名された証明書を使用するため、CA 証明書は必要ありません。

コンシューマー グループ ID

コンシューマー グループ ID は、Kafka トピックからのメッセージの読み取りにデータフローが使用するコンシューマー グループを識別するために使用されます。 コンシューマー グループ ID は、Kafka ブローカー内で一意である必要があります。

重要

Kafka エンドポイントをソースとして使用する場合は、コンシューマー グループ ID が必要です。 それ以外の場合、データフローでは Kafka トピックからメッセージを読み取ることができないので、"Kafka 型のソース エンドポイントには consumerGroupId が定義されている必要があります" というエラーが表示されます。

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[コンシューマー グループ ID] フィールドを使用してコンシューマー グループ ID を指定します。

この設定は、ソースとして (つまり、データフローがコンシューマーである) エンドポイントが使用される場合にのみ有効になります。

圧縮

圧縮フィールドを使用すると、Kafka トピックに送信されるメッセージの圧縮が可能になります。 圧縮は、データ転送に必要なネットワーク帯域幅とストレージ領域を減らすのに役立ちます。 ただし、圧縮により、オーバーヘッドと待機時間もプロセスに追加されます。 サポートされている圧縮の種類を次の表に示します。

説明
None 圧縮またはバッチ処理は適用されません。 compression が指定されていない場合、既定値は None です。
Gzip GZIP 圧縮とバッチ処理が適用されます。 GZIP は、圧縮率と速度のバランスが良い汎用圧縮アルゴリズムです。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。
Snappy すばやい圧縮とバッチ処理が適用されます。 Snappy は、中程度の圧縮率と速度を提供する高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。
Lz4 LZ4 圧縮とバッチ処理が適用されます。 LZ4は、低圧縮比と高速を提供する高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。

圧縮を構成するには:

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[圧縮] フィールドを使用して圧縮の種類を指定します。

この設定は、データフローがプロデューサーである宛先としてエンドポイントが使用される場合にのみ有効になります。

バッチ処理

圧縮とは別に、Kafka トピックに送信する前にメッセージのバッチ処理を構成することもできます。 バッチ処理を使用すると、複数のメッセージをグループ化して 1 つの単位として圧縮できるため、圧縮効率が向上し、ネットワークのオーバーヘッドが削減されます。

フィールド Description 必須
mode Enabled または Disabled を指定できます。 Kafka には "バッチ処理されない" メッセージングという概念がないため、既定値は Enabled です。 Disabled に設定すると、バッチ処理が最小化され、毎回 1 つのメッセージでバッチが作成されます。 いいえ
latencyMs メッセージをバッファー処理してから送信できる最大時間間隔 (ミリ秒単位)。 この間隔に達した場合、バッファー内のすべてのメッセージは、メッセージの数や大きさに関係なく、バッチとして送信されます。 設定しない場合、既定値は 5 です。 いいえ
maxMessages 送信前にバッファーに格納できるメッセージの最大数。 この数に達すると、バッファーに格納されているメッセージの大きさやバッファーの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 設定しない場合、既定値は 100000 です。 いいえ
maxBytes 送信前にバッファーに格納できる最大サイズ (バイト単位)。 このサイズに達すると、バッファーに格納されているメッセージの数やバッファーの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 既定値は 1000000 (1 MB) です。 いいえ

たとえば、latencyMs を 1000 に、maxMessages を 100 に、maxBytes を 1024 に設定した場合、バッファーのメッセージが 100 件になるか、バッファーのバイト数が 1,024 になるか、最後の送信から 1,000 ミリ秒が経過するかのいずれかが最初に発生したときにメッセージが送信されます。

バッチ処理を構成するには:

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[バッチ処理が有効] フィールドを使用してバッチ処理を有効にします。 [バッチ処理の待機時間][最大バイト数][メッセージ数] フィールドを使用してバッチ処理の設定を指定します。

この設定は、データフローがプロデューサーである宛先としてエンドポイントが使用される場合にのみ有効になります。

パーティション処理戦略

パーティション処理戦略は、Kafka トピックに送信するときにメッセージを Kafka パーティションに割り当てる方法を制御します。 Kafka パーティションは、並列処理とフォールト トレランスを可能にする Kafka トピックの論理セグメントです。 Kafka トピック内の各メッセージには、メッセージの識別と順序付けに使用されるパーティションとオフセットがあります。

この設定は、データフローがプロデューサーである宛先としてエンドポイントが使用される場合にのみ有効になります。

既定では、データフローはラウンド ロビン アルゴリズムを使用して、ランダムなパーティションにメッセージを割り当てます。 ただし、MQTT トピック名や MQTT メッセージ プロパティなど、いくつかの条件に基づいてパーティションにメッセージを割り当てるには、さまざまな方法を使用できます。 これは、負荷分散、データの局所性、またはメッセージの順序付けを改善するのに役立ちます。

説明
Default ラウンド ロビン アルゴリズムを使用して、ランダム パーティションにメッセージを割り当てます。 これは、戦略が指定されていない場合の既定値です。
Static データフローのインスタンス ID から派生した固定パーティション番号にメッセージを割り当てます。 これは、各データフロー インスタンスが異なるパーティションにメッセージを送信することを意味します。 これにより、負荷分散とデータの局所性が向上します。
Topic パーティション分割のキーとして、データフロー ソースから取得する MQTT トピック名を使用します。 これは、同じ MQTT トピック名のメッセージが同じパーティションに送信されることを意味します。 これは、メッセージの順序付けとデータの局所性の向上に役立ちます。
Property パーティション分割のキーとして、データフロー ソースから取得する MQTT メッセージ プロパティを使用します。 partitionKeyProperty フィールドにプロパティの名前を指定します。 これは、同じプロパティ値を持つメッセージが同じパーティションに送信されることを意味します。 これにより、カスタム条件に基づいてメッセージの順序付けとデータの局所性を向上させることができます。

たとえば、パーティション処理戦略を Property に設定し、パーティション キー プロパティを device-id に設定した場合、同じ device-id プロパティを持つメッセージは同じパーティションに送信されます。

パーティション処理戦略を構成するには:

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[パーティション処理戦略] フィールドを使用してパーティション処理戦略を指定します。 戦略が Property に設定されている場合は、パーティション キー プロパティ フィールドを使用して、パーティション化に使用するプロパティを指定します。

Kafka の受信確認

Kafka 受信確認 (ACK) は、Kafka トピックに送信されるメッセージの持続性と一貫性を制御するために使用されます。 プロデューサーは、Kafka トピックにメッセージを送信するときに、メッセージがトピックに正常に書き込まれ、Kafka クラスター全体にレプリケートされたことを確認するために、Kafka ブローカーにさまざまなレベルの受信確認を要求できます。

この設定は、宛先として (つまり、データフローがプロデューサーである) エンドポイントが使用される場合にのみ有効になります。

Value 説明
None データフローは、Kafka ブローカーからの受信確認を待機しません。 この設定は最も高速ですが、最も持続性の低いオプションです。
All データフローは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるのを待機します。 この設定は最も低速ですが、最も持続性の高いオプションです。 この設定は既定のオプションでもあります
One データフローは、メッセージがリーダー パーティションと少なくとも 1 個のフォロワー パーティションに書き込まれるのを待機します。
Zero データフローは、メッセージがリーダー パーティションに書き込まれるのを待機しますが、フォロワーからの受信確認を待機しません。 これは One よりも高速ですが、持続性は低くなります。

たとえば、Kafka 受信確認を All に設定した場合、データフローでは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるまで待機してから、次のメッセージを送信します。

Kafka の受信確認を構成するには:

操作エクスペリエンスのデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[Kafka 受信確認] フィールドを使用して Kafka 受信確認レベルを指定します。

この設定は、データフローがプロデューサーである宛先としてエンドポイントが使用される場合にのみ有効になります。

MQTT プロパティをコピー

既定では、MQTT プロパティのコピー設定は有効になっています。 これらのユーザー プロパティには、メッセージを送信する資産の名前を格納する subject などの値が含まれます。

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[MQTT プロパティのコピー] フィールドの横にあるチェックボックスを使用して、MQTT プロパティのコピーを有効または無効にします。

以降のセクションでは、設定が有効になっているときに MQTT プロパティを Kafka ユーザー ヘッダーに変換する方法と、その逆を行う方法について説明します。

Kafka エンドポイントが宛先である

Kafka エンドポイントがデータフローの宛先であるとき、MQTT v5 仕様に定義されているすべてのプロパティが Kafka ユーザー ヘッダーに変換されます。 たとえば、Kafka に転送される MQTT v5 メッセージに "Content Type" が指定されている場合、Kafka のユーザー ヘッダー"Content Type":{specifiedValue} に変換されます。 次の表に定義されている他の組み込みの MQTT プロパティにも同様の規則が適用されます。

MQTT プロパティ 変換動作
Payload Format Indicator キー: "Payload Format Indicator"
値: "0" (ペイロードはバイト) または "1" (ペイロードは UTF-8)
Response Topic キー: "Response Topic"
値: 元のメッセージからの応答トピックのコピー。
Message Expiry Interval キー: "Message Expiry Interval"
値: メッセージの有効期限が切れるまでの秒数の UTF-8 表現。 詳細については、「Message Expiry Interval プロパティ」を参照してください。
Correlation Data: キー: "Correlation Data"
値: 元のメッセージからの相関関係データのコピー。 UTF-8 でエンコードされた多くの MQTT v5 プロパティとは異なり、相関関係データは無作為のデータになる可能性があります。
コンテンツの種類: キー: "Content Type"
値: 元のメッセージからの Content Type のコピー。

MQTT v5 ユーザー プロパティのキーと値のペアは、Kafka のユーザー ヘッダーに直接変換されます。 メッセージ内のユーザー ヘッダーに、組み込みの MQTT プロパティと同じ名前 (たとえば、"Correlation Data" という名前のユーザー ヘッダー) がある場合、MQTT v5 仕様プロパティ値やユーザー プロパティを転送するかどうかは未定義になります。

データフローが、MQTT ブローカーからこれらのプロパティを受け取ることはありません。 このため、データフローで次の項目は転送されません。

  • トピックの別名
  • サブスクリプション識別子
Message Expiry Interval プロパティ

Message Expiry Interval は、メッセージが破棄されるまで MQTT ブローカー内に滞在できる長さを指定します。

Message Expiry Interval が指定された MQTT メッセージをデータフローが受信すると、次のようにします。

  • メッセージが受信された日時を記録します。
  • メッセージが宛先に出力される前に、元の有効期限間隔の時間から、メッセージがキューに入れられてからの時間が減算されます。
  • メッセージの有効期限が切れていない (上記の演算結果が > 0 である) 場合、メッセージは宛先に出力され、更新された Message Expiry Time が格納されます。
  • メッセージの有効期限が切れている (上記の演算結果が 0 以下である) 場合、メッセージはターゲットによって出力されません。

例 :

  • Message Expiry Interval が 3,600 秒に指定された MQTT メッセージをデータフローが受信するとします。 対応する宛先は一時的に切断されますが、再接続できます。 この MQTT メッセージがターゲットに送信されるまでに 1,000 秒が経過します。 この場合、宛先のメッセージの Message Expiry Interval は 2,600 (3,600 - 1,000) 秒に設定されます。
  • Message Expiry Interval が 3,600 秒に指定された MQTT メッセージをデータフローが受信するとします。 対応する宛先は一時的に切断されますが、再接続できます。 ただし、この場合、再接続には 4,000 秒かかります。 メッセージの有効期限が切れ、データフローはこのメッセージを宛先に転送しません。

Kafka エンドポイントがデータフロー ソースである

Note

Event Hubs エンドポイントをデータフロー ソースとして使用するとき、Kafka ヘッダーが MQTT に変換されると破損するという既知の問題があります。 これは、内部で AMQP を使用するイベント ハブ クライアントでイベント ハブを使用する場合にのみ発生します。 たとえば、"foo"="bar" の場合、"foo" は変換されますが、値は "\xa1\x03bar" になります。

Kafka エンドポイントがデータフロー ソースであるときは、Kafka のユーザー ヘッダーが MQTT v5 プロパティに変換されます。 次の表で、Kafka のユーザー ヘッダーを MQTT v5 プロパティに変換する方法について説明します。

Kafka ヘッダー 変換動作
キー キー: "Key"
値: 元のメッセージからの Key のコピー。
タイムスタンプ キー: "Timestamp"
値: Kafka タイムスタンプの UTF-8 エンコード。これは、Unix エポック以降のミリ秒数です。

Kafka のユーザー ヘッダーのキーと値のペア (すべて UTF-8 でエンコードされている場合) は、MQTT のユーザー キーと値のプロパティに直接変換されます。

UTF-8/バイナリの不一致

MQTT v5 は、UTF-8 ベースのプロパティのみをサポートできます。 データフローが UTF-8 以外のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、データフローは次のようにします。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。

Kafka のソース ヘッダーを MQTT ターゲット プロパティにバイナリ転送する必要があるアプリケーションは、まずそれらを UTF-8 で (たとえば、Base64 経由で) エンコードする必要があります。

64KB 以上のプロパティという不一致

MQTT v5 プロパティは 64 KB 未満である必要があります。 データフローが 64KB 以上のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、データフローは次のようにします。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。
AMQP を使用する Event Hubs およびプロデューサーを使用するときのプロパティ変換

クライアントがメッセージを転送している場合、Kafka データフロー ソース エンドポイントは次のいずれかのアクションを実行しています。

  • Azure.Messaging.EventHubs などのクライアント ライブラリを使用して Event Hubs にメッセージを送信
  • AMQP を直接使用

注意する必要があるプロパティ変換の微妙な違いがあります。

次のいずれかを実行する必要があります。

  • プロパティの送信を避ける。
  • プロパティを送信する必要がある場合は、UTF-8 としてエンコードされた値を送信する。

Event Hubs は、プロパティを AMQP から Kafka に変換するときに、基盤の AMQP エンコード型をメッセージに含めます。 動作の詳細については、「異なるプロトコルを使用してコンシューマーとプロデューサー間でイベントを交換する (英語)」を参照してください。

次のコード例では、データフロー エンドポイントが値 "foo":"bar" を受け取ると、<0xA1 0x03 "bar"> としてプロパティを受け取ります。

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

データが UTF-8 ではないため、データフロー エンドポイントはペイロード プロパティ <0xA1 0x03 "bar"> を MQTT メッセージに転送できません。 ただし、UTF-8 文字列を指定した場合、データフロー エンドポイントは MQTT に送信する前に文字列を変換します。 UTF-8 文字列を使用すると、MQTT メッセージにユーザー プロパティとして "foo":"bar" が含められます。

UTF-8 ヘッダーのみが変換されます。 たとえば、プロパティが float として設定されている次のシナリオを考えます。

Properties = 
{
  {"float-value", 11.9 },
}

データフロー エンドポイントは、"float-value" フィールドを含むパケットを破棄します。

propertyEventData.correlationId を含むすべてのイベント データ プロパティが転送されないわけではありません。 詳細については、「イベントのユーザー プロパティ (英語)」を参照してください。

CloudEvents

CloudEvents は、イベント データを一般的な方法で記述する方法です。 CloudEvents の設定は、CloudEvents 形式でメッセージを送受信するために使われます。 CloudEvents は、同じ、または異なるクラウド プロバイダー内にある異なるサービスが相互に通信する必要があるイベント駆動型アーキテクチャに使用できます。

CloudEventAttributes オプションは、Propagate または CreateOrRemap です。

Operations Experience のデータフロー エンドポイント設定ページで、[詳細設定] タブを選択し、[クラウド イベント属性] フィールドを使用して CloudEvents 設定を指定します。

以下のセクションでは、CloudEvent プロパティがどのように伝達または作成され、再マップされるかについて説明します。

Propagate の設定

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合、メッセージはそのまま通過します。 必要なプロパティが存在する場合は、ce_ プレフィックスが CloudEvent プロパティ名に追加されます。

名前 必須 サンプルの値 出力名 出力値
specversion はい 1.0 ce-specversion そのままパススルーされます
type はい ms.aio.telemetry ce-type そのままパススルーされます
source はい aio://mycluster/myoven ce-source そのままパススルーされます
id はい A234-1234-1234 ce-id そのままパススルーされます
subject いいえ aio/myoven/telemetry/temperature ce-subject そのままパススルーされます
time いいえ 2018-04-05T17:31:00Z ce-time そのままパススルーされます。 タイムスタンプは更新されません。
datacontenttype いいえ application/json ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更されます。
dataschema いいえ sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 変換構成に出力データ変換スキーマが指定されている場合、dataschema は出力スキーマに変更されます。

CreateOrRemap の設定

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合は、プロパティが生成されます。

名前 必須 出力名 不足している場合に生成される値
specversion はい ce-specversion 1.0
type イエス ce-type ms.aio-dataflow.telemetry
source イエス ce-source aio://<target-name>
id はい ce-id ターゲット クライアントで生成された UUID
subject いいえ ce-subject メッセージが送信される出力トピック
time いいえ ce-time ターゲット クライアントで RFC 3339 として生成
datacontenttype いいえ ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更
dataschema いいえ ce-dataschema スキーマ レジストリで定義されているスキーマ

次のステップ

データフローの詳細については、データフローの作成に関するページを参照してください。