Azure IoT Hub를 통해 HDInsight에서 Apache Kafka 사용
HDInsight의 Apache Kafka 및 Azure IoT Hub 간에 데이터를 이동하려면 Apache Kafka 연결 Azure IoT Hub 커넥터를 사용하는 방법에 대해 알아봅니다. 이 문서에서는 클러스터의 에지 노드에서 IoT Hub 커넥터를 실행하는 방법을 설명합니다.
Kafka Connect API를 사용하면 Kafka로 데이터를 지속적으로 끌어오거나 Kafka에서 다른 시스템으로 데이터를 푸시하는 커넥터를 구현할 수 있습니다. Apache Kafka 연결 Azure IoT Hub는 Azure IoT Hub에서 데이터를 Kafka로 끌어오는 커넥터입니다. Kafka에서 데이터를 IoT Hub로 밀어넣기할 수도 있습니다.
IoT Hub에서 끌어오는 경우 원본 커넥터를 사용합니다. IoT Hub로 밀어넣기하는 경우 싱크 커넥터를 사용합니다. IoT Hub 커넥터는 원본 및 싱크 커넥터 모두를 제공합니다.
다음 다이어그램에서는 커넥터를 사용하는 경우 HDInsight의 Kafka 및 Azure IoT Hub 간의 데이터 흐름을 표시합니다.
API를 연결하는 방법에 대한 자세한 내용은 다음을 참조하세요 https://kafka.apache.org/documentation/#connect.
필수 조건
HDInsight의 Apache Kafka 클러스터. 자세한 내용은 HDInsight의 Kafka 빠른 시작 문서를 참조하세요.
Kafka 클러스터의 에지 노드입니다. 자세한 내용은 HDInsight 문서에서 에지 노드 사용을 참조하세요.
SSH 클라이언트 자세한 내용은 SSH를 사용하여 HDInsight(Apache Hadoop)에 연결을 참조하세요.
Azure IoT Hub 및 디바이스. 이 문서의 경우 Azure IoT Hub에 Raspberry Pi 온라인 시뮬레이터를 연결하는 것이 좋습니다.
커넥터 구축
https://github.com/Azure/toketi-kafka-connect-iothub/에서 커넥터에 대한 원본을 로컬 환경에 다운로드합니다.
명령 프롬프트에서
toketi-kafka-connect-iothub-master
디렉터리로 이동합니다. 그런 다음, 다음 명령을 사용하여 프로젝트를 빌드하고 패키징합니다.sbt assembly
빌드를 완료하는 데 몇 분이 소요됩니다. 이 명령은 프로젝트에 대한
toketi-kafka-connect-iothub-master\target\scala-2.11
디렉터리에서kafka-connect-iothub-assembly_2.11-0.7.0.jar
이라는 파일을 만듭니다.
커넥터 설치
HDInsight 클러스터에서 Kafka의 에지 노드에 .jar 파일을 업로드합니다.
CLUSTERNAME
을 실제 클러스터의 이름으로 바꾸어 다음 명령을 편집합니다. SSH 사용자 계정 및 에지 노드의 이름에 대한 기본값은 필요에 따라 수정하는 데 사용됩니다.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
파일 복사가 완료되면 SSH를 사용하여 에지 노드에 연결합니다.
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Kafka
libs
디렉터리에 커넥터를 설치하려면 다음 명령을 사용합니다.sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
나머지 단계에서 SSH 연결을 활성 상태로 유지합니다.
Apache Kafka 구성
SSH 연결에서 에지 노드까지 다음 단계를 사용하여 독립 실행형 모드에서 실행되도록 Kafka를 구성합니다.
암호 변수를 설정합니다. PASSWORD를 클러스터 로그인 암호로 바꾼 다음, 다음 명령을 입력합니다.
export password='PASSWORD'
jq 유틸리티를 설치합니다. jq는 Ambari 쿼리에서 반환된 JSON 문서 처리를 더 쉽게 해줍니다. 다음 명령을 입력합니다.
sudo apt -y install jq
Kafka broker의 주소를 가져옵니다. 클러스터에 많은 broker가 있을 수 있지만 하나 또는 두 개의 참조만 필요합니다. 두 broker 호스트의 주소를 가져오려면 다음 명령을 사용합니다.
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
나중에 사용하도록 이 값을 복사합니다. 반환되는 값은 다음 텍스트와 유사합니다.
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Apache Zookeeper 노드의 주소를 가져옵니다. 클러스터에 여러 Zookeeper 노드가 있지만 하나 또는 두 개의 참조만 필요합니다. 다음 명령을 사용하여
KAFKAZKHOSTS
변수에 주소를 저장합니다.export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
독립 실행형 모드
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
에서 커넥터를 실행하면 파일이 Kafka broker와 통신하는 데 사용됩니다.connect-standalone.properties
파일을 편집하려면 다음 명령을 사용합니다.sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
다음과 같이 편집합니다.
현재 값 새로운 값 Comment(설명) bootstrap.servers=localhost:9092
localhost:9092
값을 이전 단계의 broker 호스트로 바꿉니다.Kafka broker를 찾기 위해 에지 노드에 대한 독립 실행형 구성을 구성합니다. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
이 변경을 통해 Kafka에 포함된 콘솔 생산자를 사용하여 테스트할 수 있습니다. 다른 생산자와 소비자에 대한 다른 변환기가 필요할 수 있습니다. 다른 변환기 값의 사용에 대한 내용은 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md의 내용을 참조하세요. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
지정된 것과 동일합니다. 해당 없음 consumer.max.poll.records=10
파일의 끝에 추가합니다. 이 변경은 한 번에 10개의 레코드로 제한하여 싱크 커넥터에서 시간 제한을 방지하기 위함입니다. 자세한 내용은 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md를 참조하세요. 파일을 저장하려면 Ctrl + X, Y 및 Enter 키를 사용합니다.
커넥터에서 사용하는 항목을 만들려면 다음 명령을 사용합니다.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
iotin
및iotout
항목이 있는지 확인하려면 다음 명령을 사용합니다./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
iotin
항목은 IoT Hub에서 메시지를 수신하는 데 사용됩니다.iotout
항목은 IoT Hub로 메시지를 보내는 데 사용됩니다.
IoT Hub 연결 정보 가져오기
커넥터에서 사용하는 IoT Hub 정보를 검색하려면 다음 단계를 사용합니다.
Event Hub 호환 엔드포인트 및 IoT Hub에 대한 Event Hub 호환 엔드포인트 이름을 가져옵니다. 이 정보를 가져오려면 다음 방법 중 하나를 사용합니다.
Azure Portal에서 다음 단계를 사용하세요.
IoT Hub로 이동하고 엔드포인트를 선택합니다.
기본 제공 엔드포인트에서 이벤트를 선택합니다.
속성에서 다음 필드의 값을 복사합니다.
- Event Hub 호환 이름
- Event Hub 호환 엔드포인트
- 파티션
Important
포털에서 엔드포인트 값이 이 예제에서 필요하지 않은 추가 텍스트를 포함할 수 있습니다. 이 패턴
sb://<randomnamespace>.servicebus.windows.net/
과 일치하는 텍스트를 추출합니다.
Azure CLI에서 다음 명령을 입력합니다.
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
myhubname
을 IoT Hub 이름으로 바꿉니다. 응답은 다음 텍스트와 비슷합니다."EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
공유 액세스 정책 및 키를 가져옵니다. 이 예에서는 서비스 키를 사용합니다. 이 정보를 가져오려면 다음 방법 중 하나를 사용합니다.
Azure Portal에서 다음 단계를 사용하세요.
- 공유 액세스 정책을 선택한 다음, 서비스를 선택합니다.
- 기본 키 값을 복사합니다.
- 연결 문자열 - 기본 키 값을 복사합니다.
Azure CLI에서 다음 명령을 입력합니다.
기본 키 값을 가져오려면 다음 명령을 사용합니다.
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
myhubname
을 IoT Hub 이름으로 바꿉니다. 응답은 이 허브에 대한service
정책의 기본 키입니다.service
정책에 대한 연결 문자열을 가져오려면 다음 명령을 사용합니다.az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
myhubname
을 IoT Hub 이름으로 바꿉니다. 응답은service
정책에 대한 연결 문자열입니다.
원본 연결 구성
IoT Hub와 작동하도록 원본을 구성하려면 SSH 연결에서 에지 노드까지 다음 작업을 수행합니다.
/usr/hdp/current/kafka-broker/config/
디렉터리에서connect-iot-source.properties
파일의 복사본을 만듭니다. toketi-kafka-connect-iothub 프로젝트에서 파일을 다운로드하려면 다음 명령을 사용합니다.sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
connect-iot-source.properties
파일을 편집하고 IoT Hub 정보를 추가하려면 다음 명령을 사용합니다.sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
편집기에서 다음 항목을 찾아 변경합니다.
현재 값 편집 Kafka.Topic=PLACEHOLDER
PLACEHOLDER
를iotin
로 교체합니다. IoT Hub에서 받은 메시지는iotin
항목에 배치됩니다.IotHub.EventHubCompatibleName=PLACEHOLDER
PLACEHOLDER
를 Event Hub 호환 이름으로 바꿉니다.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
PLACEHOLDER
를 Event Hub 호환 엔드포인트로 바꿉니다.IotHub.AccessKeyName=PLACEHOLDER
PLACEHOLDER
를service
로 교체합니다.IotHub.AccessKeyValue=PLACEHOLDER
PLACEHOLDER
를service
정책의 기본 키로 바꿉니다.IotHub.Partitions=PLACEHOLDER
PLACEHOLDER
를 이전 단계의 파티션 수로 바꿉니다.IotHub.StartTime=PLACEHOLDER
PLACEHOLDER
를 UTC 날짜로 바꿉니다. 이 날짜는 커넥터가 메시지에 대한 검사를 시작할 때입니다. 날짜 형식은yyyy-mm-ddThh:mm:ssZ
입니다.BatchSize=100
100
를5
로 교체합니다. 이 변경으로 인해 IoT Hub에 5개의 새 메시지가 있는 경우 커넥터가 메시지를 Kafka로 읽어 들일 수 있습니다.구성 예제는 Azure IoT Hub용 Kafka 연결 원본 커넥터를 참조하세요.
변경을 저장하려면 Ctrl + X, Y 및 Enter 키를 사용합니다.
커넥터 원본 구성에 대한 자세한 내용은 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md의 내용을 참조하세요.
싱크 연결 구성
IoT Hub와 작동하도록 싱크 연결을 구성하려면 SSH 연결에서 에지 노드까지 다음 작업을 수행합니다.
/usr/hdp/current/kafka-broker/config/
디렉터리에서connect-iothub-sink.properties
파일의 복사본을 만듭니다. toketi-kafka-connect-iothub 프로젝트에서 파일을 다운로드하려면 다음 명령을 사용합니다.sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
connect-iothub-sink.properties
파일을 편집하고 IoT Hub 정보를 추가하려면 다음 명령을 사용합니다.sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
편집기에서 다음 항목을 찾아 변경합니다.
현재 값 편집 topics=PLACEHOLDER
PLACEHOLDER
를iotout
로 교체합니다.iotout
항목에 기록된 메시지는 IoT Hub로 전달됩니다.IotHub.ConnectionString=PLACEHOLDER
PLACEHOLDER
를service
정책에 대한 연결 문자열로 바꿉니다.구성 예제는 Azure IoT Hub용 Kafka 연결 싱크 커넥터를 참조하세요.
변경을 저장하려면 Ctrl + X, Y 및 Enter 키를 사용합니다.
커넥터 싱크 구성에 대한 자세한 내용은 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md의 내용을 참조하세요.
원본 커넥터 시작
원본 커넥터를 시작하려면 SSH 연결에서 에지 노드까지 다음 명령을 사용합니다.
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
커넥터가 시작하면 해당 디바이스에서 IoT Hub로 메시지를 보냅니다. 커넥터는 IoT Hub에서 메시지를 읽고 Kafka 토픽에 저장하므로 콘솔에 정보를 기록합니다.
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
참고 항목
커넥터가 시작할 때 여러 경고가 표시될 수 있습니다. 이러한 경고는 IoT Hub에서 메시지를 수신하는 문제를 일으키지 않습니다.
몇 분 후에 Ctrl + C를 두 번 사용하여 커넥터를 중지합니다. 커넥터를 중지하는 데 몇 분 정도 걸릴 수 있습니다.
싱크 커넥터 시작
SSH 연결에서 에지 노드까지 다음 명령을 사용하여 독립 실행형 모드에서 싱크 커넥터를 시작합니다.
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
커넥터가 실행될 때 다음 텍스트와 유사한 정보가 표시됩니다.
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
참고 항목
커넥터가 시작할 때 여러 경고를 확인할 수 있습니다. 무시해도 됩니다.
메시지 보내기
커넥터를 통해 메시지를 보내려면 다음 단계를 사용합니다.
Kafka 클러스터에 ‘두 번째’ SSH 세션을 엽니다.
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
새 ssh 세션의 Kafka broker 주소를 가져옵니다. PASSWORD를 클러스터 로그인 암호로 바꾼 다음, 다음 명령을 입력합니다.
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
iotout
항목에 메시지를 보내려면 다음 명령을 사용합니다./usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
이 명령은 기본 Bash 프롬프트로 반환하지 않습니다. 대신
iotout
항목에 키보드 입력을 보냅니다.디바이스에 메시지를 보내려면
kafka-console-producer
에 대한 SSH 세션에 JSON 문서를 붙여넣습니다.Important
디바이스 ID에
"deviceId"
항목 값을 설정해야 합니다. 다음 예제에서 디바이스는myDeviceId
이라고 합니다.{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
이 JSON 문서에 대한 스키마는 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md에 자세히 설명되어 있습니다.
시뮬레이션된 Raspberry Pi 디바이스를 사용하고 실행 중인 경우 디바이스는 다음 메시지를 기록합니다.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
싱크 커넥터 사용에 대한 자세한 내용은 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md의 내용을 참조하세요.
다음 단계
이 문서에서는 HDInsight에서 IoT Kafka 커넥터를 시작하기 위해 Apache Kafka 연결 API를 사용하는 방법을 알아보았습니다. Kafka를 사용하는 다른 방법을 찾으려면 다음 링크를 사용하세요.