Compartir vía


Conexión de Apache Flink® en HDInsight en AKS con Azure Event Hubs para Apache Kafka®

Nota:

Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.

Solo el soporte técnico básico estará disponible hasta la fecha de retirada.

Importante

Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envíe una solicitud en AskHDInsight con los detalles y síganos en la comunidad de Azure HDInsight para obtener más actualizaciones.

Un caso de uso conocido para Apache Flink es stream analytics. La opción popular de muchos usuarios para usar los flujos de datos, que se ingieren mediante Apache Kafka. Las instalaciones típicas de Flink y Kafka comienzan con flujos de eventos que se insertan en Kafka, que los trabajos de Flink pueden consumir. Azure Event Hubs proporciona un punto de conexión de Apache Kafka en un centro de eventos, que permite a los usuarios conectarse al centro de eventos mediante el protocolo Kafka.

En este artículo, exploraremos cómo conectarse Azure Event Hubs con HDInsight en AKS Flink y trataremos lo siguiente

  • Creación de un espacio de nombres de Event Hubs
  • Creación de un clúster de HDInsight en AKS con Apache Flink
  • Ejecutar el productor Flink
  • Archivo Jar de paquete para Apache Flink
  • Envío y validación de trabajos

Creación de un espacio de nombres de Event Hubs y Event Hubs

  1. Para crear un espacio de nombres de Event Hubs y Event Hubs, consulte aquí

    Captura de pantalla que muestra la configuración de Event Hubs.

  1. Con HDInsight existente en el grupo de clústeres de AKS, puede crear un clúster de Flink

  2. Ejecute el productor de Flink agregando el bootstrap.servers y la información de producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Reemplace {YOUR.EVENTHUBS.CONNECTION.STRING} por la cadena de conexión para el espacio de nombres de Event Hubs. Para obtener instrucciones sobre cómo obtener la cadena de conexión, consulte detalles sobre cómo obtener una cadena de conexión de Event Hubs.

    Por ejemplo,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Paquete com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Agregue el fragmento de código para ejecutar el productor de Flink.

    Captura de pantalla que muestra cómo probar Flink en Event Hubs.

  3. Una vez ejecutado el código, los eventos se almacenan en el tema "topic1"

    Captura de pantalla que muestra Event Hubs almacenado en el tema.

Referencia