Procesamiento de datos de IoT en tiempo real en Apache Flink® con Azure HDInsight en AKS
Azure IoT Hub es un servicio administrado, hospedado en la nube, que actúa como centro de mensajes para la comunicación entre una aplicación de IoT y los dispositivos conectados. Puede conectar millones de dispositivos y sus soluciones de back-end con confianza y de forma segura. La mayoría de los dispositivos se pueden conectar a un centro de IoT.
En este ejemplo, el código procesa datos de IoT en tiempo real en Apache Flink® con Azure HDInsight en AKS y receptores en el almacenamiento de ADLS Gen2.
Requisitos previos
- Crear un Azure IoTHub
- Crear un clúster de Flink 1.17.0 en HDInsight en AKS
- Usar MSI para acceder a ADLS Gen2
- IntelliJ para desarrollo
Nota:
Para esta demostración, utilizamos una máquina virtual de Windows como entorno de desarrollo del proyecto Maven en la misma VNET que HDInsight en AKS.
Clúster de Flink 1.17.0 en HDInsight en AKS
Azure IOT Hub en Azure Portal
Dentro de la cadena de conexión, puede encontrar una dirección URL de Service Bus (dirección URL del espacio de nombres del centro de eventos subyacente), que debe agregar como servidor de arranque en el origen de Kafka. En este ejemplo, es iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Preparación del mensaje en un dispositivo IOT de Azure
Cada centro de IoT viene con puntos de conexión del sistema integrados para controlar los mensajes del sistema y del dispositivo.
Para obtener más información, consulte Uso de VS Code como simulador de dispositivos de IoT Hub.
Código en Flink
IOTdemo.java
KafkaSource: IoTHub se basa en el centro de eventos y, por tanto, admite una API similar a Kafka. Por lo tanto, en nuestro trabajo de Flink, podemos definir un KafkaSource con los parámetros adecuados para consumir mensajes de IoTHub.
FileSink: definir el receptor ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Empaquetar el archivo jar y enviar el trabajo en el clúster de Flink
Cargue el archivo jar en el pod de webssh y envíe el archivo jar.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Comprobar el trabajo en la interfaz de usuario del panel de Flink
Comprobar el resultado en ADLS Gen2 en Azure Portal
Referencia
- Sitio web de Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de Apache Software Foundation (ASF).