共用方式為


使用 Kusto Java SDK 內嵌數據

Azure 資料總管是快速及可調整的資料探索服務,以取得記錄和遙測資料。 Java 用戶端連結庫可用來內嵌數據、發出管理命令,以及查詢 Azure 數據總管叢集中的數據。

在本文中,瞭解如何使用 Azure 數據總管 Java 連結庫內嵌數據。 首先,您將在測試叢集中建立數據表和數據對應。 然後,您將使用 Java SDK 將 Blob 記憶體的擷取排入佇列至叢集,並驗證結果。

必要條件

檢閱程式碼

本節為選擇性。 檢閱下列代碼段,以瞭解程式代碼的運作方式。 若要略過本節,請移至 執行應用程式

驗證

此程式會使用 Microsoft Entra 驗證認證搭配 ConnectionStringBuilder』。

  1. com.microsoft.azure.kusto.data.Client建立查詢與管理的 。

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. 建立並使用 com.microsoft.azure.kusto.ingest.IngestClient 將數據擷取排入 Azure 數據總管的佇列:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

管理命令

管理命令,例如 .drop.create,是藉由呼叫 execute 對象來執行 com.microsoft.azure.kusto.data.Client

例如,數據表 StormEvents 的建立方式如下:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

資料提取

使用來自現有 Azure Blob 儲存體 容器的檔案進行佇列擷取。

  • 使用 BlobSourceInfo 來指定 Blob 記憶體路徑。
  • 使用 IngestionProperties 來定義數據表、資料庫、對應名稱和數據類型。 在下列範例中,資料類型為 CSV
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

擷取進程會在個別線程中啟動, main 線程會等候擷取線程完成。 此程式使用 CountdownLatch。 擷取 API (IngestClient#ingestFromBlob) 不是異步的。 while迴圈可用來每隔 5 秒輪詢目前狀態,並等候擷取狀態從 Pending 變更為不同的狀態。 最終狀態可以是 SucceededFailedPartiallySucceeded

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

提示

還有其他方法可針對不同的應用程式以異步方式處理擷取。 例如,您可以使用 CompletableFuture 來建立定義動作后擷取的管線,例如查詢數據表,或處理回報至的 IngestionStatus例外狀況。

執行應用程式

一般

當您執行範例程式代碼時,會執行下列動作:

  1. 卸除數據表:StormEvents卸除數據表(如果有的話)。
  2. 數據表建立StormEvents 建立數據表。
  3. 對應建立StormEvents_CSV_Mapping 建立對應。
  4. 檔案擷取:CSV 檔案 (Azure Blob 儲存體) 已排入佇列以進行擷取。

下列範例程式代碼來自 App.java

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

提示

若要嘗試不同的作業組合,請在 中 App.java取消批注/批注個別方法。

執行應用程式

  1. 從 GitHub 複製範例程式代碼:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. 以下列資訊設定服務主體資訊作為程式所使用的環境變數:

    • 叢集端點
    • 資料庫名稱
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. 建置並執行:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    輸出將類似於:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

等候幾分鐘,擷取程式才能完成。 成功完成之後,您會看到下列記錄訊息: Ingestion completed successfully。 此時您可以結束程式並移至下一個步驟,而不會影響已排入佇列的擷取程式。

Validate

等候 5 到 10 分鐘,佇列擷取排程擷取程式,並將數據載入 Azure 數據總管。

  1. 登入 https://dataexplorer.azure.com 併連線到您的叢集。

  2. 執行下列命令以取得資料表中的 StormEvents 記錄計數:

    StormEvents | count
    

疑難排解

  1. 若要查看過去四小時內的擷取失敗,請在您的資料庫上執行下列命令:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. 若要檢視過去四小時內所有擷取作業的狀態,請執行下列命令:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

清除資源

如果您不打算使用您已建立的資源,請在資料庫中執行下列命令來卸除 StormEvents 數據表。

.drop table StormEvents