Azure Stream Analytics を使用して Confluent Cloud Kafka からデータをストリーミングする
この記事では、入力として Azure Stream Analytics ジョブを Confluent Cloud Kafka に直接接続する方法について説明します。
前提条件
- Confluent Cloud Kafka クラスターがある。
- Kafka クラスター用の API キー ファイルがあり、ユーザー名として使用する API キー、パスワードとして使用する API シークレット、ブートストラップ サーバー アドレスが含まれている。
- Azure Stream Analytics ジョブがある。 「クイック スタート: Azure portal を使用して Stream Analytics ジョブを作成する」のドキュメントに従って、Azure Stream Analytics ジョブを作成できます
- Confluent Cloud Kafka クラスターは、パブリックにアクセス可能であり、ファイアウォールの内側や、仮想ネットワークで保護されていない必要があります。
- Confluent Cloud Kafka クラスターのトピックのタイムスタンプの種類は、LogAppendTime である必要があります。 Confluent Cloud Kafka トピックの既定値は、CreateTime です。
- 既存のキー コンテナーがある。 「クイック スタート: Azure portal を使用してキー コンテナーを作成する」のドキュメントに従って、キー コンテナー リソースを作成できます
マネージド ID を使用するように Azure Stream Analytics を構成する
Azure Stream Analytics では、キー コンテナーにアクセスするマネージド ID を構成する必要があります。 マネージド ID を使うように Stream Analytics ジョブを構成するには、左側の [構成] の下にある [マネージド ID] タブに移動します。
- [構成] の [マネージド ID] タブをクリックします。
- [ID の切り替え] を選択し、ジョブで使用する ID (システム割り当て ID またはユーザー割り当て ID) を選択します。
- ユーザー割り当て ID の場合は、ユーザー割り当て ID があるサブスクリプションを選択し、ID の名前を選択します。
- 確認して保存します。
LetsEncrypt から証明書をダウンロードする
Azure Stream Analytics は librdkafka ベースのクライアントであり、Confluent Cloud に接続するには、Confluent Cloud がサーバー認証に使う TLS 証明書が必要です。 Confluent Cloud では、オープン証明機関 (CA) である Let's Encrypt の TLS 証明書が使用されます。
LetsEncrypt のサイトで、PEM 形式の ISRG Root X1 証明書をダウンロードしてください。
アクセス許可を使ってキー コンテナーを構成する
Azure Stream Analytics は、Azure Key Vault とシームレスに統合され、認証と暗号化に必要な格納済みシークレットにアクセスできます。 Azure Stream Analytics ジョブは、マネージド ID を使ってユーザーの Azure キー コンテナーに接続し、セキュリティで保護された接続を確保して、シークレットの流出を防ぎます。 ダウンロードした証明書を使用するには、まずキー コンテナーにアップロードする必要があります。
証明書をアップロードするには、キー コンテナーに対する "Key Vault Administrator" アクセス権が必要です。 次の手順に従って、管理者アクセス権を付与してください。
Note
他のキー コンテナーのアクセス許可を付与するには、"所有者" アクセス許可が必要です。
キー コンテナーで、[アクセス制御 (IAM)] を選択します。
[追加]>[ロールの割り当ての追加] を選択して、[ロールの割り当ての追加] ページを開きます。
次の構成を使ってロールを割り当てます。
設定 | 値 |
---|---|
Role | Key Vault Administrator |
アクセスの割り当て先 | ユーザー、グループ、またはサービス プリンシパル |
メンバー | <アカウント情報またはメール アドレス> |
Azure CLI を使用して、証明書をシークレットとしてキー コンテナーにアップロードします
重要
このコマンドを正常に機能させるには、Key Vault に対する "Key Vault Administrator" アクセス許可を持っている必要があります。証明書はシークレットとしてアップロードする必要があります。 Azure CLI を使って、証明書をシークレットとしてキー コンテナーにアップロードする必要があります。 認証に使用される証明書の有効期限が切れると、Azure Stream Analytics ジョブは失敗します。 これを解決するには、キー コンテナー内の証明書を更新または置換し、Azure Stream Analytics ジョブを再起動する必要があります。
PowerShell を使用して Azure CLI が構成され、ローカルにインストールされていることを確認します。 Azure CLI の設定方法に関するガイダンスについては、次のページを参照してください: Azure CLI の概要
Azure CLI にログインする:
az login
キー コンテナーを含むサブスクリプションに接続する:
az account set --subscription <subscription name>
次に例を示します。
az account set --subscription mymicrosoftsubscription
次のコマンドを実行すると、証明書をシークレットとしてキー コンテナーにアップロードできる:
<your key vault>
は、証明書をアップロードするキー コンテナーの名前です。 <name of the secret>
は、シークレットに付ける任意の名前であり、キー コンテナーで表示されるものです。 <file path to certificate>
は証明書が配置される場所へのパスです。 右クリックして、証明書へのパスをコピーできます。
az keyvault secret set --vault-name <your key vault> --name <name of the secret> --file <file path to certificate>
次に例を示します。
az keyvault secret set --vault-name mykeyvault --name confluentsecret --file C:\Users\Downloads\isrgrootx1.pem
キー コンテナー内の証明書にアクセスするためのアクセス許可を Stream Analytics ジョブに付与する
Azure Stream Analytics ジョブでキー コンテナー内のシークレットを読み取る場合、ジョブにはキー コンテナーにアクセスするためのアクセス許可が必要です。 Stream Analytics ジョブに特別なアクセス許可を付与するには、次の手順に従います。
キー コンテナーで、[アクセス制御 (IAM)] を選択します。
[追加]>[ロールの割り当ての追加] を選択して、[ロールの割り当ての追加] ページを開きます。
次の構成を使ってロールを割り当てます。
設定 | 値 |
---|---|
Role | キー コンテナー シークレット ユーザー |
マネージド ID | システム割り当てマネージド ID またはユーザー割り当てマネージド ID の Stream Analytics ジョブ |
メンバー | <Stream Analytics ジョブの名前> または <ユーザー割り当て ID の名前> |
Stream Analytics ジョブで Kafka 入力を構成する
重要
Kafka クラスターを入力として構成するには、入力トピックのタイムスタンプの種類を LogAppendTime にする必要があります。 Azure Stream Analytics でサポートされる唯一のタイムスタンプの種類は、LogAppendTime です。 Azure Stream Analytics は、数値の 10 進形式のみをサポートしています。
Stream Analytics ジョブで、[ジョブ トポロジ] で [入力] を選択します。
[入力の追加] > [Kafka] を選択して、[Kafka の新しい入力] 構成ブレードを開きます。
次の構成を使用します。
Note
SASL_SSL と SASL_PLAINTEXT の場合、Azure Stream Analytics では PLAIN SASL メカニズムのみがサポートされます。
プロパティ名 | 説明 |
---|---|
入力のエイリアス | 入力を参照するためにクエリで使うフレンドリ名 |
ブートストラップ サーバー アドレス | Confluent Cloud Kafka クラスターへの接続を確立するホストとポートのペアのリスト。 例: pkc-56d1g.eastus.azure.confluent.cloud:9092 |
Kafka トピック | Confluent Cloud Kafka クラスター内の Kafka トピックの名前。 |
セキュリティ プロトコル | [SASL_SSL] を選択します。 サポートされるメカニズムは PLAIN です。 |
コンシューマー グループ ID | 入力が属する必要がある Kafka コンシューマー グループの名前。 指定されていない場合は、自動的に割り当てられます。 |
イベント シリアル化形式 | 受信データ ストリームのシリアル化形式 (JSON、CSV、Parquet、Protobuf)。 |
重要
Confluent Cloud は、API キー、OAuth、または SAML シングル サインオン (SSO) を使用した認証をサポートしています。 Azure Stream Analytics は、OAuth または SAML シングル サインオン (SSO) を使用した認証をサポートしていません。 トピック レベルのアクセス権を持つ API キーを使い、SASL_SSL セキュリティ プロトコルを介して、Confluent Cloud に接続できます。 Confluent Cloud を認証するには、SASL_SSL を使用し、API キーを使用して Confluent Cloud を認証するようにジョブを構成する必要があります。
次の構成を使用します。
設定 | 値 |
---|---|
ユーザー名 | Confluent Cloud API キー |
Password | Confluent Cloud API シークレット |
キー コンテナー名 | アップロードされた証明書を含む Azure Key Vault の名前 |
トラストストアの証明書 | ISRG Root X1 証明書を保持する Key Vault シークレットの名前 |
構成を保存して接続をテストする
構成を保存します。 Azure Stream Analytics ジョブは、指定された構成を使用して検証されます。 Stream Analytics から Kafka クラスターに接続できた場合、成功した接続がポータルに表示されます。
制限事項
- キー コンテナーにアップロードされた証明書は PEM 形式である必要があります。
- Kafka のバージョンは、バージョン 0.10 以上である必要があります。
- Azure Stream Analytics は、OAuth または SAML シングル サインオン (SSO) を使用した Confluent Cloud の認証をサポートしていません。 SASL_SSL プロトコルを介して API キーを使用する必要があります。
- Azure CLI を使って、証明書をシークレットとしてキー コンテナーにアップロードする必要があります。 Azure portal を使用して、複数行のシークレットを含む証明書をキー コンテナーにアップロードすることはできません。
Note
Azure Stream Analytics Kafka 入力の使用に関して直接のサポートが必要な場合は、askasa@microsoft.com までお問い合わせください。