AKS의 HDInsight에서 Apache Flink®를 Azure Event Hubs for Apache Kafka®에 연결하기
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 과 함께를 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기 버전에 대한 추가 사용 약관에는 베타 상태, 미리 보기 상태, 또는 아직 일반 공급으로 릴리즈되지 않은 Azure 기능에 적용되는 법적 조건들이 더 포함되어 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보 을 참조하세요. 질문 또는 기능 제안에 대한 요청은 AskHDInsight에 제출하십시오. 더 많은 업데이트를 받으려면 Azure HDInsight Community를 팔로우하세요.
Apache Flink의 잘 알려진 사용 사례는 스트림 분석입니다. Apache Kafka를 사용하여 수집되는 데이터 스트림을 사용하기 위해 많은 사용자가 선택하는 인기 있는 선택입니다. Flink 및 Kafka의 일반적인 설치는 Flink 작업에서 사용할 수 있는 Kafka로 푸시되는 이벤트 스트림으로 시작합니다. Azure Event Hubs는 사용자가 Kafka 프로토콜을 사용하여 이벤트 허브에 연결할 수 있도록 하는 이벤트 허브에 Apache Kafka 엔드포인트를 제공합니다.
이 문서에서는 AKS HDInsight의 Apache Flink와 Azure Event Hubs 연결하는 방법을 살펴보고 다음을 다룹니다.
- Event Hubs 네임스페이스 만들기
- Apache Flink를 사용하여 AKS 클러스터에서 HDInsight 만들기
- Flink 생산자 실행
- Apache Flink용 패키지 Jar
- 작업 제출 & 유효성 검사
Event Hubs 네임스페이스와 Event Hubs를 생성하세요.
Event Hubs 네임스페이스와 Event Hubs를 만들려면 여기에서 참조하세요.
AKS의 HDInsight에서 Flink 클러스터 설정
AKS 클러스터 풀에서 기존 HDInsight를 사용하여 Flink 클러스터 만들 수 있습니다.
bootstrap.servers 및
producer.config
정보를 추가하여 Flink 생산자를 실행합니다.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
{YOUR.EVENTHUBS.CONNECTION.STRING}
Event Hubs 네임스페이스의 연결 문자열로 바꿉니다. 연결 문자열을 가져오는 방법에 대한 지침은 Event Hubs 연결 문자열 가져오는방법에 대한 세부 정보를 참조하세요.예를 들어
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Flink용 JAR 패키징
패키지 com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
코드 조각을 추가하여 Flink 생산자를 실행합니다.
코드가 실행되면 이벤트는 토픽 "topic1" 저장됩니다.
참조
- Apache Flink 웹 사이트
- Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.