Partilhar via


Conectar o Apache Flink® no HDInsight em AKS com os Hubs de Eventos do Azure para o Apache Kafka®

Importante

O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.

Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.

Importante

Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, pré-visualização ou ainda não disponibilizadas para o público em geral. Para obter informações sobre esta visualização específica, consulte as informações de pré-visualização do Azure HDInsight no AKS . Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para mais atualizações na Comunidade Azure HDInsight .

Um caso de uso bem conhecido para o Apache Flink é a análise de fluxo. A escolha popular de muitos utilizadores é usar os fluxos de dados ingeridos com o Apache Kafka. As instalações típicas de Flink e Kafka começam com fluxos de eventos sendo enviados para o Kafka, que podem ser consumidos por trabalhos do Flink. Os Hubs de Eventos do Azure fornecem um ponto de extremidade Apache Kafka em um hub de eventos, que permite que os usuários se conectem ao hub de eventos usando o protocolo Kafka.

Neste artigo, exploramos como conectar de Hubs de Eventos do Azure com Apache Flink no HDInsight no AKS e abordamos o seguinte

  • Criar um namespace de Hubs de Eventos
  • Criar um HDInsight no cluster AKS com o Apache Flink
  • Executar o produtor Flink
  • Pacote Jar para Apache Flink
  • Submissão de Trabalhos & Validação

Criar namespace de Hubs de Eventos e Hubs de Eventos

  1. Para criar o namespace dos Hubs de Eventos e os próprios Hubs de Eventos, consulte aqui

    Captura de ecrã a mostrar a configuração dos Hubs de Eventos.

  1. Ao utilizar o HDInsight existente no pool de clusters AKS, é possível criar um cluster Flink

  2. Execute o produtor Flink adicionando o bootstrap.servers e as informações de producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Substitua {YOUR.EVENTHUBS.CONNECTION.STRING} pela cadeia de conexão do namespace Hubs de Eventos. Para obter instruções sobre como obter a cadeia de ligação, consulte os detalhes sobre como obter uma cadeia de ligação dos Hubs de Eventos.

    Por exemplo

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Pacote com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Adicione o trecho para executar o Flink Producer.

    Captura de tela mostrando como testar o Flink em Hubs de Eventos.

  3. Uma vez que o código é executado, os eventos são armazenados no tópico "topic1"

    Captura de ecrã mostrando Hubs de Eventos armazenados no tópico.

Referência