Conectar o Apache Flink® no HDInsight no AKS com os Hubs de Eventos do Azure para Apache Kafka®
Nota
Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.
Apenas o apoio básico estará disponível até à data da reforma.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.
Um caso de uso bem conhecido para o Apache Flink é a análise de fluxo. A escolha popular por muitos usuários para usar os fluxos de dados, que são ingeridos usando Apache Kafka. As instalações típicas de Flink e Kafka começam com fluxos de eventos sendo empurrados para Kafka, que podem ser consumidos por trabalhos Flink. Os Hubs de Eventos do Azure fornecem um ponto de extremidade Apache Kafka em um hub de eventos, que permite que os usuários se conectem ao hub de eventos usando o protocolo Kafka.
Neste artigo, exploramos como conectar os Hubs de Eventos do Azure ao Apache Flink no HDInsight no AKS e abordamos o seguinte
- Criar um espaço de nomes dos Hubs de Eventos
- Criar um HDInsight no cluster AKS com o Apache Flink
- Executar produtor Flink
- Jar do pacote para Apache Flink
- Submissão de Trabalhos & Validação
Criar namespace de Hubs de Eventos e Hubs de Eventos
Para criar namespace de Hubs de Eventos e Hubs de Eventos, consulte aqui
Configurar o Flink Cluster no HDInsight no AKS
Usando o HDInsight existente no pool de clusters AKS, você pode criar um cluster Flink
Execute o produtor Flink adicionando o bootstrap.servers e as
producer.config
informaçõesbootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Substitua
{YOUR.EVENTHUBS.CONNECTION.STRING}
pela cadeia de conexão do namespace Hubs de Eventos. Para obter instruções sobre como obter a cadeia de conexão, consulte detalhes sobre como obter uma cadeia de conexão de Hubs de Eventos.Por exemplo,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Empacotando o JAR para Flink
Pacote com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Adicione o trecho para executar o Flink Producer.
Uma vez que o código é executado, os eventos são armazenados no tópico "topic1"
Referência
- Site do Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).