다음을 통해 공유


AKS의 Azure HDInsight를 사용하여 Apache Flink®에서 실시간 IoT 데이터 처리

Azure IoT Hub는 클라우드에서 호스트되는 관리 서비스이며, IoT 애플리케이션과 연결된 디바이스 간의 통신을 위한 중앙 메시지 허브 역할을 합니다. 수백만 개의 디바이스와 백 엔드 솔루션을 안정적으로 안전하게 연결할 수 있습니다. 거의 모든 디바이스를 IoT 허브에 연결할 수 있습니다.

이 예제에서 코드는 AKS의 Azure HDInsight를 사용하여 Apache Flink® 에서 실시간 IoT 데이터를 처리하고 ADLS gen2 스토리지로 싱크합니다.

필수 조건

참고 항목

이 데모에서는 HDInsight on AKS와 동일한 VNET에서 Window VM을 Maven 프로젝트 개발 환경으로 사용합니다.

Azure Portal의 검색 창을 보여 주는 스크린샷

Azure Portal의 Azure IOT Hub

연결 문자열 내에서 Kafka 원본에 부트스트랩 서버로 추가해야 하는 서비스 버스 URL(기본 이벤트 허브 네임스페이스의 URL)을 찾을 수 있습니다. 이 예에서는 iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093입니다.

기본 제공 엔드포인트를 보여 주는 스크린샷

Azure IOT 디바이스에 대한 메시지 준비

각 IoT 허브가 기본 제공 시스템 엔드포인트와 함께 제공되어 시스템 및 디바이스 메시지를 처리합니다.

자세한 내용은 VS Code를 IoT Hub 디바이스 시뮬레이터로 사용하는 방법을 참조하세요.

메시지 전송 방법을 보여 주는 스크린샷

IOTdemo.java

  • KafkaSource: IoTHub는 이벤트 허브 위에 빌드되므로 kafka와 유사한 API를 지원합니다. 따라서 Flink 작업에서는 IoTHub의 메시지를 사용하기 위해 적절한 매개 변수로 KafkaSource를 정의할 수 있습니다.

  • FileSink: ABFS 싱크를 정의합니다.

package contoso.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;
public class IOTdemo {

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        String connectionString  = "<your iot hub connection string>";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("<your iot hub's service bus url>:9093")
                .setTopics("<name of your iot hub>")
                .setGroupId("$Default")
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        String outputPath  = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";

        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        kafka.sinkTo(sink);

        env.execute("Sink Azure IOT hub to ADLS gen2");
    }
}

Maven pom.xml

    <groupId>contoso.example</groupId>
    <artifactId>FlinkIOTDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

webssh Pod로 jar를 업로드하고 jar를 제출합니다.

user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858

Flink UI 대시보드를 보여 주는 스크린샷

Azure Portal의 ADLS gen2에서 결과 확인

결과를 보여 주는 스크린샷

참조