Compartir vía


Uso de Apache Kafka® en HDInsight con Apache Flink® en HDInsight en AKS

Importante

Azure HDInsight en AKS se retiró el 31 de enero de 2025. Descubra más con este anuncio.

Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.

Importante

Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte la información de la versión preliminar de Azure HDInsight en AKS . Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y siganos para obtener más actualizaciones sobre comunidad de Azure HDInsight.

Un caso de uso conocido para Apache Flink es stream analytics. La opción popular de muchos usuarios para usar los flujos de datos, que se ingieren mediante Apache Kafka. Las instalaciones típicas de Flink y Kafka comienzan con flujos de eventos que se envían a Kafka, que pueden ser consumidos por los trabajos de Flink.

En este ejemplo se usa HDInsight en clústeres de AKS que ejecutan Flink 1.17.0 para procesar datos de streaming que consumen y generan un tema de Kafka.

Nota

FlinkKafkaConsumer está en desuso y se quitará con Flink 1.17, use KafkaSource en su lugar. FlinkKafkaProducer está en desuso y se quitará con Flink 1.15, use KafkaSink en su lugar.

Prerrequisitos

Conector de Apache Kafka

Flink proporciona un conector de Apache Kafka para leer datos de y escribir datos en tópicos de Kafka con garantías de exactamente una vez.

Dependencia de Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Construcción de Sumidero de Kafka

El sumidero de Kafka proporciona una clase de constructor para crear una instancia de KafkaSink. Usamos el mismo para construir nuestro sink y usarlo con el clúster de Flink que opera en HDInsight sobre AKS.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Escribir un programa de Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

En Webssh, cargue el archivo jar y envíe el archivo jar.

Captura de pantalla que muestra el trabajo que se ejecuta en Flink.

En la interfaz de usuario del panel de Flink

Captura de pantalla en la que se muestra cómo enviar el archivo jar empaquetado de Kafka como trabajo a Flink.

Generar el tema: clics en Kafka

Captura de pantalla que muestra cómo generar un tema de Kafka.

Consumir el tópico: eventos en Kafka

Captura de pantalla que muestra cómo consumir el tema de Kafka.

Referencia