Freigeben über


Verwenden von Apache Kafka® auf HDInsight gemeinsam mit Apache Flink® auf HDInsight in AKS

Wichtig

Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr über in dieser Ankündigung.

Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.

Wichtig

Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Azure HDInsight auf AKS-Vorschauinformationen. Für Fragen oder Funktionsvorschläge senden Sie bitte eine Anfrage mit den Details an AskHDInsight und folgen Sie uns für weitere Updates zur Azure HDInsight Community.

Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Die beliebte Wahl vieler Benutzer, die Datenströme zu verwenden, die mit Apache Kafka aufgenommen werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisströmen, die an Kafka gesendet werden und von Flink-Aufträgen genutzt werden können.

In diesem Beispiel wird HDInsight auf AKS-Clustern verwendet, die Flink 1.17.0 ausführen, um Streamingdaten zu verarbeiten, die das Kafka-Thema verarbeiten und produzieren.

Anmerkung

FlinkKafkaConsumer ist veraltet und wird mit Flink 1.17 entfernt, verwenden Sie stattdessen KafkaSource. FlinkKafkaProducer ist veraltet und wird mit Flink 1.15 entfernt, verwenden Sie stattdessen KafkaSink.

Voraussetzungen

  • Sowohl Kafka als auch Flink müssen sich im gleichen VNet befinden, oder es sollte vnet-peering zwischen den beiden Clustern vorhanden sein.

  • Erstellung von VNet.

  • Erstellen eines Kafka-Clusters im selben VNet. Sie können Kafka 3.2 oder 2.4 auf HDInsight basierend auf Ihrer aktuellen Nutzung auswählen.

    Screenshot, der die Erstellung eines Kafka-Clusters im selben VNet zeigt.

  • Fügen Sie die VNet-Details im Abschnitt "Virtuelles Netzwerk" hinzu.

  • Erstellen Sie ein HDInsight im AKS-Clusterpool mit demselben VNet.

  • Erstellen Sie einen Flink-Cluster zum erstellten Clusterpool.

Apache Kafka Connector

Flink bietet einen Apache Kafka Connector zum Lesen von Daten aus und Schreiben von Daten in Kafka-Topics mit Garantien für genau einmalige Lieferung.

Maven-Abhängigkeit

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Gebäude Kafka Sink

Kafka Sink stellt eine Generatorklasse bereit, um eine Instanz eines KafkaSink zu erstellen. Wir verwenden das gleiche Verfahren, um unser Sink zu konstruieren, und verwenden es zusammen mit dem auf HDInsight auf AKS laufenden Flink-Cluster.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Ein Java-Programm namens Event.java schreiben

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Laden Sie auf Webssh die Jar-Datei hoch und übermitteln Sie die Jar-Datei.

Screenshot eines laufenden Jobs auf Flink.

Auf der Benutzeroberfläche des Flink-Dashboards

Screenshot, der zeigt, wie das Kafka-Thema als verpackte JAR-Datei als Job an Flink übermittelt wird.

Produzieren Sie das Thema - Klick auf Kafka

Screenshot, der zeigt, wie Kafka-Thema erstellt wird.

Verarbeitung des Themas - Veranstaltungen zu Kafka

Screenshot, der zeigt, wie ein Kafka-Topic konsumiert wird.

Referenz