Condividi tramite


Elaborare dati IoT in tempo reale su Apache Flink® con Azure HDInsight su AKS

L'hub IoT di Azure è un servizio gestito ospitato nel cloud che funge da hub messaggi centrale per la comunicazione tra un'applicazione IoT e i relativi dispositivi collegati. È possibile connettere milioni di dispositivi e le relative soluzioni back-end in modo affidabile e sicuro. Quasi tutti i dispositivi possono essere connessi a un hub IoT.

In questo esempio, il codice elabora i dati IoT in tempo reale su Apache Flink® con Azure HDInsight su AKS e trasferisce nell'archiviazione ADLS Gen2.

Prerequisiti

Nota

Per questa dimostrazione, utilizziamo una macchina virtuale Windows come ambiente di sviluppo del progetto Maven nella stessa VNET di HDInsight su AKS.

Diagramma che mostra la barra di ricerca nel portale di Azure.

Hub IOT di Azure nel portale di Azure

All'interno della stringa di connessione è possibile trovare un URL del bus di servizio (URL dello spazio dei nomi dell'hub eventi sottostante), che è necessario aggiungere come server bootstrap nell'origine Kafka. In questo esempio, è iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093.

La screenshot mostra gli endpoint predefiniti.

Preparare il messaggio nel dispositivo IOT di Azure

Ogni hub IoT include endpoint di sistema predefiniti per gestire i messaggi di sistema e dei dispositivi.

Per altre informazioni, vedere Come usare VS Code come simulatore di dispositivi dell'hub IoT.

Screenshot mostra come inviare messaggi.

IOTdemo.java

  • KafkaSource: IoTHub si basa sull'hub eventi e supporta quindi un'API simile a kafka. Nel processo Flink è quindi possibile definire un Oggetto KafkaSource con parametri appropriati per l'utilizzo dei messaggi da IoTHub.

  • FileSink: definire il sink ABFS.

package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;
public class IOTdemo {

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        String connectionString  = "<your iot hub connection string>";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("<your iot hub's service bus url>:9093")
                .setTopics("<name of your iot hub>")
                .setGroupId("$Default")
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        String outputPath  = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        kafka.sinkTo(sink);

        env.execute("Sink Azure IOT hub to ADLS gen2");
    }
}

Maven pom.xml

    <groupId>contoso.example</groupId>
    <artifactId>FlinkIOTDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Caricare il file JAR nel pod Webssh e inviare il file JAR.

user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858

Screenshot che mostra il dashboard dell'interfaccia utente Flink.

Controllare il risultato in ADLS Gen2 nel portale di Azure

Screenshot che mostra i risultati.

Riferimento