Usando o Apache Kafka® no HDInsight com o Apache Flink® no HDInsight no AKS
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para as Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam às funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas em geral. Para obter informações sobre essa visualização específica, consulte Azure HDInsight no AKS informações de visualização. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre a Comunidade do Azure HDInsight .
Um caso de uso bem conhecido para o Apache Flink é a análise de fluxo. A escolha popular de muitos utilizadores é usar os fluxos de dados que são ingeridos através do Apache Kafka. As instalações típicas de Flink e Kafka começam com fluxos de eventos sendo transmitidos para o Kafka, que podem ser consumidos por tarefas Flink.
Este exemplo usa o HDInsight em clusters AKS que executam o Flink 1.17.0 para processar o consumo de dados de streaming e a produção do tópico Kafka.
Observação
FlinkKafkaConsumer foi preterido e será removido com Flink 1.17, use KafkaSource em vez disso. FlinkKafkaProducer foi preterido e será removido com Flink 1.15, use KafkaSink em vez disso.
Pré-requisitos
Tanto Kafka quanto Flink precisam estar na mesma VNet ou deve haver vnet-peering entre os dois clusters.
Criação deVNet.
Crie um cluster Kafka na mesma rede virtual. Você pode escolher Kafka 3.2 ou 2.4 no HDInsight com base no seu uso atual.
Adicione os detalhes da rede virtual na seção de rede virtual.
Crie um HDInsight no pool de clusters AKS com a mesma rede virtual.
Crie um cluster Flink para o pool de clusters criado.
Conector Apache Kafka
Flink fornece um Apache Kafka Connector para carregar dados de e escrever dados em tópicos Kafka com garantias de entrega uma única vez.
de dependência do Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Edifício Kafka Sink
Kafka sink fornece uma classe builder para construir uma instância de um KafkaSink. Usamos o mesmo para construir o nosso sink e utilizá-lo juntamente com o cluster Flink em execução no HDInsight no AKS.
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Escrevendo um programa Java Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Empacote o frasco e envie o trabalho para Flink
Em Webssh, carregue o ficheiro jar e envie o ficheiro jar
Na interface do usuário do painel Flink
Crie o tópico - interações em Kafka
Consuma o tema - eventos sobre Kafka
Referência
- Apache Kafka Connector
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).