Udostępnij za pośrednictwem


Pozyskiwanie danych przy użyciu zestawu Sdk Języka Java kusto

Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Biblioteka kliencka Języka Java może służyć do pozyskiwania danych, wydawania poleceń zarządzania i wykonywania zapytań dotyczących danych w klastrach usługi Azure Data Explorer.

Z tego artykułu dowiesz się, jak pozyskiwać dane przy użyciu biblioteki Java usługi Azure Data Explorer. Najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie utworzysz kolejkę pozyskiwania z magazynu obiektów blob do klastra przy użyciu zestawu SDK języka Java i zweryfikujesz wyniki.

Wymagania wstępne

Przeglądanie kodu

Ta sekcja jest opcjonalna. Zapoznaj się z poniższymi fragmentami kodu, aby dowiedzieć się, jak działa kod. Aby pominąć tę sekcję, przejdź do uruchomienia aplikacji.

Uwierzytelnianie

Program używa poświadczeń uwierzytelniania entra firmy Microsoft z elementem ConnectionStringBuilder".

  1. Utwórz element com.microsoft.azure.kusto.data.Client do wykonywania zapytań i zarządzania nimi.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Utwórz i użyj elementu com.microsoft.azure.kusto.ingest.IngestClient do kolejki pozyskiwania danych w usłudze Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Polecenia zarządzania

Polecenia zarządzania, takie jak .drop i .create, są wykonywane przez wywołanie execute obiektu com.microsoft.azure.kusto.data.Client .

Na przykład tabela jest tworzona StormEvents w następujący sposób:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Pozyskiwanie danych

Pozyskiwanie kolejek przy użyciu pliku z istniejącego kontenera usługi Azure Blob Storage.

  • Użyj BlobSourceInfo polecenia , aby określić ścieżkę usługi Blob Storage.
  • Służy IngestionProperties do definiowania tabeli, bazy danych, nazwy mapowania i typu danych. W poniższym przykładzie typ danych to CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Proces pozyskiwania rozpoczyna się w osobnym wątku, a main wątek czeka na zakończenie wątku pozyskiwania. W tym procesie jest używana funkcja CountdownLatch. Interfejs API pozyskiwania (IngestClient#ingestFromBlob) nie jest asynchroniczny. Pętla while służy do sondowania bieżącego stanu co 5 sekund i oczekiwania na zmianę stanu pozyskiwania z Pending na inny stan. Ostatnim stanem może być Succeeded, Failedlub PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Napiwek

Istnieją inne metody asynchroniczne obsługi pozyskiwania dla różnych aplikacji. Można na przykład użyć CompletableFuture polecenia , aby utworzyć potok definiujący akcję po pozyskaniu, taką jak zapytanie dotyczące tabeli, lub obsługiwać wyjątki zgłoszone do elementu IngestionStatus.

Uruchamianie aplikacji

Ogólne

Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:

  1. Usuwanie tabeli: StormEvents tabela jest usuwana (jeśli istnieje).
  2. Tworzenie tabeli: StormEvents tworzona jest tabela.
  3. Tworzenie mapowania: StormEvents_CSV_Mapping mapowanie jest tworzone.
  4. Pozyskiwanie plików: plik CSV (w usłudze Azure Blob Storage) jest kolejkowany do pozyskiwania.

Poniższy przykładowy kod pochodzi z pliku App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Napiwek

Aby wypróbować różne kombinacje operacji, usuń komentarz/komentarz odpowiednich metod w pliku App.java.

Uruchamianie aplikacji

  1. Sklonuj przykładowy kod z usługi GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Ustaw informacje o jednostce usługi przy użyciu następujących informacji jako zmiennych środowiskowych używanych przez program:

    • Punkt końcowy klastra
    • Nazwa bazy danych
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Kompilowanie i uruchamianie:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Dane wyjściowe będą podobne do następujących:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Poczekaj kilka minut na zakończenie procesu pozyskiwania. Po pomyślnym zakończeniu zobaczysz następujący komunikat dziennika: Ingestion completed successfully. Możesz w tym momencie zamknąć program i przejść do następnego kroku bez wpływu na proces pozyskiwania, który został już w kolejce.

Sprawdź poprawność

Poczekaj od pięciu do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.

  1. Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem.

  2. Uruchom następujące polecenie, aby uzyskać liczbę rekordów w StormEvents tabeli:

    StormEvents | count
    

Rozwiązywanie problemów

  1. Aby wyświetlić błędy pozyskiwania w ciągu ostatnich czterech godzin, uruchom następujące polecenie w bazie danych:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Aby wyświetlić stan wszystkich operacji pozyskiwania w ciągu ostatnich czterech godzin, uruchom następujące polecenie:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Czyszczenie zasobów

Jeśli nie planujesz używania utworzonych zasobów, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents .

.drop table StormEvents