Kusto Java SDK를 사용하여 데이터 수집
Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다. Java 클라이언트 라이브러리를 사용하여 Azure Data Explorer 클러스터에서 데이터를 수집하고, 관리 명령을 실행하고, 데이터를 쿼리할 수 있습니다.
이 문서에서는 Azure Data Explorer Java 라이브러리를 사용하여 데이터를 수집하는 방법을 알아봅니다. 먼저 테스트 클러스터에서 테이블과 데이터 매핑을 만듭니다. 그런 다음, Java SDK를 사용하여 Blob 스토리지에서 클러스터로 수집을 큐에 대기하고 결과의 유효성을 검사합니다.
필수 조건
- Microsoft 계정 또는 Microsoft Entra 사용자 ID입니다. Azure 구독이 필요하지 않습니다.
- Azure Data Explorer 클러스터 및 데이터베이스. 클러스터 및 데이터베이스를 만듭니다.
- Git
- JDK 버전 1.8 이상
- Maven
- 앱 등록을 만들고 데이터베이스에 대한 권한을 부여합니다. 나중에 사용할 수 있도록 클라이언트 ID 및 클라이언트 암호를 저장합니다.
코드 검토
이 섹션은 선택 사항입니다. 다음 코드 조각을 검토하여 코드가 작동하는 방법을 알아봅니다. 이 섹션을 건너뛰려면 애플리케이션 실행으로 이동합니다.
인증
이 프로그램은 ConnectionStringBuilder에서 Microsoft Entra 인증 자격 증명을 사용합니다.
쿼리 및 관리를 위해
com.microsoft.azure.kusto.data.Client
를 만듭니다.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
com.microsoft.azure.kusto.ingest.IngestClient
를 만들고 사용하여 데이터 수집을 Azure Data Explorer에 대기합니다.static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
관리 명령
관리 명령(예: .drop
및.create
)은 개체를 com.microsoft.azure.kusto.data.Client
호출 execute
하여 실행됩니다.
예를 들어 StormEvents
테이블은 다음과 같이 생성됩니다.
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
데이터 수집
큐 수집은 기존 Azure Blob Storage 컨테이너의 파일을 사용하여 수행됩니다.
BlobSourceInfo
를 사용하여 Blob Storage 경로를 지정합니다.IngestionProperties
를 사용하여 테이블, 데이터베이스, 매핑 이름 및 데이터 형식을 정의합니다. 다음 예제에서 데이터 형식은CSV
입니다.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
수집 프로세스는 별도의 스레드에서 시작되고 main
스레드는 수집 스레드가 완료될 때까지 대기합니다. 이 프로세스에서는 CountdownLatch를 사용합니다. 수집 API(IngestClient#ingestFromBlob
)는 비동기가 아닙니다. while
루프는 5초마다 현재 상태를 폴링하고 수집 상태가 Pending
에서 다른 상태로 변경될 때까지 대기하는 데 사용됩니다. 최종 상태는 Succeeded
, Failed
또는 PartiallySucceeded
일 수 있습니다.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
팁
다른 애플리케이션에 대한 수집을 비동기식으로 처리하는 다른 메서드가 있습니다. 예를 들어 CompletableFuture
를 사용하여 테이블 쿼리 등의 작업 사후 수집을 정의하는 파이프라인을 만들거나 IngestionStatus
에 보고된 예외를 처리할 수 있습니다.
애플리케이션 실행
일반
샘플 코드를 실행하면 다음 작업이 수행됩니다.
- 테이블 삭제:
StormEvents
테이블이 있는 경우 삭제됩니다. - 테이블 만들기:
StormEvents
테이블이 생성됩니다. - 매핑 만들기:
StormEvents_CSV_Mapping
매핑이 만들어집니다. - 파일 수집: CSV 파일(Azure Blob Storage)은 수집을 위해 큐에 대기됩니다.
다음 샘플 코드는 App.java
에서 가져온 것입니다.
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
팁
다양한 작업 조합을 시도하려면 App.java
에서 각 메서드의 주석을 처리하거나 제거할 있습니다.
애플리케이션 실행
GitHub에서 샘플 코드를 복제합니다.
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
다음 정보를 사용하여 서비스 주체 정보를 프로그램에서 사용하는 환경 변수로 설정합니다.
- 클러스터 엔드포인트
- 데이터베이스 이름
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
빌드하고 실행합니다.
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
다음과 유사하게 출력됩니다.
Table dropped Table created Mapping created Waiting for ingestion to complete...
수집 프로세스가 완료될 때까지 기다립니다. 성공적으로 완료되면 다음과 같은 로그 메시지가 표시됩니다. Ingestion completed successfully
이 시점에서 프로그램을 종료하고 이미 큐에 대기된 수집 프로세스에 영향을 주지 않고 다음 단계로 이동할 수 있습니다.
유효성 검사
큐에 넣은 수집 프로세스가 예약되고 데이터가 Azure Data Explorer에 로드될 때까지 10분 정도 기다립니다.
https://dataexplorer.azure.com에 로그인하고 클러스터에 연결합니다.
다음 명령을 실행하여
StormEvents
테이블의 레코드 수를 가져옵니다.StormEvents | count
문제 해결
지난 4시간 동안 수집 오류를 확인하려면 데이터베이스에서 다음 명령을 실행합니다.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인하려면 다음 명령을 실행합니다.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
리소스 정리
방금 만든 리소스를 사용하지 않으려면 데이터베이스에서 다음 명령을 실행하여 StormEvents
테이블을 삭제합니다.
.drop table StormEvents