HDInsight 上の Apache Kafka を Azure IoT Hub と共に使用する
Apache Kafka Connect Azure IoT Hub コネクタを使用して HDInsight 上の Apache Kafka と Azure IoT Hub の間でデータを移動する方法を学習します。 このドキュメントでは、クラスター内のエッジ ノードから IoT Hub コネクタを実行する方法を説明します。
Kafka Connect API では、Kafka へのデータのプルまたは Kafka から別のシステムへのデータのプッシュを継続的に行うコネクタを実装できます。 Apache Kafka Connect Azure IoT Hub は、Azure IoT Hub から Kafka にデータをプルするコネクタです。 Kafka から IoT Hub にデータをプッシュすることもことできます。
IoT Hub からプルする場合は、ソース コネクタを使用します。 IoT Hub にプッシュする場合は、シンク コネクタを使用します。 IoT Hub コネクタでは、ソース コネクタとシンク コネクタの両方が提供されます。
次の図は、コネクタを使用するときの Azure IoT Hub と HDInsight 上の Kafka の間のデータ フローを示しています。
API を接続する方法の詳細については、https://kafka.apache.org/documentation/#connect を参照してください。
前提条件
HDInsight 上の Apache Kafka クラスター 詳細については、「HDInsight の Kafka のクイックスタート」ドキュメントをご覧ください。
Kafka クラスター内のエッジ ノード。 詳しくは、「HDInsight でのエッジノードの使用」ドキュメントをご覧ください。
SSH クライアント 詳細については、SSH を使用して HDInsight (Apache Hadoop) に接続する方法に関するページを参照してください。
Azure IoT Hub とデバイス この記事では、Raspberry Pi オンライン シミュレーターの Azure IoT Hub への接続を検討するようお勧めします。
コネクタを作成します
コネクタのソースを https://github.com/Azure/toketi-kafka-connect-iothub/ から開発環境にダウンロードします。
コマンドプロンプトから、
toketi-kafka-connect-iothub-master
ディレクトリに移動します。 次に、次のコマンドを使用して、プロジェクトをビルドおよびパッケージ化します。sbt assembly
ビルドが完了するまで数分かかります。 このコマンドにより、プロジェクトの
toketi-kafka-connect-iothub-master\target\scala-2.11
ディレクトリにkafka-connect-iothub-assembly_2.11-0.7.0.jar
というファイルが作成されます。
コネクタをインストールする
.jar ファイルを HDInsight クラスター上の Kafka のエッジ ノードにアップロードします。 以下のコマンドを編集して、
CLUSTERNAME
をクラスターの実際の名前に置き換えます。 SSH ユーザーアカウントの既定値とエッジノードの名前が使用されています。必要に応じて変更してください。scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
ファイルのコピーが完了したら、SSH を使用してエッジ ノードに接続します。
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Kafka の
libs
ディレクトリにコネクタをインストールするには、次のコマンドを使用します。sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
残りの手順では、SSH 接続を有効のままにしておきます。
Apache Kafka を構成する
エッジ ノードへの SSH 接続から、次の手順を使用して、コネクタをスタンドアロン モードで実行するように Kafka を構成します。
パスワード変数を設定します。 PASSWORD をクラスターのログインパスワードに置き換え、次のコマンドを入力します。
export password='PASSWORD'
jq ユーティリティをインストールします。 jq は、Ambari クエリから返された JSON ドキュメントの処理をしやすくします。 次のコマンドを入力します。
sudo apt -y install jq
Kafka ブローカーのアドレスを取得します。 クラスターには多くのブローカーがある場合がありますが、参照する必要があるのは 1 つか 2 つだけです。 2 つのブローカー ホストのアドレスを取得するには、次のコマンドを使用します。
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
後で使用するために値をコピーします。 次のテキストのような値が返されます。
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Apache ZooKeeper ノードのアドレスを取得します。 クラスターには複数の Zookeeper ノードがありますが、参照する必要があるのは 1 つか 2 つだけです。 次のコマンドを使用して、変数
KAFKAZKHOSTS
にアドレスを格納します。export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
コネクタをスタンドアロン モードで実行する場合は、Kafka ブローカーと通信するために
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
ファイルが使用されます。connect-standalone.properties
ファイルを編集するには、次のコマンドを使用します。sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
次の編集を行います。
現在の値 新しい値 解説 bootstrap.servers=localhost:9092
localhost:9092
値を前の手順のブローカー ホストに置き換えます。エッジ ノードのスタンドアロン構成を設定して、Kafka ブローカーを検索します。 key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
この変更により、Kafka に含まれているコンソール プロデューサーを使用してテストすることができます。 他のプロデューサーとコンシューマーには別のコンバーターが必要な場合があります。 他のコンバーター値の使用について、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。 value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
指定したものと同じです。 該当なし consumer.max.poll.records=10
ファイルの末尾に追加します。 この変更により、一度に 10 個のレコードに限定することによって、シンク コネクタでのタイムアウトを防ぎます。 詳細については、「https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md」を参照してください。 ファイルを保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。
コネクタで使用されるトピックを作成するには、次のコマンドを使用します。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
iotin
およびiotout
トピックが存在することを確認するには、次のコマンドを使用します。/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
iotin
トピックは、IoT ハブからメッセージを受信するために使用されます。iotout
トピックは、IoT ハブにメッセージを送信するために使用されます。
IoT ハブ接続情報の取得
コネクタによって使用される IoT ハブ情報を取得するには、次の手順を使用します。
IoT ハブのイベント ハブ互換エンドポイントとイベント ハブ互換エンドポイント名を取得します。 この情報を取得するには、次のいずれかの方法を使用します。
Azure Portal で、次の手順に従います。
IoT ハブに移動し、[エンドポイント] を選択します。
[組み込みのエンドポイント] から、[イベント] を選択します。
[プロパティ] から、次のフィールドの値をコピーします。
- イベント ハブ互換の名前
- Event Hub と互換性があるエンドポイント
- パーティション
重要
ポータルからのエンドポイント値には、この例では必要のない余分なテキストが含まれていることがあります。 このパターン
sb://<randomnamespace>.servicebus.windows.net/
に一致するテキストを抽出します。
Azure CLI で、次のコマンドを使用します。
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
myhubname
は、お使いの IoT ハブの名前に置き換えます。 応答は次のテキストのようになります。"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
共有アクセス ポリシー と キー を取得します。 たとえば、サービス キーを使用します。 この情報を取得するには、次のいずれかの方法を使用します。
Azure Portal で、次の手順に従います。
- [共有アクセス ポリシー] を選択してから、[サービス] を選択します。
- [主キー] の値をコピーします。
- [接続文字列 -- 主キー] の値をコピーします。
Azure CLI で、次のコマンドを使用します。
主キーの値を取得するには、次のコマンドを使用します。
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
myhubname
は、お使いの IoT ハブの名前に置き換えます。 応答は、このハブのservice
ポリシーの主キーです。service
ポリシーの接続文字列を取得するには、次のコマンドを使用します。az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
myhubname
は、お使いの IoT ハブの名前に置き換えます。 応答はservice
ポリシー用の接続文字列です。
ソース接続の構成
IoT ハブを使用するようにソースを構成するには、エッジ ノードへの SSH 接続から次のアクションを実行します。
/usr/hdp/current/kafka-broker/config/
ディレクトリにconnect-iot-source.properties
ファイルのコピーを作成します。 toketi-kafka-connect-iothub プロジェクトからファイルをダウンロードするには、次のコマンドを使用します。sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
connect-iot-source.properties
ファイルを編集し、IoT ハブ情報を追加するには、次のコマンドを使用します。sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
エディターで、次のエントリを検索し、変更します。
現在の値 編集 Kafka.Topic=PLACEHOLDER
PLACEHOLDER
をiotin
で置き換え IoT ハブから受信したメッセージはiotin
トピックに配置されます。IotHub.EventHubCompatibleName=PLACEHOLDER
PLACEHOLDER
をイベント ハブ互換の名前に置き換えます。IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
PLACEHOLDER
をイベント ハブ互換エンドポイントに置き換えます。IotHub.AccessKeyName=PLACEHOLDER
PLACEHOLDER
をservice
で置き換えIotHub.AccessKeyValue=PLACEHOLDER
PLACEHOLDER
をservice
ポリシーの主キーに置き換えます。IotHub.Partitions=PLACEHOLDER
PLACEHOLDER
を前の手順のパーティション数に置き換えます。IotHub.StartTime=PLACEHOLDER
PLACEHOLDER
を UTC 日付に置き換えます。 この日付は、コネクタがメッセージの検査を開始した日です。 日付の形式はyyyy-mm-ddThh:mm:ssZ
です。BatchSize=100
100
を5
で置き換え この変更により、コネクタは IoT ハブに 5 つの新しいメッセージが入った後にメッセージを Kafka に読み取ります。構成例については、「Kafka Connect Source Connector for Azure IoT Hub」を参照してください。
変更を保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。
コネクタ ソースの構成について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md をご覧ください。
シンク接続の構成
IoT ハブを使用するようにシンク接続を構成するには、エッジ ノードへの SSH 接続から次のアクションを実行します。
/usr/hdp/current/kafka-broker/config/
ディレクトリにconnect-iothub-sink.properties
ファイルのコピーを作成します。 toketi-kafka-connect-iothub プロジェクトからファイルをダウンロードするには、次のコマンドを使用します。sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
connect-iothub-sink.properties
ファイルを編集し、IoT ハブ情報を追加するには、次のコマンドを使用します。sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
エディターで、次のエントリを検索し、変更します。
現在の値 編集 topics=PLACEHOLDER
PLACEHOLDER
をiotout
で置き換えiotout
トピックに書き込まれたメッセージが IoT ハブに転送されます。IotHub.ConnectionString=PLACEHOLDER
PLACEHOLDER
をservice
ポリシーの接続文字列に置き換えます。構成例については、「Kafka Connect Source Connector for Azure IoT Hub」を参照してください。
変更を保存するには、Ctrl + X キー、Y キー、Enter キーの順に押します。
コネクタ シンクの構成について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。
ソース コネクタの開始
ソース コネクタを開始するには、エッジ ノードへの SSH 接続から次のコマンドを使用します。
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
コネクタが開始された後で、デバイスから IoT ハブにメッセージを送信します。 コネクタは、IoT ハブからのメッセージを読み取り、Kafka トピックに格納するときに、コンソールに情報を記録します。
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Note
コネクタが開始されると、複数の警告が表示されることがあります。 これらの警告は、IoT ハブからのメッセージの受信で問題が発生する原因にはなりません。
Ctrl + C を2回押して、数分後にコネクタを停止します。 コネクタが停止するまで数分かかります。
シンク コネクタの開始
エッジ ノードへの SSH 接続から、次のコマンドを使用して、シンク コネクタをスタンドアロン モードで開始します。
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
コネクタが実行されると、次のテキストのような情報が表示されます。
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Note
コネクタが開始されると、複数の警告が表示されることがあります。 これらは無視してかまいません。
メッセージを送信する
コネクタを通じてメッセージを送信するには、次の手順を使用します。
Kafka クラスターへの 2 番目 の SSH セッションを開きます。
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
新しい SSH セッションの Kafka ブローカーのアドレスを取得します。 PASSWORD をクラスターのログインパスワードに置き換え、次のコマンドを入力します。
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
iotout
トピックにメッセージを送信するには、次のコマンドを使用します。/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
このコマンドでは、通常の Bash プロンプトに戻りません。 代わりに、キーボード入力を
iotout
トピックに送信します。デバイスにメッセージを送信するには、
kafka-console-producer
の SSH セッションに JSON ドキュメントを貼り付けます。重要
"deviceId"
エントリの値をデバイスの ID に設定する必要があります。 次の例では、デバイスの名前はmyDeviceId
です。{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
この JSON ドキュメントのスキーマについては、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md で詳しく説明されています。
シミュレートされた Raspberry Pi デバイスを使用し、それが実行されている場合は、デバイスによって次のメッセージがログに記録されます。
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
シンク コネクタの使用について詳しくは、https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md をご覧ください。
次のステップ
このドキュメントでは、Apache Kafka Connect API を使用して HDInsight 上の IoT Kafka Connector を開始する方法について説明しました。 次のリンクを使用することで、Kafka のその他の活用方法を知ることができます。