Traiter les données IoT en temps réel sur Apache Flink® avec Azure HDInsight sur AKS
Azure IoT Hub est un service managé, hébergé dans le cloud, qui joue le rôle de hub de messages central pour la communication entre une application IoT et les appareils attachés. Vous pouvez connecter des millions d’appareils et leurs solutions back-end de manière fiable et sécurisée. Presque tous les appareils peuvent être connectés à un hub IoT.
Dans cet exemple, le code traite les données IoT en temps réel sur Apache Flink® avec Azure HDInsight sur AKS et s’écoule vers le stockage ADLS gen2.
Prérequis
- Créer un Azure IoTHub
- Créer un cluster Flink 1.17.0 sur HDInsight sur AKS
- Utiliser MSI pour accéder à ADLS Gen2
- IntelliJ pour le développement
Remarque
Pour cette démonstration, nous utilisons une machine virtuelle Windows comme environnement de développement de projet Maven dans le même réseau virtuel que HDInsight sur AKS.
Cluster Flink 1.17.0 sur HDInsight sur AKS
Azure IOT Hub sur le portail Azure
Dans la chaîne de connexion, vous pouvez trouver une URL de bus de service (URL de l’espace de noms du hub d’événements sous-jacent), que vous devez ajouter en tant que serveur d’amorçage dans votre source Kafka. Dans cet exemple, il s’agit de iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Préparer le message dans un appareil Azure IOT
Chaque hub IoT est fourni avec des points de terminaison système intégrés pour gérer les messages système et d'appareil.
Pour plus d’informations, consultez Comment utiliser VS Code en tant que simulateur d’appareil IoT Hub.
Code dans Flink
IOTdemo.java
KafkaSource : IoTHub est construit sur Event Hub et prend donc en charge une API de type Kafka. Ainsi, dans notre travail Flink, nous pouvons définir un KafkaSource avec les paramètres appropriés pour consommer les messages d’IoTHub.
FileSink : définissez le récepteur ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Emballer le jar et soumettre le job dans le cluster Flink
Chargez le jar dans le pod webssh et soumettez le jar.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Vérifier le travail sur l’interface utilisateur du tableau de bord Flink
Vérifier le résultat sur ADLS gen2 sur le portail Azure
Référence
- Site web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).