Udostępnij za pośrednictwem


Korzystanie z platformy Apache Kafka® w usłudze HDInsight z platformą Apache Flink® w usłudze HDInsight w usłudze AKS

Ważny

Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej dzięki temu ogłoszeniu.

Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.

Ważny

Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla wczesnych wersji platformy Microsoft Azure zawierają więcej warunków prawnych, które dotyczą funkcji platformy Azure będących w fazie beta, w wersjach zapoznawczych lub w inny sposób jeszcze niedostępnych ogólnie. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie dotyczące AskHDInsight, aby uzyskać więcej informacji na temat społeczności usługi Azure HDInsight.

Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór wielu użytkowników to korzystanie ze strumieni danych pozyskiwanych przy użyciu Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wysyłania strumieni zdarzeń do Kafka, które mogą być konsumowane przez zadania Flink.

W tym przykładzie użyto usługi HDInsight na klastrach AKS z Flink 1.17.0 do przetwarzania danych strumieniowych, konsumując i produkując tematy w platformie Kafka.

Notatka

FlinkKafkaConsumer jest przestarzały i zostanie usunięty z Flink 1.17, zamiast tego użyj platformy KafkaSource. FlinkKafkaProducer jest przestarzały i zostanie usunięty z Flink 1.15, zamiast tego użyj platformy KafkaSink.

Warunki wstępne

Łącznik Apache Kafka

Flink udostępnia konektor Apache Kafka do odczytywania danych z tematów Kafka i zapisywania ich z gwarancją dokładnie raz.

zależności narzędzia Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Tworzenie odbiornika Kafka

Kafka Sink udostępnia klasę budowniczego do konstruowania instancji KafkaSink. Używamy tego samego narzędzia do konstrukcji Sinka oraz do współpracy z klastrem Flink, działającym na platformie HDInsight w środowisku AKS.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Tworzenie programu Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

W witrynie Webssh przekaż plik jar i prześlij plik jar

Zrzut ekranu przedstawiający zadanie uruchomione na Flinku.

W interfejsie użytkownika pulpitu nawigacyjnego Flink

Zrzut ekranu przedstawiający sposób przesyłania pliku JAR, zawierającego temat Kafka, jako zadanie do Flink.

Tworzenie tematu — klika pozycję Kafka

Zrzut ekranu przedstawiający sposób tworzenia topiku w Kafce.

Korzystanie z tematu — zdarzenia na platformie Kafka

Zrzut ekranu przedstawiający sposób konsumowania tematu Kafka.

Odniesienie