다음을 통해 공유


AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터 처리

Azure IoT Hub는 IoT 애플리케이션과 연결된 디바이스 간의 통신을 위한 중앙 메시지 허브 역할을 하는 클라우드에서 호스트되는 관리되는 서비스입니다. 수백만 개의 디바이스와 백 엔드 솔루션을 안정적이고 안전하게 연결할 수 있습니다. 거의 모든 디바이스를 IoT Hub에 연결할 수 있습니다.

이 예제에서 코드는 AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터를 처리하고 ADLS gen2 스토리지로 싱크합니다.

필수 구성 요소

  • Azure IoTHub 만들기
  • AKS HDInsight에서 Flink 클러스터 1.17.0 만들기
  • MSI를 사용하여 ADLS Gen2 액세스
  • 개발을 위한 IntelliJ

메모

이 데모에서는 MAVEN 프로젝트가 AKS의 HDInsight와 동일한 VNET에서 인베브를 개발할 때 Window VM을 사용합니다.

Azure Portal의 검색 창을 보여 주는 다이어그램

Azure Portal의 Azure IOT Hub

연결 문자열 내에서 Kafka 원본에서 부트스트랩 서버로 추가해야 하는 서비스 버스 URL(기본 이벤트 허브 네임스페이스의 URL)을 찾을 수 있습니다. 이 예제에서는 iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093.

스크린샷은 기본 제공 엔드포인트를 보여줍니다.

Azure IOT 디바이스에 메시지 준비

각 IoT Hub에는 시스템 및 디바이스 메시지를 처리하는 기본 제공 시스템 엔드포인트가 함께 제공됩니다.

자세한 내용은 VS Code를 IoT Hub 디바이스 시뮬레이터사용하는 방법을 참조하세요.

스크린샷은 메시지를 보내는 방법을 보여줍니다.

IOTdemo.java

  • KafkaSource: IoTHub는 이벤트 허브를 기반으로 빌드되므로 kafka와 유사한 API를 지원합니다. 따라서 Flink 작업에서 IoTHub의 메시지를 사용하기 위한 적절한 매개 변수를 사용하여 KafkaSource를 정의할 수 있습니다.

  • FileSink: ABFS 싱크를 정의합니다.

package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;
public class IOTdemo {

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        String connectionString  = "<your iot hub connection string>";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("<your iot hub's service bus url>:9093")
                .setTopics("<name of your iot hub>")
                .setGroupId("$Default")
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        String outputPath  = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        kafka.sinkTo(sink);

        env.execute("Sink Azure IOT hub to ADLS gen2");
    }
}

Maven pom.xml

    <groupId>contoso.example</groupId>
    <artifactId>FlinkIOTDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

jar을 webssh Pod에 업로드하고 jar를 제출합니다.

user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858

Flink UI 대시보드를 보여 주는 스크린샷

Azure Portal의 ADLS gen2에서 결과 확인

결과를 보여 주는 스크린샷

참조

  • Apache Flink 웹 사이트
  • Apache, Apache Kafka, Kafka, Apache Flink, Flink 및 연관된 오픈 소스 프로젝트 이름들은 Apache Software Foundation (ASF)의 상표입니다.