Condividi tramite


Collega Apache Flink® su HDInsight su AKS con Azure Event Hubs per Apache Kafka®

Importante

Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più su con questo annuncio.

È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.

Importante

Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviare una richiesta su AskHDInsight con i dettagli e seguire noi per altri aggiornamenti su Community di Azure HDInsight.

Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con l'invio dei flussi di eventi a Kafka, che possono essere consumati dai processi Flink. Hub eventi di Azure fornisce un endpoint Apache Kafka in un hub eventi, che consente agli utenti di connettersi all'hub eventi usando il protocollo Kafka.

Questo articolo illustra come connettersi hub eventi di Azure con Apache Flink in HDInsight nel servizio Azure Kubernetes e illustra quanto segue

  • Creare un namespace per Event Hubs
  • Creare un cluster HDInsight su AKS con Apache Flink
  • Eseguire il produttore di dati Flink
  • Jar del pacchetto per Apache Flink
  • Convalida dell'invio di lavoro &

Creare uno spazio dei nomi di Hub eventi e Hub eventi

  1. Per creare uno spazio dei nomi di Hub eventi e Hub eventi, vedere qui

    Screenshot che mostra la configurazione dell'Event Hubs.

  1. Usando l'HDInsight esistente nel pool di cluster di AKS, è possibile creare un cluster Flink

  2. Eseguire il producer Flink aggiungendo il bootstrap.servers e le informazioni producer.config

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Sostituisci {YOUR.EVENTHUBS.CONNECTION.STRING} con la stringa di connessione per lo spazio dei nomi di Event Hub. Per istruzioni su come ottenere la stringa di connessione, vedere i dettagli su come ottenere una stringa di connessione di Event Hubs.

    Per esempio

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Pacchetto com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Aggiungere il frammento di codice per eseguire Flink Producer.

    Screenshot che mostra come testare Flink in Event Hubs.

  3. Dopo l'esecuzione del codice, gli eventi vengono archiviati nell'argomento "topic1"

    Screenshot che mostra Event Hubs memorizzati nell'argomento.

Riferimento