Processar dados de IoT em tempo real no Apache Flink® com o Azure HDInsight no AKS
O Hub IoT do Azure é um serviço gerenciado hospedado na nuvem que atua como um hub de mensagens central para comunicação entre um aplicativo IoT e seus dispositivos anexados. Você pode conectar milhões de dispositivos e suas soluções de back-end de forma confiável e segura. Quase qualquer dispositivo pode ser conectado a um hub IoT.
Neste exemplo, o código processa dados de IoT em tempo real no Apache Flink® com Azure HDInsight no AKS e armazena no ADLS gen2.
Pré-requisitos
- Criar um Azure IoTHub
- Criar o cluster Flink 1.17.0 no HDInsight no AKS
- Usar o MSI para acessar o ADLS Gen2
- IntelliJ para desenvolvimento
Nota
Para essa demonstração, estamos usando uma VM Windows como ambiente de desenvolvimento de projetos Maven na mesma VNET que o HDInsight em execução no AKS.
Cluster Flink 1.17.0 no HDInsight no AKS
Hub IOT do Azure no portal do Azure
Dentro da string de conexão, você pode encontrar uma URL do barramento de serviço (URL do namespace do hub de eventos subjacente), que você precisa adicionar como um servidor de inicialização na fonte do Kafka. Neste exemplo, é iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Preparar mensagem para o dispositivo IOT do Azure
Cada hub IoT vem com pontos de extremidade internos para lidar com mensagens do sistema e do dispositivo.
Para obter mais informações, consulte Como usar o VS Code como simulador de dispositivo do Hub IoT.
Código no Flink
IOTdemo.java
KafkaSource: O IoTHub é baseado no hub de eventos e, portanto, dá suporte a uma API semelhante a kafka. Portanto, em nosso trabalho Flink, podemos definir um KafkaSource com parâmetros apropriados para consumir mensagens do IoTHub.
FileSink: definir o coletor ABFS.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Empacotar o jar e enviar o trabalho no cluster Flink
Carregue o jar no pod webssh e envie o jar.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Verificar o trabalho na página do Dashboard do Flink
Verificar resultado no ADLS gen2 no portal do Azure
Referência
- site do Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projeto de software livre associados são marcas comerciais do ASF (Apache Software Foundation).