다음을 통해 공유


AKS의 HDInsight에서 Apache Flink®를 Azure Event Hubs for Apache Kafka®에 연결하기

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 과 함께를 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기 버전에 대한 추가 사용 약관에는 베타 상태, 미리 보기 상태, 또는 아직 일반 공급으로 릴리즈되지 않은 Azure 기능에 적용되는 법적 조건들이 더 포함되어 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문 또는 기능 제안에 대한 요청은 AskHDInsight에 제출하십시오. 더 많은 업데이트를 받으려면 Azure HDInsight Community를 팔로우하세요.

Apache Flink의 잘 알려진 사용 사례는 스트림 분석입니다. Apache Kafka를 사용하여 수집되는 데이터 스트림을 사용하기 위해 많은 사용자가 선택하는 인기 있는 선택입니다. Flink 및 Kafka의 일반적인 설치는 Flink 작업에서 사용할 수 있는 Kafka로 푸시되는 이벤트 스트림으로 시작합니다. Azure Event Hubs는 사용자가 Kafka 프로토콜을 사용하여 이벤트 허브에 연결할 수 있도록 하는 이벤트 허브에 Apache Kafka 엔드포인트를 제공합니다.

이 문서에서는 AKS HDInsight의 Apache Flink와 Azure Event Hubs 연결하는 방법을 살펴보고 다음을 다룹니다.

  • Event Hubs 네임스페이스 만들기
  • Apache Flink를 사용하여 AKS 클러스터에서 HDInsight 만들기
  • Flink 생산자 실행
  • Apache Flink용 패키지 Jar
  • 작업 제출 & 유효성 검사

Event Hubs 네임스페이스와 Event Hubs를 생성하세요.

  1. Event Hubs 네임스페이스와 Event Hubs를 만들려면 여기에서 참조하세요.

    Event Hubs 설정을 보여 주는 스크린샷

  1. AKS 클러스터 풀에서 기존 HDInsight를 사용하여 Flink 클러스터 만들 수 있습니다.

  2. bootstrap.serversproducer.config 정보를 추가하여 Flink 생산자를 실행합니다.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. {YOUR.EVENTHUBS.CONNECTION.STRING} Event Hubs 네임스페이스의 연결 문자열로 바꿉니다. 연결 문자열을 가져오는 방법에 대한 지침은 Event Hubs 연결 문자열 가져오는방법에 대한 세부 정보를 참조하세요.

    예를 들어

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. 패키지 com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. 코드 조각을 추가하여 Flink 생산자를 실행합니다.

    Event Hubs에서 Flink를 테스트하는 방법을 보여 주는 스크린샷

  3. 코드가 실행되면 이벤트는 토픽 "topic1" 저장됩니다.

    스크린샷 : 토픽에 저장된 Event Hubs를 보여줍니다.

참조