AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터 처리
Azure IoT Hub는 IoT 애플리케이션과 연결된 디바이스 간의 통신을 위한 중앙 메시지 허브 역할을 하는 클라우드에서 호스트되는 관리되는 서비스입니다. 수백만 개의 디바이스와 백 엔드 솔루션을 안정적이고 안전하게 연결할 수 있습니다. 거의 모든 디바이스를 IoT Hub에 연결할 수 있습니다.
이 예제에서 코드는 AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터를 처리하고 ADLS gen2 스토리지로 싱크합니다.
필수 구성 요소
- Azure IoTHub 만들기
- AKS HDInsight에서 Flink 클러스터 1.17.0 만들기
- MSI를 사용하여 ADLS Gen2 액세스
- 개발을 위한 IntelliJ
메모
이 데모에서는 MAVEN 프로젝트가 AKS의 HDInsight와 동일한 VNET에서 인베브를 개발할 때 Window VM을 사용합니다.
AKS 상의 HDInsight에서 운영되는 Flink 클러스터 1.17.0
Azure Portal의 Azure IOT Hub
연결 문자열 내에서 Kafka 원본에서 부트스트랩 서버로 추가해야 하는 서비스 버스 URL(기본 이벤트 허브 네임스페이스의 URL)을 찾을 수 있습니다. 이 예제에서는 iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
.
Azure IOT 디바이스에 메시지 준비
각 IoT Hub에는 시스템 및 디바이스 메시지를 처리하는 기본 제공 시스템 엔드포인트가 함께 제공됩니다.
자세한 내용은 VS Code를 IoT Hub 디바이스 시뮬레이터사용하는 방법을 참조하세요.
Flink의 코드
IOTdemo.java
KafkaSource: IoTHub는 이벤트 허브를 기반으로 빌드되므로 kafka와 유사한 API를 지원합니다. 따라서 Flink 작업에서 IoTHub의 메시지를 사용하기 위한 적절한 매개 변수를 사용하여 KafkaSource를 정의할 수 있습니다.
FileSink: ABFS 싱크를 정의합니다.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
jar 파일을 패키징하고 Flink 클러스터에 작업을 제출하세요.
jar을 webssh Pod에 업로드하고 jar를 제출합니다.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Flink 대시보드 UI에서 작업 확인
Azure Portal의 ADLS gen2에서 결과 확인
참조
- Apache Flink 웹 사이트
- Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 연관된 오픈 소스 프로젝트 이름들은 Apache Software Foundation (ASF)의 상표입니다.