Compartir a través de


Procesamiento de datos de IoT en tiempo real en Apache Flink® con Azure HDInsight en AKS

Azure IoT Hub es un servicio administrado, hospedado en la nube, que actúa como centro de mensajes para la comunicación entre una aplicación de IoT y los dispositivos conectados. Puede conectar millones de dispositivos y sus soluciones de back-end con confianza y de forma segura. La mayoría de los dispositivos se pueden conectar a un centro de IoT.

En este ejemplo, el código procesa datos de IoT en tiempo real en Apache Flink® con Azure HDInsight en AKS y receptores en el almacenamiento de ADLS Gen2.

Requisitos previos

Nota:

Para esta demostración, utilizamos una máquina virtual de Windows como entorno de desarrollo del proyecto Maven en la misma VNET que HDInsight en AKS.

Diagrama que muestra la barra de búsqueda en Azure Portal.

Azure IOT Hub en Azure Portal

Dentro de la cadena de conexión, puede encontrar una dirección URL de Service Bus (dirección URL del espacio de nombres del centro de eventos subyacente), que debe agregar como servidor de arranque en el origen de Kafka. En este ejemplo, es iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093.

Captura de pantalla que muestra los puntos de conexión integrados.

Preparación del mensaje en un dispositivo IOT de Azure

Cada centro de IoT viene con puntos de conexión del sistema integrados para controlar los mensajes del sistema y del dispositivo.

Para obtener más información, consulte Uso de VS Code como simulador de dispositivos de IoT Hub.

Captura de pantalla que muestra cómo enviar mensajes.

IOTdemo.java

  • KafkaSource: IoTHub se basa en el centro de eventos y, por tanto, admite una API similar a Kafka. Por lo tanto, en nuestro trabajo de Flink, podemos definir un KafkaSource con los parámetros adecuados para consumir mensajes de IoTHub.

  • FileSink: definir el receptor ABFS.

package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;
public class IOTdemo {

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        String connectionString  = "<your iot hub connection string>";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("<your iot hub's service bus url>:9093")
                .setTopics("<name of your iot hub>")
                .setGroupId("$Default")
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        String outputPath  = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        kafka.sinkTo(sink);

        env.execute("Sink Azure IOT hub to ADLS gen2");
    }
}

Maven pom.xml

    <groupId>contoso.example</groupId>
    <artifactId>FlinkIOTDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Cargue el archivo jar en el pod de webssh y envíe el archivo jar.

user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858

Captura de pantalla que muestra el panel de la interfaz de usuario de Flink.

Comprobar el resultado en ADLS Gen2 en Azure Portal

Captura de pantalla que muestra los resultados.

Referencia