Aufnehmen von Daten mithilfe des Kusto Java SDK
Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Die Java-Clientbibliothek kann zum Aufnehmen von Daten, Problemverwaltungsbefehlen und Abfragen von Daten in Azure Data Explorer-Clustern verwendet werden.
In diesem Artikel erfahren Sie, wie Sie Daten mithilfe der Azure Data Explorer-Java-Bibliothek erfassen. Sie erstellen zunächst eine Tabelle und eine Datenzuordnung in einem Testcluster. Anschließend reihen Sie eine Erfassung aus Blobspeicher für den Cluster mithilfe des Java SDK in die Warteschlange ein und überprüfen die Ergebnisse.
Voraussetzungen
- Ein Microsoft-Konto oder eine Microsoft Entra-Benutzeridentität. Ein Azure-Abonnement ist nicht erforderlich.
- Schnellstart: Erstellen eines Azure Data Explorer-Clusters und einer Datenbank. Erstellen eines Clusters und einer Datenbank
- Git.
- Mindestens JDK-Version 1.8
- Maven.
- Erstellen Sie eine App-Registrierung, und gewähren Sie ihr Berechtigungen für die Datenbank. Speichern Sie die Client-ID und den geheimen Clientschlüssel zur späteren Verwendung.
Überprüfen des Codes
Dieser Abschnitt ist optional. Anhand der folgenden Codeausschnitte können Sie sich mit der Funktionsweise des Codes vertraut machen. Wenn Sie Abschnitt überspringen möchten, gehen Sie zu Ausführen der Anwendung.
Authentifizierung
Das Programm verwendet Microsoft Entra-Authentifizierungsanmeldeinformationen mit ConnectionStringBuilder'.
Erstellen Sie
com.microsoft.azure.kusto.data.Client
für Abfrage und Verwaltung.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Erstellen und verwenden Sie ein Element vom Typ
com.microsoft.azure.kusto.ingest.IngestClient
, um die Datenerfassung in Azure Data Explorer in eine Warteschlange einzureihen:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Befehle für Verwaltung
Verwaltungsbefehle, z .drop
. B. und .create
, werden durch Aufrufen execute
eines com.microsoft.azure.kusto.data.Client
Objekts ausgeführt.
Die Tabelle StormEvents
wird beispielsweise wie folgt erstellt:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Datenerfassung
Reihen Sie eine Erfassung mithilfe einer Datei aus einem vorhandenen Azure Blob Storage-Container in eine Warteschlange ein.
- Verwenden Sie
BlobSourceInfo
zum Angeben des Blob Storage-Pfads. - Verwenden Sie
IngestionProperties
zum Definieren der Tabelle, der Datenbank, des Zuordnungsnamens und des Datentyps. Im folgenden Beispiel lautet der DatentypCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Der Erfassungsprozess startet in einem separaten Thread, und der main
-Thread wartet auf den Abschluss des Erfassungsthreads. Bei diesem Prozess wird CountdownLatch verwendet. Die Erfassungs-API (IngestClient#ingestFromBlob
) ist nicht asynchron. Mit einer while
-Schleife wird alle fünf Sekunden der aktuelle Status abgefragt, und es wird gewartet, bis der Erfassungsstatus Pending
in einen anderen Status geändert wird. Der endgültige Status kann Succeeded
, Failed
oder PartiallySucceeded
lauten.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tipp
Es gibt noch weitere Methoden, um die Erfassung für verschiedene Anwendungen asynchron zu verarbeiten. Beispielsweise können Sie CompletableFuture
verwenden, um eine Pipeline zu erstellen, die die Aktion nach der Erfassung definiert, etwa das Abfragen der Tabelle oder das Behandeln von Ausnahmen, die an IngestionStatus
gemeldet wurden.
Ausführen der Anwendung
Allgemein
Wenn Sie den Beispielcode ausführen, werden folgende Aktionen ausgeführt:
- Tabellenlöschung: Die Tabelle
StormEvents
wird gelöscht (sofern vorhanden). - Tabellenerstellung: Die Tabelle
StormEvents
wird erstellt. - Zuordnungserstellung: Die Zuordnung
StormEvents_CSV_Mapping
wird erstellt. - Dateiaufnahme: Eine CSV-Datei (in Azure Blob Storage) wird für die Aufnahme in die Warteschlange gestellt.
Der folgende Beispielcode stammt aus App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tipp
Sie können verschiedene Vorgangskombinationen ausprobieren, indem Sie die entsprechenden Methoden in App.java
auskommentieren bzw. kommentieren.
Ausführen der Anwendung
Klonen Sie den Beispielcode von GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Legen Sie die Dienstprinzipalinformationen mit den folgenden Informationen als Umgebungsvariablen fest, die vom Programm verwendet werden:
- Clusterendpunkt
- Datenbankname
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Erstellen und führen Sie Folgendes aus:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Die Ausgabe ähnelt der folgenden:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Warten Sie einige Minuten, bis die Erfassung abgeschlossen ist. Bei erfolgreicher Ausführung wird die folgende Protokollmeldung angezeigt: Ingestion completed successfully
. Sie können das Programm an dieser Stelle beenden und mit dem nächsten Schritt fortfahren, ohne den Erfassungsprozess zu beeinträchtigen, der bereits in die Warteschlange eingereiht wurde.
Überprüfen
Warten Sie fünf bis zehn Minuten, bis die in der Warteschlange befindliche Erfassung geplant wurde und die Daten in Azure Data Explorer geladen wurden.
Melden Sie sich bei https://dataexplorer.azure.com an, und stellen Sie eine Verbindung mit Ihrem Cluster her.
Führen Sie den folgenden Befehl aus, um die Anzahl von Datensätzen in der Tabelle
StormEvents
zu erhalten:StormEvents | count
Problembehandlung
Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Erfassungsfehler aufgetreten sind:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Führen Sie den folgenden Befehl aus, um den Status aller Erfassungsvorgänge in den letzten vier Stunden anzuzeigen:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Bereinigen von Ressourcen
Wenn Sie die erstellten Ressourcen nicht verwenden möchten, führen Sie in Ihrer Datenbank den folgenden Befehl aus, um die Tabelle StormEvents
zu löschen:
.drop table StormEvents
Zugehöriger Inhalt
- Write queries (Schreiben von Abfragen)