Conectar o Apache Flink® com o HDInsight no AKS aos Hubs de Eventos do Azure para Apache Kafka®
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre a Comunidade do Azure HDInsight .
Um caso de uso bem conhecido para o Apache Flink é a análise de fluxo. A escolha comum entre muitos usuários é usar os fluxos de dados que são ingeridos usando o Apache Kafka. As instalações típicas do Flink e do Kafka começam com fluxos de eventos sendo enviados para o Kafka, que podem ser consumidos por tarefas do Flink. Os Hubs de Eventos do Azure fornecem um ponto de extremidade do Apache Kafka em um hub de eventos, que permite que os usuários se conectem ao hub de eventos usando o protocolo Kafka.
Neste artigo, exploramos como conectar hubs de eventos do Azure com Apache Flink no HDInsight no AKS e cobrimos o seguinte.
- Criar um namespace dos Hubs de Eventos
- Criar um HDInsight no cluster do AKS com o Apache Flink
- Executar o produtor do Flink
- Pacote Jar para Apache Flink
- Validação da Submissão de Trabalho &
Criar namespace dos Hubs de Eventos e Hubs de Eventos
Para criar o namespace dos Hubs de Eventos e os Hubs de Eventos, consulte aqui
Configurar o cluster Flink no HDInsight no AKS
Usando o HDInsight existente no pool do cluster AKS, você pode criar um cluster Flink
Execute o produtor do Flink adicionando o bootstrap.servers e as informações
producer.config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Substitua
{YOUR.EVENTHUBS.CONNECTION.STRING}
pela cadeia de conexão do namespace dos Hubs de Eventos. Para obter instruções sobre como obter a string de conexão, consulte detalhes sobre como obter uma string de conexão dos Hubs de Eventos.Por exemplo
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Empacotando o JAR para a plataforma Flink
Pacote com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Adicione o snippet de código para executar o Produtor de Flink.
Depois que o código é executado, os eventos são armazenados no tópico "topic1"
Referência
- site do Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projeto de software livre associados são marcas comerciais da Apache Software Foundation (ASF).