Azure Data Explorer と Apache Flink® の統合
Azure Data Explorer は、大量のデータをほぼリアルタイムで簡単に分析できるようにする、フル マネージドかつハイ パフォーマンスなビッグ データ分析プラットフォームです。
ADX は、ユーザーがストリーミング アプリケーション、Web サイト、IoT デバイスなどから大量のデータを分析するのに役立ちます。Apache Flink と ADX の統合は、リアルタイム データを処理して ADX で分析するのに役立ちます。
前提条件
Azure Data Explorer を Flink でシンクとして使用するための手順
データベースを持つ ADX を作成し、必要に応じてテーブルを作成します。
Kusto のマネージド ID のインジェスター アクセス許可を追加します。
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Kusto クラスター URI (Uniform Resource Identifier)、使用されるデータベースとマネージド ID、および書き込む必要があるテーブルを定義するサンプル プログラムを実行します。
flink-connector-kusto プロジェクトを複製します: https://github.com/Azure/flink-connector-kusto.git
次のコマンドを使用して、ADX にテーブルを作成します
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
適切な Kusto クラスター URI、データベース、および使用されるマネージド ID を使用して、FlinkKustoSinkSample.java ファイルを更新します。
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
後で "mvn clean package" を使用してプロジェクトをビルドします
'sample-java/target' フォルダーの下で 'samples-java-1.0-SNAPSHOT-shaded.jar' という名前の JAR ファイルを見つけて、Flink UI でこの JAR ファイルをアップロードし、ジョブを送信します。
Kusto テーブルにクエリを実行して出力を確認します
Flink から Kusto テーブルへのデータの書き込みに、遅延はありません。
リファレンス
- Apache Flink Web サイト
- Apache、Apache Flink、Flink、関連するオープン ソース プロジェクト名は、Apache Software Foundation (ASF) の商標です。