Partager via


Utilisation d’Apache Kafka® sur HDInsight avec Apache Flink® sur HDInsight sur AKS

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les informations sur la préversion d'Azure HDInsight sur AKS . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Un cas d’usage bien connu pour Apache Flink est l’analytique de flux. Le choix populaire par de nombreux utilisateurs d’utiliser les flux de données, qui sont ingérés à l’aide d’Apache Kafka. Les installations classiques de Flink et Kafka commencent par l'envoi de flux d'événements à Kafka, qui peuvent être consommés par les tâches Flink.

Cet exemple utilise HDInsight sur des clusters AKS exécutant Flink 1.17.0 pour traiter des flux de données consommés et produits par un sujet Kafka.

Note

FlinkKafkaConsumer est déconseillé et sera supprimé avec Flink 1.17, utilisez KafkaSource à la place. FlinkKafkaProducer est déconseillé et sera supprimé avec Flink 1.15, utilisez KafkaSink à la place.

Conditions préalables

Connecteur Apache Kafka

Flink fournit un Connecteur Apache Kafka pour lire des données et écrire des données dans des rubriques Kafka avec exactement une seule garantie.

Dépendance Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Génération d’un récepteur Kafka

Le KafkaSink fournit une classe 'Builder' permettant de construire une instance de KafkaSink. Nous utilisons le même pour construire notre puits et l'utiliser avec le cluster Flink fonctionnant sur HDInsight sur AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Écriture d’un programme Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Sur Webssh, chargez le fichier jar et envoyez le fichier jar

Capture d’écran montrant le travail en cours d’exécution sur Flink.

Sur l’interface utilisateur du tableau de bord Flink

Capture d’écran montrant comment soumettre le fichier jar empaqueté de la rubrique Kafka en tant que tâche à Flink.

Produire la rubrique - clics sur Kafka

Capture d’écran montrant comment produire une rubrique Kafka.

Consommer le sujet - événements sur Kafka

Capture d’écran montrant comment utiliser la rubrique Kafka.

Référence