Kusto может обрабатывать массовое потребление данных путем оптимизации и пакетного приема данных с помощью диспетчера пакетной обработки. Диспетчер пакетной обработки агрегирует принятые данные до того, как они достигнут целевой таблицы, что позволяет повысить эффективность обработки и производительность. Пакетная обработка обычно выполняется партиями по 1 ГБ необработанных данных, 1000 отдельных файлов или с тайм-аутом по умолчанию в 5 минут. Политики пакетной обработки можно обновлять на уровнях базы данных и таблиц, обычно уменьшая время пакетной обработки и уменьшая задержку. Дополнительные сведения о пакетной обработке данных см. в политике IngestionBatching и о программном изменении политики пакетной обработки данных на уровне таблицы .
Заметка
Пакетная обработка также учитывает различные факторы, такие как целевая база данных и таблица, пользователь, выполняющий прием, и различные свойства, связанные с приемом, например специальные теги.
Используйте один из следующих методов, чтобы создать таблицу MyStormEvents и, так как выполняется прием только небольшого объема данных, установите время ожидания политики пакетной обработки приема на 10 секунд.
Создайте целевую таблицу с именем MyStormEvents в базе данных, выполнив первое приложение в командах управления .
Задайте время ожидания политики пакетного приема до 10 секунд, запустив второе приложение в командах управления . Перед запуском приложения измените значение времени ожидания на 00:00:10.
В среде запроса создайте целевую таблицу с именем MyStormEvents в базе данных, выполнив следующий запрос:
Для распространения новых параметров политики пакетной обработки в диспетчер пакетной обработки может потребоваться несколько минут.
Скачайте пример файла данных stormevent.csv. Файл содержит 1 000 записей штормовых событий.
Заметка
В следующих примерах предполагается простое соответствие между столбцами загружаемых данных и схемой таблицы назначения.
Если поступающие данные не совпадают с таблицей по схеме, необходимо использовать сопоставление приёма данных для соотнесения столбцов данных со схемой таблицы.
Поставить файл в очередь для загрузки и запросить результаты
В предпочтительном интегрированной среде разработки или текстовом редакторе создайте проект или файл с именем базовый прием с помощью соглашения, подходящего для предпочитаемого языка. Поместите файл stormevent.csv в то же расположение, что и приложение.
Заметка
В следующих примерах используется два клиента, один для запроса кластера и другого для приема данных в кластер. Для языков, в которых клиентская библиотека поддерживает ее, оба клиента совместно используют один и тот же средство проверки подлинности пользовательского запроса, что приводит к одному запросу пользователя вместо одного для каждого клиента.
Добавьте следующий код:
Создайте клиентское приложение, которое подключается к кластеру и выводит количество строк в таблице MyStormEvents. Это число будет использоваться в качестве базового показателя для сравнения с числом строк после каждого метода приема. Замените заполнители <your_cluster_uri> и <your_database> URI кластера и именем базы данных соответственно.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Заметка
Для приложений Node.js используйте InteractiveBrowserCredentialNodeOptions вместо InteractiveBrowserCredentialInBrowserOptions.
Заметка
Пакет SDK для Java в настоящее время не поддерживает использование обоими клиентами одного и того же аутентификатора запроса пользователя, что вызывает запрос аутентификации для каждого клиента.
Создайте объект построителя строк подключения, определяющий универсальный код ресурса (URI) приема данных, где это возможно, с помощью общего доступа к тем же учетным данным проверки подлинности, что и URI кластера. Замените заполнитель <your_ingestion_uri> на URI загрузки данных.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Узнайте количество строк в таблице после загрузки файла и покажите последнюю строку, которая была добавлена.
Заметка
Чтобы дать времени на завершение процесса приема данных, подождите 30 секунд, прежде чем делать запрос к таблице. Для C# подождите 60 секунд, чтобы дать время для асинхронного добавления файла в очередь обработки.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Очереди данных в памяти для приема и запроса результатов
Вы можете получать данные из памяти, создавая поток, содержащий данные, а затем добавляя его в очередь для приема.
Например, можно изменить приложение, заменив загрузку из файла на код следующим образом.
Добавьте пакет дескриптора потока в импорт в верхней части файла.
При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на один.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Поставьте большой двоичный объект в очередь для обработки и запроса результатов.
Вы можете загружать данные из объектов Blob в хранилище Azure, файлов Azure Data Lake и файлов Amazon S3.
Например, можно изменить приложение, заменив ввод данных из памяти в код следующим образом:
Сначала отправьте файл stormevent.csv в учетную запись хранения и создайте универсальный код ресурса (URI) с разрешениями на чтение, например с помощью маркера SAS для БОЛЬШИХ двоичных объектов Azure.
Добавьте пакет с дескриптором BLOB в раздел импортов в верхней части файла.
Создайте дескриптор большого двоичного объекта с помощью URI большого двоичного объекта, задайте свойства приема и приема данных из большого двоичного объекта. Замените заполнитель <your_blob_uri> на URI blob.
При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на 1000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}