Procesamiento de datos de IoT en tiempo real en Apache Flink® con Azure HDInsight en AKS
Azure IoT Hub es un servicio administrado hospedado en la nube que actúa como un centro de mensajes central para la comunicación entre una aplicación de IoT y sus dispositivos conectados. Puede conectar millones de dispositivos y sus soluciones de back-end de forma confiable y segura. Casi cualquier dispositivo se puede conectar a un centro de IoT.
En este ejemplo, el código procesa datos de IoT en tiempo real con Apache Flink® utilizando Azure HDInsight y AKS y se almacena en el almacenamiento de ADLS Gen2.
Prerrequisitos
- Crear un Azure IoTHub
- Creación de un clúster de Flink 1.17.0 en HDInsight en AKS
- Uso de MSI para acceder a ADLS Gen2
- IntelliJ para desarrollo
Nota
Para esta demostración, estamos usando una máquina virtual con Windows como entorno de desarrollo para proyectos Maven en la misma VNET que HDInsight en AKS.
Clúster Flink 1.17.0 en HDInsight sobre AKS
Azure IOT Hub en Azure Portal
Dentro de la cadena de conexión, puede encontrar una dirección URL de Service Bus (dirección URL del espacio de nombres del centro de eventos subyacente), que debe agregar como servidor de arranque en el origen de Kafka. En este ejemplo, es iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Preparación del mensaje en un dispositivo IOT de Azure
Cada centro de IoT incluye puntos de conexión del sistema integrados para controlar los mensajes del sistema y del dispositivo.
Para obtener más información, consulte Uso de VS Code como simulador de dispositivos de IoT Hub.
Código en Flink
IOTdemo.java
KafkaSource: IoTHub se basa en el centro de eventos y, por tanto, admite una API similar a kafka. Por lo tanto, en nuestro trabajo de Flink, podemos definir un KafkaSource con los parámetros adecuados para consumir mensajes de IoTHub.
FileSink: defina el receptor ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Empaquetar el archivo jar y enviar el trabajo en el clúster de Flink
Cargue el archivo jar en el pod de webssh y envíe el archivo jar.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Comprobación del trabajo en la interfaz de usuario del panel de Flink
Comprobación del resultado en ADLS Gen2 en Azure Portal
Referencia
- Sitio web de Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de la Apache Software Foundation (ASF).