Kafka から Azure Stream Analytics へのデータのストリーム
Kafka は、レコードのストリームを公開およびサブスクライブするために使用される分散ストリーミング プラットフォームです。 Kafka は、アプリで発生したレコードを処理できるように設計されています。 これは、Apache Software Foundation によって開発されたオープンソースのシステムで、Java と Scala で記述されています。
主なユース ケースを次に示します。
- メッセージング
- Web サイト アクティビティの追跡
- メトリック
- ログの集計
- ストリーム処理
Azure Stream Analytics では、Kafka クラスターに直接接続してデータを取り込むことができます。 このソリューションはロー コードで、Microsoft の Azure Stream Analytics チームによって完全に管理されているため、ビジネス コンプライアンス基準を満たすことができます。 Kafka 入力には下位互換性があり、バージョン 0.10 以降の最新のクライアント リリースを含むすべてのバージョンをサポートしています。 ユーザーは、構成に応じて、仮想ネットワーク内の Kafka クラスターやパブリック エンドポイントを含む Kafka クラスターに接続できます。 この構成は、既存の Kafka 構成規則に依存しています。 サポートされている圧縮の種類は、None、Gzip、Snappy、LZ4、Zstd です。
手順
この記事では、Azure Stream Analytics の入力ソースとして Kafka を設定する方法について説明します。 以下の 6 つの手順があります。
- Azure Stream Analytics ジョブを作成します。
- mTLS または SASL_SSL セキュリティ プロトコルを使用している場合は、マネージド ID を使用するように Azure Stream Analytics ジョブを構成します。
- mTLS または SASL_SSL セキュリティ プロトコルを使用している場合は、Azure Key Vault を構成します。
- Azure Key Vault に証明書をシークレットとしてアップロードします。
- アップロードされた証明書にアクセスするためのアクセス許可を Azure Stream Analytics に付与します。
- Azure Stream Analytics ジョブで Kafka 入力を構成します。
Note
Kafka クラスターの構成方法と使用している Kafka クラスターの種類によっては、上記の手順の一部が適用されない場合があります。 例: confluent cloud Kafka を使用している場合、Kafka コネクタを使用するために証明書をアップロードする必要はありません。 Kafka クラスターが仮想ネットワーク (VNET) 内またはファイアウォールの内側にある場合は、プライベート リンクまたは専用のネットワーク構成を使用して Kafka トピックにアクセスするように Azure Stream Analytics ジョブを構成する必要がある可能性があります。
構成
次の表に、Kafka 入力を作成するためのプロパティの名前とその説明を示します。
重要
Kafka クラスターを入力として構成するには、入力トピックのタイムスタンプの種類を LogAppendTime にする必要があります。 Azure Stream Analytics でサポートされる唯一のタイムスタンプの種類は、LogAppendTime です。 Azure Stream Analytics では、数値の 10 進形式のみがサポートされています。
プロパティ名 | 説明 |
---|---|
入出力エイリアス | 入力や出力を参照するためにクエリで使用されるフレンドリ名 |
ブートストラップ サーバー アドレス | Kafka クラスターへの接続を確立するホストとポートのペアのリスト。 |
Kafka トピック | メッセージのパブリッシュ/サブスクライブおよびイベント ドリブン処理を可能にする、名前付き、順序付き、およびパーティション分割されたデータ ストリーム。 |
セキュリティ プロトコル | Kafka クラスターへの接続方法。 Azure Stream Analytics は、mTLS、SASL_SSL、SASL_PLAINTEXT、または None をサポートしています。 |
コンシューマー グループ ID | 入力が属する必要がある Kafka コンシューマー グループの名前。 指定されていない場合は、自動的に割り当てられます。 |
イベント シリアル化形式 | 受信データ ストリームのシリアル化形式 (JSON、CSV、Parquet、Protobuf)。 |
認証と暗号化
Kafka クラスターへの接続には、次の 4 種類のセキュリティ プロトコルを使用できます:
プロパティ名 | 説明 |
---|---|
mTLS | 暗号化および認証。 PLAIN、SCRAM-SHA-256、および SCRAM-SHA-512 のセキュリティ メカニズムをサポートします。 |
SASL_SSL | SASL (簡易認証およびセキュリティ層) と Secure Sockets Layer (SSL) の異なる 2 つのセキュリティ メカニズムを組み合わせることで、認証と暗号化の両方がデータ伝送に適用されます。 SASL_SSL プロトコルは、PLAIN、SCRAM-SHA-256、および SCRAM-SHA-512 セキュリティ メカニズムをサポートします。 |
SASL_PLAINTEXT | 暗号化なしのユーザー名とパスワードを使用した標準認証 |
なし | 認証と暗号化はありません。 |
重要
Confluent Cloud は、API キー、OAuth、または SAML シングル サインオン (SSO) を使用した認証をサポートしています。 Azure Stream Analytics では、OAuth または SAML シングル サインオン (SSO) 認証はサポートされていません。 トピック レベルのアクセス権のある API キーを使用し、SASL_SSL セキュリティ プロトコルを介して、Confluent Cloud に接続できます。
Confluent Cloud Kafka への接続に関する詳細なチュートリアルについては、次のドキュメントを参照してください:
- Confluent Cloud Kafka の入力: Azure Stream Analytics を使用して Confluent Cloud Kafka からデータをストリーミングする
- Confluent Cloud Kafka の出力: Azure Stream Analytics から Confluent Cloud Kafka にデータをストリーミングする
Key Vault の統合
Note
mTLS または SASL_SSL セキュリティ プロトコルで信頼ストア証明書を使用する場合は、Azure Stream Analytics ジョブに Azure Key Vault とマネージド ID を設定する必要があります。 キー コンテナーのネットワーク設定を確認し、[すべてのネットワークからのパブリック アクセスを許可する] がオンになっていることを確認します。 キー コンテナーが VNET 内にあるか、特定のネットワークからのアクセスのみを許可するとします。 その場合は、キー コンテナーを含む VNET に ASA ジョブを挿入するか、ASA ジョブを VNET に挿入してから、サービス エンドポイントを使用して、ジョブを含む VNET にキー コンテナーを接続する必要があります。
Azure Stream Analytics は、Azure Key Vault とシームレスに統合され、mTLS または SASL_SSL セキュリティ プロトコルを使用する場合の認証と暗号化に必要な格納済みシークレットにアクセスできます。 Azure Stream Analytics ジョブは、マネージド ID を使ってユーザーの Azure キー コンテナーに接続し、セキュリティで保護された接続を確保して、シークレットの流出を防ぎます。 証明書はシークレットとしてキー コンテナーに格納され、PEM 形式である必要があります。
アクセス許可を使ってキー コンテナーを構成する
キー コンテナー リソースを作成するには、ドキュメント「クイック スタート: Azure portal を使用してキー コンテナーを作成する」に従ってください。証明書をアップロードするには、Key Vault に対する "Key Vault Administrator" アクセス権が必要です。 次の手順に従って、管理者アクセスを許可します。
Note
他のキー コンテナーのアクセス許可を付与するには、"所有者" アクセス許可が必要です。
[アクセス制御 (IAM)] を選択します。
[追加]>[ロールの割り当ての追加] を選択して、[ロールの割り当ての追加] ページを開きます。
次の構成を使ってロールを割り当てます。
設定 | 値 |
---|---|
Role | Key Vault Administrator |
アクセスの割り当て先 | ユーザー、グループ、またはサービス プリンシパル |
メンバー | <アカウント情報またはメール アドレス> |
Azure CLI を使って証明書をキー コンテナーにアップロードする
重要
このコマンドを正常に機能させるには、Key Vault に対する "Key Vault Administrator" アクセス許可を持っている必要があります。証明書はシークレットとしてアップロードする必要があります。 Azure CLI を使って、証明書をシークレットとしてキー コンテナーにアップロードする必要があります。 認証に使用される証明書の有効期限が切れると、Azure Stream Analytics ジョブは失敗します。 これを解決するには、キー コンテナー内の証明書を更新または置換し、Azure Stream Analytics ジョブを再起動する必要があります。
PowerShell を使用して Azure CLI がローカルに構成されていることを確認します。 Azure CLI の設定方法に関するガイダンスについては、次のページを参照してください: Azure CLI の概要
Azure CLI にログインする:
az login
キー コンテナーを含むサブスクリプションに接続する:
az account set --subscription <subscription name>
次のコマンドを実行すると、証明書をシークレットとしてキー コンテナーにアップロードできる:
<your key vault>
は、証明書をアップロードするキー コンテナーの名前です。 <name of the secret>
は、シークレットに付ける任意の名前であり、キー コンテナーで表示されるものです。 <file path to certificate>
は証明書が配置される場所へのパスです。 右クリックして、証明書へのパスをコピーできます。
az keyvault secret set --vault-name <your key vault> --name <name of the secret> --file <file path to certificate>
次に例を示します。
az keyvault secret set --vault-name mykeyvault --name kafkasecret --file C:\Users\Downloads\certificatefile.pem
マネージド ID を構成する
Azure Stream Analytics では、キー コンテナーにアクセスするマネージド ID を構成する必要があります。 マネージド ID を使うように ASA ジョブを構成するには、左側の [構成] の下にある [マネージド ID] タブに移動します。
- [構成] の [マネージド ID] タブを選択します。
- [ID の切り替え] を選択し、ジョブで使用する ID (システム割り当て ID またはユーザー割り当て ID) を選択します。
- ユーザー割り当て ID の場合は、ユーザー割り当て ID があるサブスクリプションを選択し、ID の名前を選択します。
- 確認して保存します。
キー コンテナー内の証明書にアクセスするためのアクセス許可を Stream Analytics ジョブに付与する
Azure Stream Analytics ジョブでキー コンテナー内のシークレットを読み取る場合、ジョブにはキー コンテナーにアクセスするためのアクセス許可が必要です。 Stream Analytics ジョブに特別なアクセス許可を付与するには、次の手順を使用します。
[アクセス制御 (IAM)] を選択します。
[追加]>[ロールの割り当ての追加] を選択して、[ロールの割り当ての追加] ページを開きます。
次の構成を使ってロールを割り当てます。
設定 | 値 |
---|---|
Role | キー コンテナー シークレット ユーザー |
マネージド ID | システム割り当てマネージド ID またはユーザー割り当てマネージド ID の Stream Analytics ジョブ |
メンバー | <Stream Analytics ジョブの名前> または <ユーザー割り当て ID の名前> |
仮想ネットワークの統合
Kafka クラスターが仮想ネットワーク内またはファイアウォールの内側にある場合は、プライベート リンクまたは専用のネットワーク構成を使用して Kafka トピックにアクセスするように Azure Stream Analytics ジョブを構成します。 詳しくは、Azure Virtual Network での Azure Stream Analytics ジョブの実行に関するドキュメントを参照してください。
制限事項
- 仮想ネットワーク/SWIFT を使うように Azure Stream Analytics ジョブを構成する場合、そのジョブは少なくとも 6 つのストリーミング ユニット、または 1 つの V2 ストリーミング ユニットで構成する必要があります。
- Azure Key Vault で mTLS または SASL_SSL を使用する場合は、Java Key Store を PEM 形式に変換する必要があります。
- Azure Stream Analytics で接続できる Kafka の最小バージョンは、バージョン 0.10 です。
- Azure Stream Analytics は、OAuth または SAML シングル サインオン (SSO) を使用した Confluent Cloud の認証をサポートしていません。 SASL_SSL プロトコルを介して API キーを使用する必要があります。
Note
Azure Stream Analytics Kafka 入力の使用に関して直接のサポートが必要な場合は、askasa@microsoft.com までお問い合わせください。