연습 - Kafka 생산자 만들기

완료됨

이제 Kafka 및 Spark 클러스터가 배포되었으므로 Kafka 헤드 노드에 Kafka 생산자를 추가할 수 있습니다. 이 생산자는 인공 주식 가격을 생성하는 주가 시뮬레이터입니다.

샘플 다운로드

  1. 이전 모듈에서 아직 수행하지 않은 경우 인터넷 브라우저에서 https://github.com/Azure/hdinsight-mslearn로 이동하여 샘플을 로컬로 다운로드하거나 복제합니다.
  2. Spark 구조적 스트리밍\python-producer-simulator-template.py 파일을 로컬로 엽니다.

Kafka Broker URL 검색

그런 다음 헤드 노드에서 SSH를 사용하고 Python 파일에 URL을 추가하여 Kafka broker URL을 검색해야 합니다.

  1. Apache Kafka 클러스터의 기본 헤드 노드에 연결하려면 노드에 SSH를 수행해야 합니다. Azure Portal의 Azure Cloud Shell은 연결에 권장되는 방법입니다. Azure Portal의 맨 위 도구 모음에서 Azure Cloud Shell 단추를 클릭하고 Bash를 선택합니다. Git Bash와 같은 SSH 사용 명령 프롬프트를 사용할 수도 있습니다.

  2. 이전에 Azure Cloud Shell를 사용하지 않은 경우에는 탑재된 스토리지가 없다는 알림이 표시됩니다. 구독 상자에서 Azure 구독을 선택하고 스토리지 만들기를 클릭합니다.

  3. 클라우드 프롬프트에 다음 명령을 붙여넣습니다. sshuser를 SSH 사용자 이름으로 바꿉니다. kafka-mslearn-stock을 Apache Kafka 클러스터의 이름으로 바꾸고, -SSH를 클러스터 이름 뒤에 포함해야 합니다.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. 클러스터에 처음 연결할 때 호스트의 신뢰성을 설정할 수 없다는 경고가 SSH 클라이언트에 표시될 수도 있습니다. 메시지가 표시되면 예 를 입력한 다음, Enter 키를 눌러 SSH 클라이언트의 신뢰할 수 있는 서버 목록에 호스트를 추가합니다.

  5. 확인 메시지가 표시되면 SSH 사용자의 암호를 입력합니다.

    연결되면 다음 텍스트와 유사한 정보가 표시됩니다.

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. 간단한 명령줄 JSON 프로세서인 jq를 설치합니다. 이 유틸리티는 JSON 문서를 구문 분석하는 데 사용되며, 호스트 정보를 구문 분석할 때 유용합니다. 열린 SSH 연결에서 다음 명령을 실행하여 jq를 설치합니다.

    sudo apt -y install jq
    
  7. 암호 변수를 설정합니다. PASSWORD를 클러스터 로그인 암호로 바꾼 다음, 다음 명령을 입력합니다.

    export password='PASSWORD'
    
  8. 대/소문자가 올바르게 지정된 클러스터 이름을 추출합니다. 클러스터 생성 방법에 따라 클러스터 이름의 실제 대/소문자가 예상과 다를 수 있습니다. 이 명령은 실제 대/소문자를 가져온 다음, 변수에 저장합니다. 다음 명령을 입력합니다.

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    이 명령에는 응답이 없습니다.

  9. Zookeeper 호스트 정보를 사용하여 환경 변수를 설정하려면 아래 명령을 사용합니다. 이 명령은 모든 Zookeeper 호스트를 검색한 다음, 처음 두 개의 항목만 반환합니다. 하나의 호스트에 연결할 수 없는 경우 일부 중복이 필요하기 때문입니다.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    참고

    이 명령에는 Ambari 액세스 권한이 필요합니다. 클러스터가 NSG 뒤에 있는 경우 Ambari에 액세스할 수 있는 머신에서 이 명령을 실행합니다.

    이 명령 또한 응답이 없습니다.

  10. 환경 변수가 올바르게 설정되었는지 확인하려면 다음 명령을 사용합니다.

    echo $KAFKAZKHOSTS
    

    이 명령은 다음 텍스트와 유사한 정보를 반환합니다.

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Apache Kafka broker 호스트 정보를 사용하여 환경 변수를 설정하려면 다음 명령을 사용합니다.

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    참고

    이 명령에는 Ambari 액세스 권한이 필요합니다. 클러스터가 NSG 뒤에 있는 경우 Ambari에 액세스할 수 있는 머신에서 이 명령을 실행합니다.

    이 명령에는 출력이 없습니다.

  12. 환경 변수가 올바르게 설정되었는지 확인하려면 다음 명령을 사용합니다.

    echo $KAFKABROKERS
    

    이 명령은 다음 텍스트와 유사한 정보를 반환합니다.

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. 이전 단계에서 반환된 Kafka Broker 값 중 하나를 19번째 줄의 python-producer-simulator-template.py 파일에 복사하 고 값 주위에 작은따옴표를 포함합니다. 예를 들면 다음과 같습니다.

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Python-producer-simulator-template-simulator-template.py 파일을 저장합니다.

  15. SSH 연결 창으로 돌아가서 다음 명령을 사용하여 토픽을 만듭니다.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

이 명령은 $KAFKAZKHOSTS에 저장된 호스트 정보를 사용하여 Zookeeper에 연결합니다. 그런 다음 python-producer-simulator-template.py의 토픽 이름과 일치하도록 stockVals라는 Apache Kafka 토픽을 만듭니다.

Python 파일을 헤드 노드에 복사하고 파일을 실행하여 데이터를 스트리밍합니다.

  1. 새 git 창에서 python-producer-simulator-template.py 파일의 위치로 이동하고 다음 명령을 사용하여 로컬 컴퓨터의 파일을 기본 헤드 노드로 복사합니다. kafka-mslearn-stock을 Apache Kafka 클러스터의 이름으로 바꾸고, -SSH를 클러스터 이름 뒤에 포함해야 합니다.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    연결을 계속할지 묻는 메시지가 표시되면 예를 입력합니다. 메시지가 표시되면 클러스터 사용자 계정의 암호를 입력합니다. 파일이 전송된 후에는 다음과 같은 출력이 표시됩니다.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. 이제 broker 정보를 검색한 Azure 명령 프롬프트로 다시 전환하고 다음 명령을 실행하여 Kafka를 설치합니다.

    sudo pip install kafka-python
    

    Kafka가 성공적으로 설치되면 다음 출력이 표시됩니다.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. 동일한 창에서 다음 명령을 사용하여 요청을 설치합니다.

    sudo apt-get install python-requests
    
  4. “이 작업 후, 4327kB의 추가 디스크 공간이 사용됩니다. 계속할까요? [Y/n]”라는 메시지가 표시되면 y를 입력합니다.

    요청이 성공적으로 설치되면 다음과 유사한 출력이 표시됩니다.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. 동일한 창에서 다음 명령을 사용하여 Python 파일을 실행합니다.

    python python-producer-simulator-template.py
    

    다음과 비슷한 내용이 출력됩니다.

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

이 출력은 python-producer-simulated-template.py 파일에 나열된 주식의 시뮬레이션된 주가를 제공하고 이어서 토픽, 파티션, 토픽의 메시지 오프셋을 제공합니다. 생산자가 트리거될 때마다(초마다) 주가의 새로운 일괄 처리가 생성되고 각 새 메시지가 특정 오프셋의 파티션에 추가됩니다.