Kusto Java SDK を使用してデータを取り込む
Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Java クライアント ライブラリを使用して、Azure Data Explorer クラスターでデータの取り込み、管理コマンドの発行、データのクエリを実行できます。
この記事では、Azure Data Explorer の Java ライブラリを使用してデータを取り込む方法について説明します。 まず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、Java SDK を使用して Blob Storage からクラスターへのインジェストをキューに登録し、結果を確認します。
前提条件
- Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- Git.
- JDK バージョン 1.8 以降。
- Maven。
- アプリの登録を作成し、データベースに対するアクセス許可を付与します。 後で使用するために、クライアント ID とクライアント シークレットを保存します。
コードの確認
このセクションは省略可能です。 コードの動作方法については、次のコード スニペットを確認してください。 このセクションをスキップするには、「アプリケーションの実行」に進んでください。
認証
プログラムは、ConnectionStringBuilder で Microsoft Entra 認証資格情報を使用します。
クエリと管理のために
com.microsoft.azure.kusto.data.Client
を作成します。static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
com.microsoft.azure.kusto.ingest.IngestClient
を作成して使用し、Azure Data Explorer へのデータ インジェストをキューに入れます。static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
管理コマンド
.drop
や.create
などの管理コマンドは、com.microsoft.azure.kusto.data.Client
オブジェクトでexecute
を呼び出すことによって実行されます。
たとえば、StormEvents
テーブルは次のように作成されます。
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
データ インジェスト
既存の Azure Blob Storage コンテナーのファイルを使用してインジェストをキューに登録します。
BlobSourceInfo
を使用して、Blob Storage パスを指定します。IngestionProperties
を使用して、テーブル、データベース、マッピング名、およびデータ型を定義します。 次の例では、データ型はCSV
です。
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
インジェスト プロセスは別のスレッドで開始され、main
スレッドはインジェスト スレッドが完了するまで待機します。 このプロセスでは、CountdownLatch が使用されます。 インジェスト API (IngestClient#ingestFromBlob
) は非同期ではありません。 while
ループを使用して、現在の状態が 5 秒ごとにポーリングされ、インジェストの状態が Pending
から別の状態に変わるまで待機します。 最終的な状態は、Succeeded
、Failed
、または PartiallySucceeded
です。
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
ヒント
さまざまなアプリケーションで非同期にインジェストを処理するほかの方法があります。 たとえば、CompletableFuture
を使用して、テーブルに対してクエリを実行したり、IngestionStatus
に報告された例外を処理するなど、インジェスト後のアクションを定義するパイプラインを作成できます。
アプリケーションの実行
全般
サンプル コードを実行すると、次のアクションが行われます。
- テーブルを削除する:
StormEvents
テーブルが削除されます (存在する場合)。 - テーブルの作成:
StormEvents
テーブルが作成されます。 - マッピングの作成:
StormEvents_CSV_Mapping
マッピングが作成されます。 - ファイル インジェスト: (Azure Blob Storage 内の) CSV ファイルがインジェストのためにキューに入れられます。
次のサンプル コードは、App.java
からのものです。
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
ヒント
操作のさまざまな組み合わせを試すには、App.java
内の各メソッドをコメント解除したり、コメント化したりします。
アプリケーションの実行
GitHub から次のサンプル コードをクローンします。
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
プログラムで使用される環境変数として、次の情報を使用してサービス プリンシパル情報を設定します。
- クラスター エンドポイント
- データベース名
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
ビルドして実行します。
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
次のように出力されます。
Table dropped Table created Mapping created Waiting for ingestion to complete...
インジェスト プロセスが完了するまで数分待ちます。 正常に完了すると、Ingestion completed successfully
というログ メッセージが表示されます。 この時点でプログラムを終了し、既にキューに登録されているインジェスト プロセスに影響を与えずに次の手順に進むことができます。
検証
キューに登録されたインジェストでインジェスト プロセスがスケジュールされ、Azure Data Explorer にデータが読み込まれるまで、5 分から 10 分待ちます。
https://dataexplorer.azure.com にサインインして、クラスターに接続します。
次のコマンドを実行して、
StormEvents
テーブル内のレコードの数を取得します。StormEvents | count
トラブルシューティング
過去 4 時間以内のインジェスト エラーを表示するには、データベースで次のコマンドを実行します。
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
過去 4 時間以内のすべてのインジェスト操作の状態を表示するには、次のコマンドを実行します。
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
リソースをクリーンアップする
作成したリソースを使用する予定がない場合は、データベースで次のコマンドを実行して、StormEvents
テーブルを削除します。
.drop table StormEvents