Mata in data med Kusto Java SDK
Azure Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Java-klientbiblioteket kan användas för att mata in data, köra ärendehanteringskommandon och fråga efter data i Azure Data Explorer kluster.
I den här artikeln får du lära dig hur du matar in data med hjälp av Azure Data Explorer Java-biblioteket. Först skapar du en tabell och en datamappning i ett testkluster. Sedan köar du en inmatning från bloblagring till klustret med hjälp av Java SDK och validerar resultatet.
Förutsättningar
- Ett Microsoft-konto eller en Microsoft Entra användaridentitet. En Azure-prenumeration krävs inte.
- Ett Azure Data Explorer-kluster och en databas. Skapa ett kluster och en databas.
- Git.
- JDK version 1.8 eller senare.
- Maven.
- Skapa en appregistrering och ge den behörighet till databasen. Spara klient-ID och klienthemlighet för senare användning.
Granska koden
Det här avsnittet är valfritt. Läs följande kodfragment för att lära dig hur koden fungerar. Om du vill hoppa över det här avsnittet går du till kör programmet.
Autentisering
Programmet använder Microsoft Entra autentiseringsuppgifter med ConnectionStringBuilder'.
Skapa en
com.microsoft.azure.kusto.data.Client
för frågor och hantering.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Skapa och använda en
com.microsoft.azure.kusto.ingest.IngestClient
för att köa datainmatning till Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Kommandon för hantering
Hanteringskommandon, till exempel .drop
och .create
, körs genom att anropa execute
på ett com.microsoft.azure.kusto.data.Client
objekt.
Tabellen skapas till exempel på StormEvents
följande sätt:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Datainhämtning
Köinmatning med hjälp av en fil från en befintlig Azure Blob Storage container.
- Använd
BlobSourceInfo
för att ange bloblagringssökvägen. - Använd
IngestionProperties
för att definiera tabell, databas, mappningsnamn och datatyp. I följande exempel ärCSV
datatypen .
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Inmatningsprocessen startar i en separat tråd och tråden main
väntar på att inmatningstråden ska slutföras. Den här processen använder CountdownLatch. Inmatnings-API:et (IngestClient#ingestFromBlob
) är inte asynkront. En while
loop används för att avsöka aktuell status var 5:e sekund och väntar på att inmatningsstatusen ska ändras från Pending
till en annan status. Den slutliga statusen kan vara Succeeded
, Failed
eller PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tips
Det finns andra metoder för att hantera inmatning asynkront för olika program. Du kan till exempel använda CompletableFuture
för att skapa en pipeline som definierar åtgärden efter inmatningen, till exempel fråga tabellen eller hantera undantag som har rapporterats till IngestionStatus
.
Kör programmet
Allmänt
När du kör exempelkoden utförs följande åtgärder:
-
Släpp tabell:
StormEvents
tabellen tas bort (om den finns). -
Skapa tabell:
StormEvents
tabellen skapas. -
Skapa mappning:
StormEvents_CSV_Mapping
mappning skapas. - Filinmatning: En CSV-fil (i Azure Blob Storage) placeras i kö för inmatning.
Följande exempelkod är från App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tips
Om du vill prova olika kombinationer av åtgärder avkommenterar/kommenterar du respektive metoder i App.java
.
Kör programmet
Klona exempelkoden från GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Ange informationen om tjänstens huvudnamn med följande information som miljövariabler som används av programmet:
- Klusterslutpunkt
- Databasnamn
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Skapa och kör:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Utdata kommer att likna:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Vänta några minuter tills inmatningsprocessen har slutförts. När du har slutfört det visas följande loggmeddelande: Ingestion completed successfully
. Du kan avsluta programmet nu och gå vidare till nästa steg utan att påverka inmatningsprocessen, som redan har placerats i kö.
Verifiera
Vänta fem till tio minuter tills den köade inmatningen schemalägger inmatningsprocessen och läser in data i Azure Data Explorer.
Logga in på https://dataexplorer.azure.com och anslut till klustret.
Kör följande kommando för att hämta antalet poster i
StormEvents
tabellen:StormEvents | count
Felsöka
Om du vill se inmatningsfel under de senaste fyra timmarna kör du följande kommando i databasen:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Om du vill visa status för alla inmatningsåtgärder under de senaste fyra timmarna kör du följande kommando:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Rensa resurser
Om du inte planerar att använda de resurser som du har skapat kör du följande kommando i databasen för att släppa StormEvents
tabellen.
.drop table StormEvents