Utilisation d’Apache Kafka® sur HDInsight avec Apache Flink® sur HDInsight sur AKS
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base est disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.
Un cas d’usage bien connu d’Apache Flink est l’analyse de flux. Le choix populaire de nombreux utilisateurs d’utiliser les flux de données ingérés à l’aide d’Apache Kafka. Les installations typiques de Flink et Kafka commencent par le transfert de flux d’événements vers Kafka, qui peuvent être consommés par les tâches Flink.
Cet exemple utilise HDInsight sur des clusters AKS exécutant Flink 1.17.0 pour traiter les données de diffusion en continu consommant et produisant un sujet Kafka.
Remarque
FlinkKafkaConsumer est obsolète et sera supprimé avec Flink 1.17, veuillez utiliser KafkaSource à la place. FlinkKafkaProducer est obsolète et sera supprimé avec Flink 1.15, veuillez utiliser KafkaSink à la place.
Prérequis
Kafka et Flink doivent tous deux se trouver dans le même réseau virtuel, sinon il doit y avoir un peering vnet entre les deux clusters.
Créer un cluster Kafka dans le même réseau virtuel. Vous pouvez choisir Kafka 3.2 ou 2.4 sur HDInsight en fonction de votre utilisation actuelle.
Ajoutez les détails du réseau virtuel dans la section réseau virtuel.
Créez un pool de cluster HDInsight sur AKS avec le même réseau virtuel.
Créez un cluster Flink dans le pool de clusters créé.
Connecteur Apache Kafka
Flink fournit un connecteur Apache Kafka pour lire et écrire des données dans des sujets Kafka avec des garanties uniques.
Dépendance Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Construire un évier Kafka
Le récepteur Kafka fournit une classe de générateur pour construire une instance de KafkaSink. Nous utilisons la même chose pour construire notre Sink et l’utilisons avec le cluster Flink exécutant HDInsight sur AKS
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Écrire un programme Java Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Emballez le pot et soumettez le travail à Flink
Sur Webssh, chargez le fichier jar et soumettez-le
Sur l’interface utilisateur du tableau de bord Flink
Produire le sujet – clics sur Kafka
Consommer le sujet – événements sur Kafka
Référence
- Connecteur Apache Kafka
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).