다음을 통해 공유


Azure Data Explorer 및 Apache Flink® 통합

Azure Data Explorer는 대용량 데이터를 거의 실시간으로 쉽게 분석할 수 있는 완전 관리형 고성능 빅 데이터 분석 플랫폼입니다.

ADX는 사용자가 스트리밍 애플리케이션, 웹 사이트, IoT 디바이스 등에서 대량 데이터를 분석하는 데 도움이 됩니다. Apache Flink를 ADX와 통합하면 ADX에서 실시간 데이터를 처리하고 분석할 수 있습니다.

필수 조건

  1. Flink 클러스터를 만듭니다.

  2. 필요에 따라 테이블 및 데이터베이스를 사용하여 ADX를 만듭니다.

  3. Kusto에서 관리 ID에 대한 수집기 권한을 추가합니다.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Kusto 클러스터 URI(Uniform Resource Identifier), 사용되는 데이터베이스 및 관리 ID와 데이터를 써야하는 테이블을 정의하는 샘플 프로그램을 실행합니다.

  5. flink-connector-kusto 프로젝트를 복제합니다. https://github.com/Azure/flink-connector-kusto.git

  6. 다음 명령을 사용하여 ADX에서 테이블 만들기

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 올바른 Kusto 클러스터 URI, 데이터베이스 및 사용된 관리 ID로 FlinkKustoSinkSample.java 파일을 업데이트합니다.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    나중에 "mvn clean package"를 사용하여 프로젝트를 빌드합니다.

  8. 'sample-java/target' 폴더에서 'samples-java-1.0-SNAPSHOT-shaded.jar'이라는 JAR 파일을 찾은 다음 Flink UI에 이 JAR 파일을 업로드하고 작업을 제출합니다.

  9. Kusto 테이블을 쿼리하여 출력을 확인합니다.

    screenshot shows query the Kusto table to verify the output.

    Flink에서 Kusto 테이블에 데이터를 쓰는 데 지연이 발생하지 않습니다.

참조