Поделиться через


Создание приложения для получения данных с помощью приема в очереди

Применимо: ✅Microsoft FabricAzure Data Explorer

Kusto может обрабатывать массовое потребление данных путем оптимизации и пакетного приема данных с помощью диспетчера пакетной обработки. Диспетчер пакетной обработки агрегирует принятые данные до того, как они достигнут целевой таблицы, что позволяет повысить эффективность обработки и производительность. Пакетная обработка обычно выполняется партиями по 1 ГБ необработанных данных, 1000 отдельных файлов или с тайм-аутом по умолчанию в 5 минут. Политики пакетной обработки можно обновлять на уровнях базы данных и таблиц, обычно уменьшая время пакетной обработки и уменьшая задержку. Дополнительные сведения о пакетной обработке данных см. в политике IngestionBatching и о программном изменении политики пакетной обработки данных на уровне таблицы .

Заметка

Пакетная обработка также учитывает различные факторы, такие как целевая база данных и таблица, пользователь, выполняющий прием, и различные свойства, связанные с приемом, например специальные теги.

В этой статье описано, как:

Необходимые условия

Перед началом работы

  • Используйте один из следующих методов, чтобы создать таблицу MyStormEvents и, так как выполняется прием только небольшого объема данных, установите время ожидания политики пакетной обработки приема на 10 секунд.

    1. Создайте целевую таблицу с именем MyStormEvents в базе данных, выполнив первое приложение в командах управления .
    2. Задайте время ожидания политики пакетного приема до 10 секунд, запустив второе приложение в командах управления . Перед запуском приложения измените значение времени ожидания на 00:00:10.

    Заметка

    Для распространения новых параметров политики пакетной обработки в диспетчер пакетной обработки может потребоваться несколько минут.

  • Скачайте пример файла данных stormevent.csv. Файл содержит 1 000 записей штормовых событий.

Заметка

В следующих примерах предполагается простое соответствие между столбцами загружаемых данных и схемой таблицы назначения. Если поступающие данные не совпадают с таблицей по схеме, необходимо использовать сопоставление приёма данных для соотнесения столбцов данных со схемой таблицы.

Поставить файл в очередь для загрузки и запросить результаты

В предпочтительном интегрированной среде разработки или текстовом редакторе создайте проект или файл с именем базовый прием с помощью соглашения, подходящего для предпочитаемого языка. Поместите файл stormevent.csv в то же расположение, что и приложение.

Заметка

В следующих примерах используется два клиента, один для запроса кластера и другого для приема данных в кластер. Для языков, в которых клиентская библиотека поддерживает ее, оба клиента совместно используют один и тот же средство проверки подлинности пользовательского запроса, что приводит к одному запросу пользователя вместо одного для каждого клиента.

Добавьте следующий код:

  1. Создайте клиентское приложение, которое подключается к кластеру и выводит количество строк в таблице MyStormEvents. Это число будет использоваться в качестве базового показателя для сравнения с числом строк после каждого метода приема. Замените заполнители <your_cluster_uri> и <your_database> URI кластера и именем базы данных соответственно.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Создайте объект построителя строк подключения, определяющий универсальный код ресурса (URI) приема данных, где это возможно, с помощью общего доступа к тем же учетным данным проверки подлинности, что и URI кластера. Замените заполнитель <your_ingestion_uri> на URI загрузки данных.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Внесите файл stormevent.csv в очередь пакетной обработки. Вы используете следующие объекты и свойства:

    • QueuedIngestClient для создания клиента приема.
    • IngestionProperties, чтобы задать свойства приема.
    • DataFormat, чтобы указать формат файла в виде CSV.
    • ignore_first_record, чтобы указать, игнорируется ли первая строка в CSV и аналогичных типах файлов, используя следующую логику:
      • True: первая строка игнорируется. Используйте этот параметр, чтобы удалить строку заголовка из табличных текстовых данных.
      • False: первая строка обрабатывается как обычная строка.

    Заметка

    Прием поддерживает максимальный размер файла размером 6 ГБ. Рекомендация заключается в приеме файлов в диапазоне от 100 МБ до 1 ГБ.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Узнайте количество строк в таблице после загрузки файла и покажите последнюю строку, которая была добавлена.

    Заметка

    Чтобы дать времени на завершение процесса приема данных, подождите 30 секунд, прежде чем делать запрос к таблице. Для C# подождите 60 секунд, чтобы дать время для асинхронного добавления файла в очередь обработки.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Полный код должен выглядеть следующим образом:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Запуск приложения

В командной оболочке используйте следующую команду для запуска приложения:

# Change directory to the folder that contains the management commands project
dotnet run .

Результат должен выглядеть следующим образом:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Очереди данных в памяти для приема и запроса результатов

Вы можете получать данные из памяти, создавая поток, содержащий данные, а затем добавляя его в очередь для приема.

Например, можно изменить приложение, заменив загрузку из файла на код следующим образом.

  1. Добавьте пакет дескриптора потока в импорт в верхней части файла.

    Дополнительные пакеты не требуются.

  2. Добавьте строку в оперативной памяти с данными для обработки.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Задайте свойства приема, чтобы не игнорировать первую запись, так как строка в памяти не имеет строки заголовка.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Добавляйте данные из памяти в пакетную очередь. По возможности укажите размер необработанных данных.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

Структура обновленного кода должна выглядеть следующим образом:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на один.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Поставьте большой двоичный объект в очередь для обработки и запроса результатов.

Вы можете загружать данные из объектов Blob в хранилище Azure, файлов Azure Data Lake и файлов Amazon S3.

Например, можно изменить приложение, заменив ввод данных из памяти в код следующим образом:

  1. Сначала отправьте файл stormevent.csv в учетную запись хранения и создайте универсальный код ресурса (URI) с разрешениями на чтение, например с помощью маркера SAS для БОЛЬШИХ двоичных объектов Azure.

  2. Добавьте пакет с дескриптором BLOB в раздел импортов в верхней части файла.

    Дополнительные пакеты не требуются.

  3. Создайте дескриптор большого двоичного объекта с помощью URI большого двоичного объекта, задайте свойства приема и приема данных из большого двоичного объекта. Замените заполнитель <your_blob_uri> на URI blob.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

Структура обновленного кода должна выглядеть следующим образом:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на 1000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Следующий шаг