Pozyskiwanie danych przy użyciu zestawu Sdk Języka Java kusto
Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Biblioteka kliencka Języka Java może służyć do pozyskiwania danych, wydawania poleceń zarządzania i wykonywania zapytań dotyczących danych w klastrach usługi Azure Data Explorer.
Z tego artykułu dowiesz się, jak pozyskiwać dane przy użyciu biblioteki Java usługi Azure Data Explorer. Najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie utworzysz kolejkę pozyskiwania z magazynu obiektów blob do klastra przy użyciu zestawu SDK języka Java i zweryfikujesz wyniki.
Wymagania wstępne
- Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danych.
- Git.
- Zestaw JDK w wersji 1.8 lub nowszej.
- Maven.
- Utwórz rejestrację aplikacji i przyznaj jej uprawnienia do bazy danych. Zapisz identyfikator klienta i klucz tajny klienta do późniejszego użycia.
Przeglądanie kodu
Ta sekcja jest opcjonalna. Zapoznaj się z poniższymi fragmentami kodu, aby dowiedzieć się, jak działa kod. Aby pominąć tę sekcję, przejdź do uruchomienia aplikacji.
Uwierzytelnianie
Program używa poświadczeń uwierzytelniania entra firmy Microsoft z elementem ConnectionStringBuilder".
Utwórz element
com.microsoft.azure.kusto.data.Client
do wykonywania zapytań i zarządzania nimi.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Utwórz i użyj elementu
com.microsoft.azure.kusto.ingest.IngestClient
do kolejki pozyskiwania danych w usłudze Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Polecenia zarządzania
Polecenia zarządzania, takie jak .drop
i .create
, są wykonywane przez wywołanie execute
obiektu com.microsoft.azure.kusto.data.Client
.
Na przykład tabela jest tworzona StormEvents
w następujący sposób:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Pozyskiwanie danych
Pozyskiwanie kolejek przy użyciu pliku z istniejącego kontenera usługi Azure Blob Storage.
- Użyj
BlobSourceInfo
polecenia , aby określić ścieżkę usługi Blob Storage. - Służy
IngestionProperties
do definiowania tabeli, bazy danych, nazwy mapowania i typu danych. W poniższym przykładzie typ danych toCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Proces pozyskiwania rozpoczyna się w osobnym wątku, a main
wątek czeka na zakończenie wątku pozyskiwania. W tym procesie jest używana funkcja CountdownLatch. Interfejs API pozyskiwania (IngestClient#ingestFromBlob
) nie jest asynchroniczny. Pętla while
służy do sondowania bieżącego stanu co 5 sekund i oczekiwania na zmianę stanu pozyskiwania z Pending
na inny stan. Ostatnim stanem może być Succeeded
, Failed
lub PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Napiwek
Istnieją inne metody asynchroniczne obsługi pozyskiwania dla różnych aplikacji. Można na przykład użyć CompletableFuture
polecenia , aby utworzyć potok definiujący akcję po pozyskaniu, taką jak zapytanie dotyczące tabeli, lub obsługiwać wyjątki zgłoszone do elementu IngestionStatus
.
Uruchamianie aplikacji
Ogólne
Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:
- Usuwanie tabeli:
StormEvents
tabela jest usuwana (jeśli istnieje). - Tworzenie tabeli:
StormEvents
tworzona jest tabela. - Tworzenie mapowania:
StormEvents_CSV_Mapping
mapowanie jest tworzone. - Pozyskiwanie plików: plik CSV (w usłudze Azure Blob Storage) jest kolejkowany do pozyskiwania.
Poniższy przykładowy kod pochodzi z pliku App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Napiwek
Aby wypróbować różne kombinacje operacji, usuń komentarz/komentarz odpowiednich metod w pliku App.java
.
Uruchamianie aplikacji
Sklonuj przykładowy kod z usługi GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Ustaw informacje o jednostce usługi przy użyciu następujących informacji jako zmiennych środowiskowych używanych przez program:
- Punkt końcowy klastra
- Nazwa bazy danych
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Kompilowanie i uruchamianie:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Dane wyjściowe będą podobne do następujących:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Poczekaj kilka minut na zakończenie procesu pozyskiwania. Po pomyślnym zakończeniu zobaczysz następujący komunikat dziennika: Ingestion completed successfully
. Możesz w tym momencie zamknąć program i przejść do następnego kroku bez wpływu na proces pozyskiwania, który został już w kolejce.
Sprawdź poprawność
Poczekaj od pięciu do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.
Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem.
Uruchom następujące polecenie, aby uzyskać liczbę rekordów w
StormEvents
tabeli:StormEvents | count
Rozwiązywanie problemów
Aby wyświetlić błędy pozyskiwania w ciągu ostatnich czterech godzin, uruchom następujące polecenie w bazie danych:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Aby wyświetlić stan wszystkich operacji pozyskiwania w ciągu ostatnich czterech godzin, uruchom następujące polecenie:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Czyszczenie zasobów
Jeśli nie planujesz używania utworzonych zasobów, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents
.
.drop table StormEvents