Azure Data Explorer と Apache Flink® の統合
Azure Data Explorer は、フル マネージドの高パフォーマンスのビッグ データ分析プラットフォームであり、大量のデータをほぼリアルタイムで簡単に分析できます。
ADX は、ユーザーがストリーミング アプリケーション、Web サイト、IoT デバイスなどの大量のデータを分析するのに役立ちます。Apache Flink と ADX の統合は、リアルタイム データを処理し、ADX で分析するのに役立ちます。
前提 条件
Flink でシンクとして Azure Data Explorer を使用する手順
Flink クラスターを作成します。
必要に応じて、データベース とテーブルを使用して ADX を作成します。
Kusto でマネージド ID に対するインジェスター権限を追加します。
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Kusto クラスター URI (Uniform Resource Identifier)、使用されるデータベースとマネージド ID、および書き込む必要があるテーブルを定義するサンプル プログラムを実行します。
flink-connector-kusto プロジェクトを複製する: https://github.com/Azure/flink-connector-kusto.git
次のコマンドを使用して ADX でテーブルを作成する
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
適切な Kusto クラスター URI、データベース、および使用されるマネージド ID を使用して、FlinkKustoSinkSample.java ファイルを更新します。
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
後で "mvn clean package" を使用してプロジェクトをビルドする
'sample-java/target' フォルダーの下に 'samples-java-1.0-SNAPSHOT-shaded.jar' という名前の JAR ファイルを見つけて、Flink UI でこの JAR ファイルをアップロードし、ジョブを送信します。
Kusto テーブルにクエリを実行して出力を確認する
Flink から Kusto テーブルにデータを書き込むのに遅延はありません。
参考
- Apache Flink ウェブサイト
- Apache、Apache Flink、Flink、関連するオープンソースプロジェクトの名称は、Apache Software Foundation(ASF)の商標です。