Elaborare dati IoT in tempo reale su Apache Flink® con Azure HDInsight su AKS
L'hub IoT di Azure è un servizio gestito ospitato nel cloud che funge da hub messaggi centrale per la comunicazione tra un'applicazione IoT e i relativi dispositivi collegati. È possibile connettere milioni di dispositivi e le relative soluzioni back-end in modo affidabile e sicuro. Quasi tutti i dispositivi possono essere connessi a un hub IoT.
In questo esempio, il codice elabora i dati IoT in tempo reale su Apache Flink® con Azure HDInsight su AKS e trasferisce nell'archiviazione ADLS Gen2.
Prerequisiti
- Creare un Azure IoT Hub
- Creare un cluster Flink 1.17.0 su HDInsight su AKS
- Usare MSI per accedere ad ADLS Gen2
- IntelliJ per lo sviluppo
Nota
Per questa dimostrazione, utilizziamo una macchina virtuale Windows come ambiente di sviluppo del progetto Maven nella stessa VNET di HDInsight su AKS.
Cluster Flink 1.17.0 su HDInsight su AKS
Hub IOT di Azure nel portale di Azure
All'interno della stringa di connessione è possibile trovare un URL del bus di servizio (URL dello spazio dei nomi dell'hub eventi sottostante), che è necessario aggiungere come server bootstrap nell'origine Kafka. In questo esempio, è iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Preparare il messaggio nel dispositivo IOT di Azure
Ogni hub IoT include endpoint di sistema predefiniti per gestire i messaggi di sistema e dei dispositivi.
Per altre informazioni, vedere Come usare VS Code come simulatore di dispositivi dell'hub IoT.
Codice in Flink
IOTdemo.java
KafkaSource: IoTHub si basa sull'hub eventi e supporta quindi un'API simile a kafka. Nel processo Flink è quindi possibile definire un Oggetto KafkaSource con parametri appropriati per l'utilizzo dei messaggi da IoTHub.
FileSink: definire il sink ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Confezionare il jar e inviare il job nel cluster Flink
Caricare il file JAR nel pod Webssh e inviare il file JAR.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Controllare il job nella UI del Flink Dashboard
Controllare il risultato in ADLS Gen2 nel portale di Azure
Riferimento
- sito Web Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi dell' Apache Software Foundation (ASF).