다음을 통해 공유


HDInsight에서 Apache Kafka®를 사용하고 AKS의 HDInsight에서 Apache Flink® 사용

중요하다

AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 이 공지 자세히 알아보세요.

워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.

중요하다

이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure Preview에 대한 추가 사용 약관은 베타, 미리 보기 또는 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 법적 조건을 포함할 있습니다. 이 특정 미리 보기에 대한 정보는 Azure HDInsight on AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안을 하시려면 AskHDInsight에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주십시오.

Apache Flink의 잘 알려진 사용 사례는 스트림 분석입니다. Apache Kafka를 사용하여 수집되는 데이터 스트림을 사용하기 위해 많은 사용자가 선택하는 인기 있는 선택입니다. Flink 및 Kafka의 일반적인 설치는 Flink 작업에서 사용할 수 있는 Kafka로 푸시되는 이벤트 스트림으로 시작합니다.

이 예제에서는 Flink 1.17.0을 실행하는 AKS 클러스터의 HDInsight를 사용하여 Kafka 토픽을 사용하고 생성하는 스트리밍 데이터를 처리합니다.

메모

FlinkKafkaConsumer는 더 이상 사용되지 않으며 Flink 1.17에서 제거됩니다. KafkaSource를 대신 사용하세요. FlinkKafkaProducer는 더 이상 사용되지 않으며 Flink 1.15에서 제거됩니다. 대신 KafkaSink를 사용하세요.

필수 구성 요소

  • Kafka와 Flink는 모두 동일한 VNet에 있어야 하거나 두 클러스터 간에 vnet 피어링이 있어야 합니다.

  • VNet만들기.

  • 동일한 VNetKafka 클러스터를 만듭니다. 현재 사용량에 따라 HDInsight에서 Kafka 3.2 또는 2.4를 선택할 수 있습니다.

    동일한 VNet에서 Kafka 클러스터를 만드는 방법을 보여 주는 스크린샷

  • 가상 네트워크 섹션에서 VNet 세부 정보를 추가합니다.

  • AKS 클러스터 풀 에 동일한 VNet을 사용하여 HDInsight를 만드세요.

  • 만들어진 클러스터 풀에 Flink 클러스터를 만듭니다.

Apache Kafka 커넥터

Flink는 정확히 한 번 보장을 제공하면서 Kafka 토픽에서 데이터를 읽고 쓸 수 있는 Apache Kafka Connector를 제공합니다.

maven 종속성

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Kafka 싱크 생성

Kafka 싱크는 KafkaSink의 인스턴스를 생성하는 작성기 클래스를 제공합니다. 동일한 방법을 사용하여 싱크를 구성하고 AKS의 HDInsight에서 실행되는 Flink 클러스터와 함께 사용합니다.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Java 프로그램 Event.java 작성

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Webssh에서 jar을 업로드하고 jar를 제출합니다.

Flink에서 실행되는 작업을 보여 주는 스크린샷

Flink 대시보드 UI에서

Kafka 토픽 패키지 jar를 작업으로 Flink에 제출하는 방법을 보여 주는 스크린샷

토픽 생성 - Kafka 클릭

Kafka 토픽을 생성하는 방법을 보여 주는 스크린샷

항목 사용 - Kafka의 이벤트

Kafka 토픽을 사용하는 방법을 보여 주는 스크린샷

참조

  • Apache Kafka 커넥터
  • Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.