다음을 통해 공유


큐 대기 수집을 사용하여 데이터를 가져오는 앱을 만들기

적용 대상: ✅Microsoft FabricAzure Data Explorer

Kusto는 일괄 처리 관리자를 통해 수집된 데이터를 최적화하고 일괄 처리하여 대량 데이터 유입을 처리할 수 있습니다. 일괄 처리 관리자는 수집된 데이터를 대상 테이블에 도달하기 전에 집계하여 보다 효율적인 처리와 향상된 성능을 제공합니다. 일괄 처리는 일반적으로 1GB의 원시 데이터, 1,000개의 개별 파일 또는 기본적으로 5분이 초과되는 대량으로 수행됩니다. 일괄 처리 정책은 일반적으로 일괄 처리 시간을 줄이고 대기 시간을 줄이기 위해 데이터베이스 및 테이블 수준에서 업데이트할 수 있습니다. 수집 일괄 처리에 대한 자세한 내용은 수집 일괄 처리 정책프로그래밍 방식으로 테이블 수준 수집 일괄 처리 정책 변경을 참조하세요.

메모

또한 일괄 처리는 대상 데이터베이스 및 테이블, 수집을 실행하는 사용자 및 수집과 관련된 다양한 속성(예: 특수 태그)과 같은 다양한 요소를 고려합니다.

이 문서에서는 다음 방법을 알아봅니다.

필수 구성 요소

시작하기 전에

  • 다음 방법 중 하나를 사용하여 MyStormEvents 테이블을 만들고 소량의 데이터만 수집하므로 수집 일괄 처리 정책 제한 시간을 10초로 설정합니다.

    1. 관리 명령첫 번째 앱을 실행하여 데이터베이스에서 MyStormEvents이라는 대상 테이블을 만듭니다.
    2. 관리 명령두 번째 앱을 실행하여 수집 일괄 처리 정책 시간 제한을 10초로 설정합니다. 앱을 실행하기 전에 시간 제한 값을 00:00:10으로 변경합니다.

    메모

    새 일괄 처리 정책 설정이 일괄 처리 관리자로 전파되는 데 몇 분 정도 걸릴 수 있습니다.

  • stormevent.csv 샘플 데이터 파일을 다운로드합니다. 파일에는 1,000개의 폭풍 관련 사건 기록이 포함됩니다.

메모

다음 예제에서는 수집된 데이터의 열과 대상 테이블의 스키마 간에 사소한 일치를 가정합니다. 수집된 데이터가 테이블 스키마와 사소하게 일치하지 않는 경우 수집 매핑을 사용하여 데이터 열을 테이블 스키마와 정렬해야 합니다.

파일을 큐에 추가하고 결과를 쿼리합니다.

선호하는 IDE 또는 텍스트 편집기에서 기본 설정 언어에 적합한 규칙을 사용하여 기본 수집 프로젝트 또는 파일을 만듭니다. stormevent.csv 파일을 앱과 동일한 위치에 배치합니다.

메모

다음 예제에서는 두 개의 클라이언트를 사용합니다. 하나는 클러스터를 쿼리하고 다른 하나는 클러스터로 데이터를 수집합니다. 클라이언트 라이브러리가 지원하는 언어의 경우 두 클라이언트가 동일한 사용자 프롬프트 인증자를 공유하므로 각 클라이언트에 대해 하나의 사용자 프롬프트 대신 단일 사용자 프롬프트가 생성됩니다.

다음 코드를 추가합니다.

  1. 클러스터에 연결하고 MyStormEvents 테이블의 행 수를 출력하는 클라이언트 앱을 만듭니다. 각 수집 방법을 적용한 후에 얻은 행 수와 비교하기 위한 기본 기준으로 이 개수를 사용합니다. <your_cluster_uri><your_database> 자리 표시자를 각각 클러스터 URI 및 데이터베이스 이름으로 바꾸십시오.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. 가능한 경우 클러스터 URI와 동일한 인증 자격 증명을 공유하여 데이터 수집 URI를 정의하는 연결 문자열 작성기 개체를 만듭니다. <your_ingestion_uri> 자리 표시자를 데이터 입력 URI로 바꿉니다.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. 일괄 처리 큐에 stormevent.csv 파일을 추가하여 전송합니다. 다음 개체 및 속성을 사용합니다.

    • QueuedIngestClient를 생성하여 수집 클라이언트를 만듭니다.
    • IngestionProperties를 수집 속성을 설정할 있습니다.
    • DataFormat을 사용하여 파일 형식을 CSV으로 지정합니다.
    • ignore_first_record 다음 논리를 사용하여 CSV 및 유사한 파일 형식의 첫 번째 행이 무시되는지 여부를 지정합니다.
      • True: 첫 번째 행은 무시됩니다. 표 형식 텍스트 데이터에서 머리글 행을 삭제하려면 이 옵션을 사용합니다.
      • False: 첫 번째 행이 일반 행으로 처리됩니다.

    메모

    파일 수집은 최대 파일 크기가 6GB까지 지원됩니다. 100MB에서 1GB 사이의 파일을 수집하는 것이 좋습니다.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. 파일을 입력한 후 테이블의 행 수를 쿼리하고 마지막으로 입력된 행을 표시합니다.

    메모

    수집이 완료되기까지의 시간을 허용하려면 테이블을 쿼리하기 전에 30초 동안 기다립니다. C#의 경우 60초 동안 대기하여 파일을 비동기적으로 수집 큐에 추가하는 시간을 허용합니다.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

전체 코드는 다음과 같습니다.

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

앱 실행

명령 셸에서 다음 명령을 사용하여 앱을 실행합니다.

# Change directory to the folder that contains the management commands project
dotnet run .

다음과 유사한 결과가 표시됩니다.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

수집을 위해 메모리 내 데이터 큐에 대기하고 결과를 쿼리합니다.

데이터를 포함하는 스트림을 만든 다음 수집을 위해 대기하여 메모리에서 데이터를 수집할 수 있습니다.

예를 들어 다음과 같이 파일 코드에서 수집을 대체하여 앱을 수정할 수 있습니다.

  1. 파일 맨 위의 가져오기 목록에 스트림 설명자 패키지를 추가합니다.

    추가 패키지는 필요하지 않습니다.

  2. 수집할 데이터와 함께 메모리 내 문자열을 추가합니다.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. 메모리 내 문자열에 머리글 행이 없으므로 첫 번째 레코드를 무시하지 않도록 수집 속성을 설정합니다.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. 일괄 처리 큐에 추가하여 메모리 내 데이터를 수집합니다. 가능한 경우 원시 데이터의 크기를 제공합니다.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

업데이트된 코드의 개요는 다음과 같습니다.

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

앱을 실행할 때 다음과 유사한 결과가 표시됩니다. 데이터 수집 후, 테이블의 행 수가 한 줄 증가한 것을 확인하세요.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

수집을 위해 Blob 큐에 대기하고 결과를 쿼리합니다.

Azure Storage Blob, Azure Data Lake 파일 및 Amazon S3 파일에서 데이터를 수집할 수 있습니다.

예를 들어, 앱에서 메모리에서 수집을 다음의 코드로 교체하여 수정할 수 있습니다.

  1. 먼저 스토리지 계정에 stormevent.csv 파일을 업로드하고 읽기 권한이 있는 URI를 생성합니다(예: Azure Blob에 대한 SAS 토큰 사용).

  2. 파일 맨 위에 있는 가져오기 목록에 Blob 설명자 패키지를 추가합니다.

    추가 패키지는 필요하지 않습니다.

  3. Blob URI를 사용하여 Blob 설명자를 만들고, 수집 속성을 설정한 다음, Blob에서 데이터를 수집합니다. <your_blob_uri> 자리 표시자를 Blob URI로 바꿉다.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

업데이트된 코드의 개요는 다음과 같습니다.

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

앱을 실행할 때 다음과 유사한 결과가 표시됩니다. 수집이 완료된 후, 테이블의 행 수가 1,000개 증가했습니다.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

다음 단계

KQL 빠른 참조