Sdílet prostřednictvím


Ingestování dat pomocí sady Kusto Java SDK

Azure Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Klientská knihovna Java se dá použít k příjmu dat, příkazů pro správu problémů a dotazování dat v clusterech Azure Data Exploreru.

V tomto článku se dozvíte, jak ingestovat data pomocí knihovny Java v Azure Data Exploreru. Nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak zařadíte příjem dat z úložiště objektů blob do clusteru pomocí sady Java SDK a ověříte výsledky.

Požadavky

Kontrola kódu

Tato část je nepovinná. Projděte si následující fragmenty kódu a zjistěte, jak kód funguje. Pokud chcete tuto část přeskočit, přejděte ke spuštění aplikace.

Ověřování

Program používá přihlašovací údaje ověřování Microsoft Entra s ConnectionStringBuilder'.

  1. Vytvořte dotaz com.microsoft.azure.kusto.data.Client a správu.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Vytvoření a použití k zařazování dat do fronty do Azure Data Exploreru com.microsoft.azure.kusto.ingest.IngestClient :

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Příkazy pro správu

Příkazy pro správu, například .drop a .create, se spouští voláním execute objektu com.microsoft.azure.kusto.data.Client .

Například StormEvents tabulka se vytvoří takto:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Příjem dat

Příjem fronty pomocí souboru z existujícího kontejneru Azure Blob Storage

  • Slouží BlobSourceInfo k určení cesty ke službě Blob Storage.
  • Slouží IngestionProperties k definování tabulky, databáze, názvu mapování a datového typu. V následujícím příkladu je CSVdatový typ .
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Proces příjmu dat začíná v samostatném vlákně a main vlákno čeká na dokončení vlákna příjmu dat. Tento proces používá CountdownLatch. Rozhraní API pro příjem dat (IngestClient#ingestFromBlob) není asynchronní. Smyčka while se používá k dotazování aktuálního stavu každých 5 sekund a čeká na změnu stavu příjmu dat z Pending jiného stavu. Konečný stav může být Succeeded, Failednebo PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

Existují další metody pro asynchronní zpracování příjmu dat pro různé aplikace. Můžete například použít CompletableFuture k vytvoření kanálu definující akci po ingestování, například dotaz na tabulku nebo zpracování výjimek, které byly hlášeny IngestionStatus.

Spuštění aplikace

OBECNÉ

Při spuštění ukázkového kódu se provádějí následující akce:

  1. Drop table: StormEvents table is dropd (if it exists).
  2. Vytvoření tabulky: StormEvents vytvoří se tabulka.
  3. Vytvoření mapování: StormEvents_CSV_Mapping vytvoří se mapování.
  4. Příjem souborů: Soubor CSV (ve službě Azure Blob Storage) se zařadí do fronty pro příjem dat.

Následující vzorový kód pochází z App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tip

Chcete-li vyzkoušet různé kombinace operací, odkomentujte/okomentujte příslušné metody v App.java.

Spuštění aplikace

  1. Naklonujte ukázkový kód z GitHubu:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Nastavte informace instančního objektu s následujícími informacemi jako proměnné prostředí používané programem:

    • Koncový bod clusteru
    • Název databáze
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Sestavení a spuštění:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Výstup bude vypadat přibližně takto:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Počkejte několik minut, než se proces příjmu dat dokončí. Po úspěšném dokončení se zobrazí následující zpráva protokolu: Ingestion completed successfully. V tomto okamžiku můžete program ukončit a přejít k dalšímu kroku, aniž by to mělo vliv na proces příjmu dat, který už byl zařazen do fronty.

Ověřit

Počkejte pět až 10 minut, než se příjem dat ve frontě naplánuje a načte data do Azure Data Exploreru.

  1. Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru.

  2. Spuštěním následujícího příkazu získejte počet záznamů v StormEvents tabulce:

    StormEvents | count
    

Odstraňování potíží

  1. Pokud chcete zobrazit selhání příjmu dat za posledních čtyři hodiny, spusťte v databázi následující příkaz:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Pokud chcete zobrazit stav všech operací příjmu dat za posledních čtyři hodiny, spusťte následující příkaz:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Vyčištění prostředků

Pokud nemáte v úmyslu používat prostředky, které jste vytvořili, spusťte v databázi následující příkaz, kterým tabulku vypustíte StormEvents .

.drop table StormEvents