次の方法で共有


Azure Data Explorer と Apache Flink® の統合

Azure Data Explorer は、大量のデータをほぼリアルタイムで簡単に分析できるようにする、フル マネージドかつハイ パフォーマンスなビッグ データ分析プラットフォームです。

ADX は、ユーザーがストリーミング アプリケーション、Web サイト、IoT デバイスなどから大量のデータを分析するのに役立ちます。Apache Flink と ADX の統合は、リアルタイム データを処理して ADX で分析するのに役立ちます。

前提条件

  1. Flink クラスターを作成します

  2. データベースを持つ ADX を作成し、必要に応じてテーブルを作成します。

  3. Kusto のマネージド ID のインジェスター アクセス許可を追加します。

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Kusto クラスター URI (Uniform Resource Identifier)、使用されるデータベースとマネージド ID、および書き込む必要があるテーブルを定義するサンプル プログラムを実行します。

  5. flink-connector-kusto プロジェクトを複製します: https://github.com/Azure/flink-connector-kusto.git

  6. 次のコマンドを使用して、ADX にテーブルを作成します

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 適切な Kusto クラスター URI、データベース、および使用されるマネージド ID を使用して、FlinkKustoSinkSample.java ファイルを更新します。

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    後で "mvn clean package" を使用してプロジェクトをビルドします

  8. 'sample-java/target' フォルダーの下で 'samples-java-1.0-SNAPSHOT-shaded.jar' という名前の JAR ファイルを見つけて、Flink UI でこの JAR ファイルをアップロードし、ジョブを送信します。

  9. Kusto テーブルにクエリを実行して出力を確認します

    screenshot shows query the Kusto table to verify the output.

    Flink から Kusto テーブルへのデータの書き込みに、遅延はありません。

リファレンス