共用方式為


建立一個應用程式來使用排隊引入功能以獲取數據

適用於:✅Microsoft FabricAzure 數據總管

Kusto 能夠透過批次管理員來優化和批次處理接收的數據,以處理大規模數據。 批處理管理員會在數據到達目標數據表之前匯總內嵌的數據,以便更有效率地處理及改善效能。 批處理通常是以 1 GB 的原始數據、1000 個個別檔案,或預設逾時 5 分鐘來完成。 批處理原則可以在資料庫和數據表層級更新,通常可降低批處理時間並減少延遲。 如需有關批次擷取的詳細資訊,請參閱 批次擷取原則以程式方式更改資料表層級的批次擷取原則

注意

批處理也會考慮各種因素,例如目標資料庫和數據表、執行擷取的使用者,以及與擷取相關聯的各種屬性,例如特殊標記。

在本文中,您將瞭解如何:

先決條件

開始之前

  • 使用下列其中一種方法來建立 MyStormEvents 數據表,而且,由於只擷取少量的數據,將其擷取批處理原則逾時設定為 10 秒:

    1. 執行管理命令中的第一個應用程式,以在您的資料庫中建立名為 MyStormEvents 的目標資料表。
    2. 管理命令中執行第二個應用程式,將擷取批處理原則逾時設定為 10 秒。 在執行應用程式之前,請將逾時值變更為 00:00:10

    注意

    可能需要幾分鐘的時間,新的批處理原則設定才會傳播至批處理管理員。

  • 下載 stormevent.csv 範例數據檔。 檔案包含 1,000 筆暴風雨事件記錄。

注意

下列範例假設已匯入資料的欄位與目標資料表的架構之間有簡單的匹配。 如果擷取的數據與數據表架構不完全相符,您必須使用匯入對應來對應數據中的欄位與數據表架構。

將檔案排入佇列以擷取並查詢結果

在慣用的 IDE 或文本編輯器中,使用適合您慣用語言的慣例,建立名為 基本擷取 的項目或檔案。 將 stormevent.csv 檔案放在與您的應用程式相同的位置。

注意

在下列範例中,您會使用兩個用戶端,一個用來查詢您的叢集,另一個用來將數據內嵌到您的叢集中。 針對用戶端程式庫支援的語言,兩個客戶端會共享相同的使用者提示驗證器,因此會出現一個單一的使用者提示,而不是每個客戶端各有一個提示。

新增下列程式代碼:

  1. 建立用戶端應用程式,以連線到您的叢集,並列印 MyStormEvents 資料表中的數據列數目。 您將使用此計數作為比較基準,以比較每個擷取方法之後的數據列數目。 將 <your_cluster_uri><your_database> 佔位符分別替換為您的叢集 URI 和資料庫名稱。

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. 建立連接字串產生器物件,以盡可能使用共用與叢集 URI 相同的驗證認證來定義數據擷取 URI。 將 <your_ingestion_uri> 占位符替換為資料匯入 URI。

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. stormevent.csv 檔案新增至批次佇列以進行處理。 您使用以下物件與屬性:

    • QueuedIngestClient,以建立內嵌用戶端。
    • IngestionProperties 來設定擷取屬性。
    • DataFormat,將檔案格式指定為 CSV
    • ignore_first_record 使用下列邏輯,指定是否忽略 CSV 及類似檔案類型中的第一行:
      • True:忽略第一個資料列。 使用此選項可從表格文字資料中移除標題列。
      • False:第一列將作為普通列來處理。

    注意

    匯入支援的最大檔案大小為 6 GB。 建議擷取介於 100 MB 到 1 GB 之間的檔案。

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. 匯入檔案後,查詢數據表中的行數,並顯示最後匯入的行。

    注意

    為了讓資料導入完成,請在查詢資料表之前等候 30 秒。 為了 C#,請等候 60 秒,以便有充裕的時間將檔案異步新增至匯入佇列。

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

完整的程式代碼看起來應該像這樣:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

執行您的應用程式

在命令行介面中,使用下列命令來執行您的應用:

# Change directory to the folder that contains the management commands project
dotnet run .

您應該會看到類似下列的結果:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

將記憶體中的數據排入佇列以便引入和查詢結果

您可以建立包含數據的數據流,然後將它排入佇列以擷取記憶體中的數據。

例如,您可以修改應用程式,替換檔案 的程式碼中的 擷取,如下所示:

  1. 將串流描述符封裝新增到檔案頂部的匯入中。

    不需要其他套件。

  2. 新增含有要處理數據的記憶體字串。

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. 將擷取屬性設定為不要忽略第一筆記錄,因為記憶體內部字串沒有標頭數據列。

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. 藉由將記憶體中的數據新增至批次佇列來輸入。 可能的話,請提供原始數據的大小。

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

更新程式代碼的大綱看起來應該像這樣:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

當您執行應用程式時,應該會看到類似下列的結果。 請注意,匯入後,資料表的行數增加了一個。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

將 Blob 排入佇列以擷取並查詢結果

您可以從 Azure 儲存體 Blob、Azure Data Lake 檔案和 Amazon S3 檔案匯入數據。

例如,您可以這樣修改應用程式:以下程式碼將用來在記憶體 的程式碼中取代 擷取。

  1. 首先,將 stormevent.csv 檔案上傳至記憶體帳戶,併產生具有讀取許可權的 URI,例如,使用 azure Blob 的 SAS 令牌

  2. 將 blob descriptor 套件新增至檔案頂端的匯入。

    不需要其他套件。

  3. 使用 Blob URI 建立 Blob 描述元、設定擷取屬性,然後從 Blob 擷取數據。 將 <your_blob_uri> 佔位符替換為 blob URI。

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

更新程式代碼的大綱看起來應該像這樣:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

當您執行應用程式時,應該會看到類似下列的結果。 請注意,擷取之後,數據表中的數據行數目會增加 1,000。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

下一步