AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터 처리
Azure IoT Hub는 클라우드에서 호스트되는 관리 서비스이며, IoT 애플리케이션과 연결된 디바이스 간의 통신을 위한 중앙 메시지 허브 역할을 합니다. 수백만 개의 디바이스와 백 엔드 솔루션을 안정적으로 안전하게 연결할 수 있습니다. 거의 모든 디바이스를 IoT 허브에 연결할 수 있습니다.
이 예제에서 코드는 AKS의 Azure HDInsight를 사용하여 Apache Flink® 에서 실시간 IoT 데이터를 처리하고 ADLS gen2 스토리지로 싱크합니다.
필수 조건
- Azure IoTHub 만들기
- AKS의 HDInsight에 Flink 클러스터 1.17.0 만들기
- MSI를 사용하여 ADLS Gen2에 액세스
- 개발용 IntelliJ
참고 항목
이 데모에서는 HDInsight on AKS와 동일한 VNET에서 Window VM을 Maven 프로젝트 개발 환경으로 사용합니다.
HDInsight on AKS의 Flink 클러스터 1.17.0
Azure Portal의 Azure IOT Hub
연결 문자열 내에서 Kafka 원본에 부트스트랩 서버로 추가해야 하는 서비스 버스 URL(기본 이벤트 허브 네임스페이스의 URL)을 찾을 수 있습니다. 이 예에서는 iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093
입니다.
Azure IOT 디바이스에 대한 메시지 준비
각 IoT 허브가 기본 제공 시스템 엔드포인트와 함께 제공되어 시스템 및 디바이스 메시지를 처리합니다.
자세한 내용은 VS Code를 IoT Hub 디바이스 시뮬레이터로 사용하는 방법을 참조하세요.
Flink의 코드
IOTdemo.java
KafkaSource: IoTHub는 이벤트 허브 위에 빌드되므로 kafka와 유사한 API를 지원합니다. 따라서 Flink 작업에서는 IoTHub의 메시지를 사용하기 위해 적절한 매개 변수로 KafkaSource를 정의할 수 있습니다.
FileSink: ABFS 싱크를 정의합니다.
package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.time.Duration;
public class IOTdemo {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
String connectionString = "<your iot hub connection string>";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("<your iot hub's service bus url>:9093")
.setTopics("<name of your iot hub>")
.setGroupId("$Default")
.setProperty("partition.discovery.interval.ms", "10000")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
kafka.sinkTo(sink);
env.execute("Sink Azure IOT hub to ADLS gen2");
}
}
Maven pom.xml
<groupId>contoso.example</groupId>
<artifactId>FlinkIOTDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Flink 클러스터에서 jar 준비 및 작업 제출
webssh Pod로 jar를 업로드하고 jar를 제출합니다.
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
Flink 대시보드 UI에서 작업 확인
Azure Portal의 ADLS gen2에서 결과 확인
참조
- Apache Flink 웹 사이트
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, 관련 오픈 소스 프로젝트 이름은 ASF(Apache Software Foundation)의 상표입니다.