Använda Apache Kafka® i HDInsight med Apache Flink® i HDInsight på AKS
Viktig
Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Lär dig mer genom det här meddelandet.
Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.
Viktig
Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller ännu inte har släppts för allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. För frågor eller funktionsförslag, vänligen skicka en begäran på AskHDInsight- med detaljerna och följ oss för fler uppdateringar från Azure HDInsight Community.
Ett välkänt användningsfall för Apache Flink är stream analytics. Det populära valet av många användare att använda dataströmmarna, som matas in med Apache Kafka. Typiska installationer av Flink och Kafka börjar med att händelseströmmar skickas till Kafka, som sedan kan konsumeras av Flink-jobb.
I det här exemplet används HDInsight i AKS-kluster som kör Flink 1.17.0 för att bearbeta strömningsdata för att konsumera och producera Kafka-ämnen.
Observera
FlinkKafkaConsumer är inaktuell och tas bort med Flink 1.17. Använd KafkaSource i stället. FlinkKafkaProducer är inaktuell och tas bort med Flink 1.15, använd KafkaSink i stället.
Förutsättningar
Både Kafka och Flink måste finnas i samma virtuella nätverk eller så bör det finnas vnet-peering mellan de två klustren.
Skapa ett Kafka-kluster i samma VNet-. Du kan välja Kafka 3.2 eller 2.4 i HDInsight baserat på din aktuella användning.
Lägg till VNet-information i avsnittet virtuellt nätverk.
Skapa en HDInsight i AKS-klusterpool med samma VNet.
Skapa ett Flink-kluster till klusterpoolen som skapats.
Apache Kafka-anslutningsprogram
Flink tillhandahåller en Apache Kafka-connector för att läsa data från och skriva data till ämnen i Kafka med garantier för exakt en gång.
Maven-beroende
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Skapa Kafka-mottagare
Kafka-sink tillhandahåller en byggare-klass för att konstruera en instans av en KafkaSink. Vi använder samma metod för att konstruera vår datamottagare och använder den tillsammans med Flink-kluster som körs på HDInsight på AKS
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Skriva ett Java-program Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Paketera jar-filen och skicka jobbet till Flink
Ladda upp jar-filen på Webssh och skicka jar-filen
Flink-dashboardens användargränssnitt
Skapa ämnet – klickar på Kafka
Konsumera ämnet – händelser i Kafka
Hänvisning
- Apache Kafka Connector
- Apache, Apache Kafka, Kafka, Apache Flink, Flink och associerade projektnamn med öppen källkod är varumärken för Apache Software Foundation (ASF).