次の方法で共有


キューに登録されたインジェストを使用してデータを取得するアプリを作成する

適用対象: ✅Microsoft FabricAzure Data Explorer

Kusto は、バッチ処理マネージャーを使用して取り込まれたデータを最適化してバッチ処理することで、大量のデータ取り込みを処理できます。 バッチ処理マネージャーは、ターゲット テーブルに到達する前に取り込まれたデータを集計し、より効率的な処理とパフォーマンスの向上を実現します。 バッチ処理は通常、1 GB の生データ、1000 個の個々のファイル、または 5 分の既定のタイムアウトで大量に実行されます。 バッチ処理ポリシーは、通常、バッチ処理時間を短縮し、待機時間を短縮するために、データベースレベルとテーブル レベルで更新できます。 インジェスト のバッチ処理の詳細については、「IngestionBatching ポリシーの」および「プログラムでテーブル レベルのインジェスト バッチ処理ポリシーを変更する」を参照してください。

手記

バッチ処理では、ターゲット データベースやテーブル、インジェストを実行しているユーザー、取り込みに関連付けられているさまざまなプロパティ (特殊なタグなど) など、さまざまな要因も考慮されます。

この記事では、次の方法について説明します。

前提 条件

開始する前に

  • 次のいずれかの方法を使用して、MyStormEvents テーブルを作成し、少量のデータのみが取り込まれるので、インジェスト バッチ処理ポリシーのタイムアウトを 10 秒に設定します。

    1. 管理コマンドの最初のアプリを実行して、データベースに ターゲットテーブル MyStormEvents を作成します。
    2. 管理コマンドで 2 つ目のアプリを実行して、インジェスト バッチ ポリシーのタイムアウトを 10 秒設定します。 アプリを実行する前に、タイムアウト値を 00:00:10に変更します。

    手記

    新しいバッチ処理ポリシー設定がバッチ処理マネージャーに反映されるまでに数分かかる場合があります。

  • stormevent.csv サンプル データ ファイルをダウンロードします。 このファイルには、1,000 個の Storm イベント レコードが含まれています。

手記

次の例では、取り込まれたデータの列とターゲット テーブルのスキーマが簡単に一致することを前提としています。 取り込まれたデータがテーブル スキーマと簡単に一致しない場合は、インジェスト マッピングを使用して、データの列をテーブル スキーマに合わせる必要があります。

インジェストのためにファイルをキューに入れ、結果を照会する

任意の IDE またはテキスト エディターで、好みの言語に適した規則 使用して、基本的なインジェスト という名前のプロジェクトまたはファイルを作成します。 stormevent.csv ファイルをアプリと同じ場所に配置します。

手記

次の例では、2 つのクライアントを使用します。1 つはクラスターにクエリを実行し、もう 1 つはクラスターにデータを取り込みます。 クライアント ライブラリでサポートされている言語の場合、両方のクライアントが同じユーザー プロンプト認証システムを共有するため、クライアントごとに 1 つではなく 1 つのユーザー プロンプトが表示されます。

次のコードを追加します。

  1. クラスターに接続し、MyStormEvents テーブルの行数を出力するクライアント アプリを作成します。 この数は、インジェストの各方法の後の行数と比較するためのベースラインとして使用します。 <your_cluster_uri> プレースホルダーと <your_database> プレースホルダーをそれぞれクラスター URI とデータベース名に置き換えます。

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. クラスター URI と同じ認証資格情報を共有して、可能な場合はデータ インジェスト URI を定義する接続文字列ビルダー オブジェクトを作成します。 <your_ingestion_uri> プレースホルダーをデータ インジェスト URI に置き換えます。

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. stormevent.csv ファイルをバッチ キューに追加して取り込みます。 次のオブジェクトとプロパティを使用します。

    • Ingest クライアントを作成するには、QueuedIngestClient を使用します。
    • IngestionProperties。インジェストのプロパティを設定します。
    • DataFormat。ファイル形式を CSV として指定します。
    • 次のロジックを使用してCSVや類似のファイル形式で最初の行を無視するかどうかを指定するには、ignore_first_record を使用します。
      • True: 最初の行は無視されます。 このオプションを使用して、表形式のテキスト データからヘッダー行を削除します。
      • False: 最初の行が通常の行として取り込まれます。

    手記

    インジェストでは、最大ファイル サイズ 6 GB がサポートされます。 100 MB から 1 GB の間のファイルを取り込むのが推奨されます。

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. ファイルの取り込み後にテーブル内の行数を照会し、取り込まれた最後の行を表示します。

    手記

    インジェストが完了するまでの時間を許可するには、テーブルのクエリを実行する前に 30 秒待ちます。 C# では、インジェスト キューに非同期的にファイルを追加する時間を 60 秒待ちます。

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

完全なコードは次のようになります。

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

アプリを実行する

コマンド シェルで、次のコマンドを使用してアプリを実行します。

# Change directory to the folder that contains the management commands project
dotnet run .

次のような結果が表示されます。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

インジェストのためにメモリ内データをキューに入れ、結果を照会する

メモリからデータを取り込むには、データを含むストリームを作成し、インジェスト用にキューに入れる必要があります。

たとえば、次のように、ファイルから取り込まれる コードを置き換えるアプリを変更できます。

  1. ファイルの先頭にあるインポートにストリーム記述子パッケージを追加します。

    追加のパッケージは必要ありません。

  2. 取り込むデータを含むメモリ内文字列を追加します。

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. インジェストのプロパティは、インメモリ文字列にヘッダー行がないため、最初のレコードを無視しないように設定します。

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. インメモリ データをバッチ キューに追加して取り込みます。 可能であれば、生データのサイズを指定します。

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

更新されたコードの概要は次のようになります。

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

アプリを実行すると、次のような結果が表示されます。 インジェスト後、テーブル内の行数が 1 ずつ増加していることに注意してください。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

インジェストのために BLOB をキューに入れ、結果をクエリする

Azure Storage BLOB、Azure Data Lake ファイル、および Amazon S3 ファイルからデータを取り込むことができます。

たとえば、メモリから取り込まれる コードを次のように置き換えて、アプリを変更できます。

  1. まず、stormevent.csv ファイルをストレージ アカウントにアップロードし、読み取りアクセス許可を持つ URI を生成します (たとえば、Azure BLOB の SAS トークン 使用)。

  2. ファイルの先頭にあるインポートに BLOB 記述子パッケージを追加します。

    追加のパッケージは必要ありません。

  3. BLOB URI を使用して BLOB 記述子を作成し、インジェスト プロパティを設定してから、BLOB からデータを取り込みます。 <your_blob_uri> プレースホルダーを BLOB URI に置き換えます。

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

更新されたコードの概要は次のようになります。

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

アプリを実行すると、次のような結果が表示されます。 インジェスト後、テーブル内の行数が 1,000 増加していることに注意してください。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

次の手順