Collega Apache Flink® su HDInsight su AKS con Azure Event Hubs per Apache Kafka®
Importante
Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più su con questo annuncio.
È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.
Importante
Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviare una richiesta su AskHDInsight con i dettagli e seguire noi per altri aggiornamenti su Community di Azure HDInsight.
Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con l'invio dei flussi di eventi a Kafka, che possono essere consumati dai processi Flink. Hub eventi di Azure fornisce un endpoint Apache Kafka in un hub eventi, che consente agli utenti di connettersi all'hub eventi usando il protocollo Kafka.
Questo articolo illustra come connettersi hub eventi di Azure con Apache Flink in HDInsight nel servizio Azure Kubernetes e illustra quanto segue
- Creare un namespace per Event Hubs
- Creare un cluster HDInsight su AKS con Apache Flink
- Eseguire il produttore di dati Flink
- Jar del pacchetto per Apache Flink
- Convalida dell'invio di lavoro &
Creare uno spazio dei nomi di Hub eventi e Hub eventi
Per creare uno spazio dei nomi di Hub eventi e Hub eventi, vedere qui
Impostare il cluster Flink in HDInsight su AKS
Usando l'HDInsight esistente nel pool di cluster di AKS, è possibile creare un cluster Flink
Eseguire il producer Flink aggiungendo il bootstrap.servers e le informazioni
producer.config
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Sostituisci
{YOUR.EVENTHUBS.CONNECTION.STRING}
con la stringa di connessione per lo spazio dei nomi di Event Hub. Per istruzioni su come ottenere la stringa di connessione, vedere i dettagli su come ottenere una stringa di connessione di Event Hubs.Per esempio
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Creazione del pacchetto JAR per Flink
Pacchetto com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Aggiungere il frammento di codice per eseguire Flink Producer.
Dopo l'esecuzione del codice, gli eventi vengono archiviati nell'argomento "topic1"
Riferimento
- sito Web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi registrati della Apache Software Foundation (ASF).