Dela via


Använda Apache Kafka® i HDInsight med Apache Flink® i HDInsight på AKS

Viktig

Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Lär dig mer genom det här meddelandet.

Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.

Viktig

Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller ännu inte har släppts för allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. För frågor eller funktionsförslag, vänligen skicka en begäran på AskHDInsight- med detaljerna och följ oss för fler uppdateringar från Azure HDInsight Community.

Ett välkänt användningsfall för Apache Flink är stream analytics. Det populära valet av många användare att använda dataströmmarna, som matas in med Apache Kafka. Typiska installationer av Flink och Kafka börjar med att händelseströmmar skickas till Kafka, som sedan kan konsumeras av Flink-jobb.

I det här exemplet används HDInsight i AKS-kluster som kör Flink 1.17.0 för att bearbeta strömningsdata för att konsumera och producera Kafka-ämnen.

Observera

FlinkKafkaConsumer är inaktuell och tas bort med Flink 1.17. Använd KafkaSource i stället. FlinkKafkaProducer är inaktuell och tas bort med Flink 1.15, använd KafkaSink i stället.

Förutsättningar

  • Både Kafka och Flink måste finnas i samma virtuella nätverk eller så bör det finnas vnet-peering mellan de två klustren.

  • Skapa VNet-.

  • Skapa ett Kafka-kluster i samma VNet-. Du kan välja Kafka 3.2 eller 2.4 i HDInsight baserat på din aktuella användning.

    Skärmbild som visar hur du skapar ett Kafka-kluster i samma virtuella nätverk.

  • Lägg till VNet-information i avsnittet virtuellt nätverk.

  • Skapa en HDInsight i AKS-klusterpool med samma VNet.

  • Skapa ett Flink-kluster till klusterpoolen som skapats.

Apache Kafka-anslutningsprogram

Flink tillhandahåller en Apache Kafka-connector för att läsa data från och skriva data till ämnen i Kafka med garantier för exakt en gång.

Maven-beroende

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Skapa Kafka-mottagare

Kafka-sink tillhandahåller en byggare-klass för att konstruera en instans av en KafkaSink. Vi använder samma metod för att konstruera vår datamottagare och använder den tillsammans med Flink-kluster som körs på HDInsight på AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Skriva ett Java-program Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Ladda upp jar-filen på Webssh och skicka jar-filen

Skärmbild som visar jobb som körs på Flink.

Flink-dashboardens användargränssnitt

Skärmbild som visar hur du skickar ett paketerat Kafka-ämne som en jar-fil som ett jobb till Flink.

Skapa ämnet – klickar på Kafka

Skärmbild som visar hur du skapar Kafka-ämnet.

Konsumera ämnet – händelser i Kafka

Skärmbild som visar hur du konsumerar ett Kafka-ämne.

Hänvisning