Partage via


Connecter Apache Flink® sur HDInsight sur AKS avec Azure Event Hubs pour Apache Kafka®

Remarque

Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.

Seul le support de base est disponible jusqu’à la date de mise hors service.

Important

Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.

Un cas d’usage bien connu d’Apache Flink est l’analyse de flux. Le choix populaire de nombreux utilisateurs d’utiliser les flux de données ingérés à l’aide d’Apache Kafka. Les installations typiques de Flink et Kafka commencent par le transfert de flux d’événements vers Kafka, qui peuvent être consommés par les tâches Flink. Azure Event Hubs fournit un point de terminaison Apache Kafka sur un hub d’événements, qui permet aux utilisateurs de se connecter au hub d’événements à l’aide du protocole Kafka.

Dans cet article, nous allons découvrir comment connecter Azure Event Hubs avec Apache Flink sur HDInsight sur AKS et couvrir les éléments suivants

  • Créer un espace de noms Event Hubs
  • Créer un cluster HDInsight sur AKS avec Apache Flink
  • Exécuter le producteur de Flink
  • Empaqueter un fichier Jar pour Apache Flink
  • Soumission de tâche et validation

Créer un espace de noms Event Hubs et des Event Hubs

  1. Pour créer un espace de noms Event Hubs et des Event Hubs, consultez cet article

    Capture d’écran montrant la configuration d’Event Hubs.

  1. À l’aide du pool de clusters HDInsight existant sur AKS, vous pouvez créer un cluster Flink.

  2. Exécutez le producteur Flink en ajoutant les bootstrap.servers et les informations producer.config.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Remplacez {YOUR.EVENTHUBS.CONNECTION.STRING} par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez les détails sur la façon d’obtenir une chaîne de connexion Event Hubs.

    Par exemple,

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Empaqueter com.example.app ;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Ajoutez l’extrait de code pour exécuter le producteur de Flink.

    Capture d’écran montrant comment tester Flink dans Event Hubs.

  3. Une fois le code exécuté, les événements sont stockés dans la rubrique "topic1"

    Capture d’écran montrant Event Hubs stocké dans la rubrique.

Référence