Kusto é capaz de lidar com a ingestão de dados em massa, otimizando e agrupando os dados ingeridos por meio de seu gerenciador de lotes. O gestor de lotes agrega os dados ingeridos antes de atingirem a tabela de destino, permitindo um processamento mais eficiente e um melhor desempenho. O processamento em lote geralmente é feito em volumes de 1 GB de dados brutos, mil ficheiros individuais ou, por padrão, um tempo de espera de 5 minutos. As políticas de processamento em lote podem ser atualizadas nos níveis de banco de dados e tabela, geralmente para diminuir o tempo de processamento em lote e reduzir a latência. Para obter mais informações sobre o processamento em lote de ingestão, consulte política IngestionBatching e Alterar a política de lote de ingestão de nível de tabela programaticamente.
Observação
O processamento em lote também leva em conta vários fatores, como o banco de dados e a tabela de destino, o usuário que executa a ingestão e várias propriedades associadas à ingestão, como tags especiais.
Use um dos seguintes métodos para criar a tabela MyStormEvents e, como apenas uma pequena quantidade de dados está sendo ingerida, defina o tempo limite da política de lote de ingestão para 10 segundos:
Crie uma tabela de destino chamada MyStormEvents no seu banco de dados executando a primeira aplicação em comandos de gerenciamento.
Defina o tempo limite da política de lote de ingestão para 10 segundos, executando a segunda aplicação nos comandos de gestão . Antes de executar o aplicativo, altere o valor de tempo limite para 00:00:10.
Em seu ambiente de consulta, crie uma tabela de destino chamada MyStormEvents em seu banco de dados executando a seguinte consulta:
Pode levar alguns minutos para que as novas configurações de política de lote se propaguem para o gerenciador de lotes.
Baixe o arquivo de dados de exemplo stormevent.csv. O arquivo contém 1.000 registros de eventos de tempestade.
Observação
Os exemplos a seguir assumem uma correspondência trivial entre as colunas dos dados ingeridos e o esquema da tabela de destino.
Se os dados ingeridos não corresponderem trivialmente ao esquema da tabela, você deverá usar um mapeamento de ingestão para alinhar as colunas dos dados com o esquema da tabela.
Enfileirar um ficheiro para ingestão e consultar os resultados
Em seu IDE ou editor de texto preferido, crie um projeto ou arquivo chamado ingestão básica usando a convenção apropriada para seu idioma preferido. Coloque o arquivo stormevent.csv no mesmo local do seu aplicativo.
Observação
Nos exemplos a seguir, você usa dois clientes, um para consultar seu cluster e outro para ingerir dados em seu cluster. Para idiomas em que a biblioteca do cliente oferece suporte a ele, ambos os clientes compartilham o mesmo autenticador de prompt do usuário, resultando em um único prompt de usuário em vez de um para cada cliente.
Adicione o seguinte código:
Crie uma aplicação cliente que se ligue ao seu cluster e imprima o número de linhas na tabela MyStormEvents. Você usará essa contagem como uma linha de base para comparação com o número de linhas após cada método de ingestão. Substitua os marcadores de posição <your_cluster_uri> e <your_database> pelo URI do cluster e pelo nome do banco de dados, respectivamente.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Observação
Para aplicativos Node.js, use InteractiveBrowserCredentialNodeOptions em vez de InteractiveBrowserCredentialInBrowserOptions.
Observação
Atualmente, o Java SDK não suporta ambos os clientes que compartilham o mesmo autenticador de prompt de usuário, resultando em um prompt de usuário para cada cliente.
Crie um objeto construtor de cadeia de conexão que defina o URI de ingestão de dados, sempre que possível, usando o compartilhamento das mesmas credenciais de autenticação que o URI do cluster. Substitua o espaço reservado <your_ingestion_uri> pelo URI de ingestão de dados.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Consulte o número de linhas na tabela após a ingestão do arquivo e mostre a última linha ingerida.
Observação
Para dar tempo para que a ingestão seja concluída, aguarde 30 segundos antes de consultar a tabela. Para C#, aguarde 60 segundos para dar tempo de adicionar o arquivo à fila de ingestão de forma assíncrona.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Enfileirar dados na memória para ingestão e consultar os resultados
Você pode ingerir dados da memória criando um fluxo contendo os dados e, em seguida, enfileirando-os para ingestão.
Por exemplo, você pode modificar o aplicativo substituindo a ingestão de do código de de arquivo, da seguinte maneira:
Adicione o pacote do descritor de fluxo às importações na parte superior do arquivo.
Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em um.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Colocar um blob em fila para processamento e consultar os resultados
Você pode ingerir dados de blobs de Armazenamento do Azure, arquivos do Azure Data Lake e arquivos do Amazon S3.
Por exemplo, poderá modificar a aplicação substituindo o componente de ingestão do código de memória pelo seguinte:
Comece carregando o arquivo stormevent.csv em sua conta de armazenamento e gere um URI com permissões de leitura, por exemplo, usando um token SAS para blobs do Azure.
Adicione o pacote descritor de blob às importações na parte superior do arquivo.
Crie um descritor de blob usando o URI de blob, defina as propriedades de ingestão e, em seguida, ingira dados do blob. Substitua o marcador <your_blob_uri> pelo URI do blob.
Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em 1.000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}