Ingestování dat pomocí sady Kusto Java SDK
Azure Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Klientská knihovna Java se dá použít k příjmu dat, příkazů pro správu problémů a dotazování dat v clusterech Azure Data Exploreru.
V tomto článku se dozvíte, jak ingestovat data pomocí knihovny Java v Azure Data Exploreru. Nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak zařadíte příjem dat z úložiště objektů blob do clusteru pomocí sady Java SDK a ověříte výsledky.
Požadavky
- Účet Microsoft nebo identita uživatele Microsoft Entra. Předplatné Azure není povinné.
- Cluster a databáze Azure Data Exploreru. Vytvořte cluster a databázi.
- Git
- Sada JDK verze 1.8 nebo novější
- Maven.
- Vytvořte registraci aplikace a udělte jí oprávnění k databázi. Uložte ID klienta a tajný klíč klienta pro pozdější použití.
Kontrola kódu
Tato část je nepovinná. Projděte si následující fragmenty kódu a zjistěte, jak kód funguje. Pokud chcete tuto část přeskočit, přejděte ke spuštění aplikace.
Ověřování
Program používá přihlašovací údaje ověřování Microsoft Entra s ConnectionStringBuilder'.
Vytvořte dotaz
com.microsoft.azure.kusto.data.Client
a správu.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Vytvoření a použití k zařazování dat do fronty do Azure Data Exploreru
com.microsoft.azure.kusto.ingest.IngestClient
:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Příkazy pro správu
Příkazy pro správu, například .drop
a .create
, se spouští voláním execute
objektu com.microsoft.azure.kusto.data.Client
.
Například StormEvents
tabulka se vytvoří takto:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Příjem dat
Příjem fronty pomocí souboru z existujícího kontejneru Azure Blob Storage
- Slouží
BlobSourceInfo
k určení cesty ke službě Blob Storage. - Slouží
IngestionProperties
k definování tabulky, databáze, názvu mapování a datového typu. V následujícím příkladu jeCSV
datový typ .
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Proces příjmu dat začíná v samostatném vlákně a main
vlákno čeká na dokončení vlákna příjmu dat. Tento proces používá CountdownLatch. Rozhraní API pro příjem dat (IngestClient#ingestFromBlob
) není asynchronní. Smyčka while
se používá k dotazování aktuálního stavu každých 5 sekund a čeká na změnu stavu příjmu dat z Pending
jiného stavu. Konečný stav může být Succeeded
, Failed
nebo PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tip
Existují další metody pro asynchronní zpracování příjmu dat pro různé aplikace. Můžete například použít CompletableFuture
k vytvoření kanálu definující akci po ingestování, například dotaz na tabulku nebo zpracování výjimek, které byly hlášeny IngestionStatus
.
Spuštění aplikace
OBECNÉ
Při spuštění ukázkového kódu se provádějí následující akce:
- Drop table:
StormEvents
table is dropd (if it exists). - Vytvoření tabulky:
StormEvents
vytvoří se tabulka. - Vytvoření mapování:
StormEvents_CSV_Mapping
vytvoří se mapování. - Příjem souborů: Soubor CSV (ve službě Azure Blob Storage) se zařadí do fronty pro příjem dat.
Následující vzorový kód pochází z App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tip
Chcete-li vyzkoušet různé kombinace operací, odkomentujte/okomentujte příslušné metody v App.java
.
Spuštění aplikace
Naklonujte ukázkový kód z GitHubu:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Nastavte informace instančního objektu s následujícími informacemi jako proměnné prostředí používané programem:
- Koncový bod clusteru
- Název databáze
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Sestavení a spuštění:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Výstup bude vypadat přibližně takto:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Počkejte několik minut, než se proces příjmu dat dokončí. Po úspěšném dokončení se zobrazí následující zpráva protokolu: Ingestion completed successfully
. V tomto okamžiku můžete program ukončit a přejít k dalšímu kroku, aniž by to mělo vliv na proces příjmu dat, který už byl zařazen do fronty.
Ověřit
Počkejte pět až 10 minut, než se příjem dat ve frontě naplánuje a načte data do Azure Data Exploreru.
Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru.
Spuštěním následujícího příkazu získejte počet záznamů v
StormEvents
tabulce:StormEvents | count
Odstraňování potíží
Pokud chcete zobrazit selhání příjmu dat za posledních čtyři hodiny, spusťte v databázi následující příkaz:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Pokud chcete zobrazit stav všech operací příjmu dat za posledních čtyři hodiny, spusťte následující příkaz:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Vyčištění prostředků
Pokud nemáte v úmyslu používat prostředky, které jste vytvořili, spusťte v databázi následující příkaz, kterým tabulku vypustíte StormEvents
.
.drop table StormEvents