Connecter Apache Flink® sur HDInsight sur AKS avec Azure Event Hubs pour Apache Kafka®
Remarque
Nous allons mettre hors service Azure HDInsight sur AKS le 31 janvier 2025. Avant le 31 janvier 2025, vous devrez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent afin d’éviter leur arrêt brutal. Les clusters restants de votre abonnement seront arrêtés et supprimés de l’hôte.
Seul le support de base est disponible jusqu’à la date de mise hors service.
Important
Cette fonctionnalité est disponible actuellement en mode Aperçu. Les Conditions d’utilisation supplémentaires pour les préversions de Microsoft Azure contiennent davantage de conditions légales qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou ne se trouvant pas encore en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les Informations sur la préversion d’Azure HDInsight sur AKS. Pour toute question ou pour des suggestions à propos des fonctionnalités, veuillez envoyer vos requêtes et leurs détails sur AskHDInsight, et suivez-nous sur la Communauté Azure HDInsight pour plus de mises à jour.
Un cas d’usage bien connu d’Apache Flink est l’analyse de flux. Le choix populaire de nombreux utilisateurs d’utiliser les flux de données ingérés à l’aide d’Apache Kafka. Les installations typiques de Flink et Kafka commencent par le transfert de flux d’événements vers Kafka, qui peuvent être consommés par les tâches Flink. Azure Event Hubs fournit un point de terminaison Apache Kafka sur un hub d’événements, qui permet aux utilisateurs de se connecter au hub d’événements à l’aide du protocole Kafka.
Dans cet article, nous allons découvrir comment connecter Azure Event Hubs avec Apache Flink sur HDInsight sur AKS et couvrir les éléments suivants
- Créer un espace de noms Event Hubs
- Créer un cluster HDInsight sur AKS avec Apache Flink
- Exécuter le producteur de Flink
- Empaqueter un fichier Jar pour Apache Flink
- Soumission de tâche et validation
Créer un espace de noms Event Hubs et des Event Hubs
Pour créer un espace de noms Event Hubs et des Event Hubs, consultez cet article
Configurer un cluster Flink sur HDInsight sur AKS
À l’aide du pool de clusters HDInsight existant sur AKS, vous pouvez créer un cluster Flink.
Exécutez le producteur Flink en ajoutant les bootstrap.servers et les informations
producer.config
.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Remplacez
{YOUR.EVENTHUBS.CONNECTION.STRING}
par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez les détails sur la façon d’obtenir une chaîne de connexion Event Hubs.Par exemple,
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Empaquetage du fichier JAR pour Flink
Empaqueter com.example.app ;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Ajoutez l’extrait de code pour exécuter le producteur de Flink.
Une fois le code exécuté, les événements sont stockés dans la rubrique "topic1"
Référence
- Site web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).