次の方法で共有


Azure Data Explorer と Apache Flink® の統合

Azure Data Explorer は、フル マネージドの高パフォーマンスのビッグ データ分析プラットフォームであり、大量のデータをほぼリアルタイムで簡単に分析できます。

ADX は、ユーザーがストリーミング アプリケーション、Web サイト、IoT デバイスなどの大量のデータを分析するのに役立ちます。Apache Flink と ADX の統合は、リアルタイム データを処理し、ADX で分析するのに役立ちます。

前提 条件

  • AKS 上の HDInsight で Apache Flink クラスターを作成する
  • Azure データ エクスプローラー を作成する
  1. Flink クラスターを作成します。

  2. 必要に応じて、データベース とテーブルを使用して ADX を作成します。

  3. Kusto でマネージド ID に対するインジェスター権限を追加します。

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Kusto クラスター URI (Uniform Resource Identifier)、使用されるデータベースとマネージド ID、および書き込む必要があるテーブルを定義するサンプル プログラムを実行します。

  5. flink-connector-kusto プロジェクトを複製する: https://github.com/Azure/flink-connector-kusto.git

  6. 次のコマンドを使用して ADX でテーブルを作成する

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 適切な Kusto クラスター URI、データベース、および使用されるマネージド ID を使用して、FlinkKustoSinkSample.java ファイルを更新します。

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    後で "mvn clean package" を使用してプロジェクトをビルドする

  8. 'sample-java/target' フォルダーの下に 'samples-java-1.0-SNAPSHOT-shaded.jar' という名前の JAR ファイルを見つけて、Flink UI でこの JAR ファイルをアップロードし、ジョブを送信します。

  9. Kusto テーブルにクエリを実行して出力を確認する

    スクリーンショットは、Kusto テーブルにクエリを実行して出力を確認する方法を示しています。

    Flink から Kusto テーブルにデータを書き込むのに遅延はありません。

参考