Delen via


Apache Kafka® gebruiken in HDInsight met Apache Flink® in HDInsight op AKS

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Kom meer te weten door deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Voor meer informatie over deze specifieke preview, zie Azure HDInsight op AKS preview-informatie. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

Een bekende use case voor Apache Flink is stream analytics. De populaire keuze van veel gebruikers om van de gegevensstromen gebruik te maken die worden verwerkt met behulp van Apache Kafka. Typische installaties van Flink en Kafka beginnen met gebeurtenisstromen die naar Kafka worden gepusht, die kunnen worden verbruikt door Flink-taken.

In dit voorbeeld wordt HDInsight gebruikt op AKS-clusters met Flink 1.17.0 voor het verwerken van streaminggegevens door Kafka-topics te consumeren en produceren.

Notitie

FlinkKafkaConsumer is afgeschaft en wordt verwijderd met Flink 1.17, gebruik in plaats daarvan KafkaSource. FlinkKafkaProducer is afgeschaft en wordt verwijderd met Flink 1.15, gebruik in plaats daarvan KafkaSink.

Voorwaarden

Apache Kafka-connector

Flink biedt een Apache Kafka Connector voor het lezen van gegevens uit en het schrijven van gegevens naar Kafka-onderwerpen met exactly-once garanties.

Maven-afhankelijkheid

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Kafka-sink bouwen

Kafka-sink biedt een opbouwklasse voor het maken van een exemplaar van een KafkaSink. We gebruiken hetzelfde om onze Sink te bouwen en gebruiken het samen met het Flink-cluster dat op HDInsight op AKS draait.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Een Java-programma schrijven Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Upload op Webssh het jar-bestand en verzend het jar-bestand

Schermopname van de taak die wordt uitgevoerd op Flink.

In de Flink Dashboard-interface

Schermopname die toont hoe je de gecomprimeerde JAR voor Kafka als taak verzendt naar Flink.

Het onderwerp produceren - klikken op Kafka

schermopname waarin wordt getoond hoe u een Kafka-topic produceert.

Het onderwerp gebruiken - gebeurtenissen in Kafka

Screenshot waarin wordt getoond hoe je een Kafka-topic gebruikt.

Referentie