Kusto kan hantera massdataintag genom att optimera och batchinmata inmatade data via batchhanteringshanteraren. Batchhanteraren aggregerar inmatade data innan de når måltabellen, vilket möjliggör effektivare bearbetning och bättre prestanda. Batchbearbetning görs vanligtvis i partier om 1 GB rådata, 1 000 enskilda filer eller en standardtidsgräns på 5 minuter. Batchbearbetningsprinciper kan uppdateras på databas- och tabellnivå, vanligtvis för att sänka batchtiden och minska svarstiden. För mer information om batchinmatning, se IngestionBatching policy och Ändra tabellnivåns batchinmatningspolicy programmatiskt.
Anteckning
Batchbearbetning tar också hänsyn till olika faktorer, till exempel måldatabasen och tabellen, användaren som kör inmatningen och olika egenskaper som är associerade med inmatningen, till exempel särskilda taggar.
Använd någon av följande metoder för att skapa MyStormEvents-tabellen och, eftersom endast en liten mängd data matas in, anger du tidsgränsen för inmatningsbatchprincipen till 10 sekunder:
Skapa en måltabell med namnet MyStormEvents i databasen genom att köra den första appen i hanteringskommandon.
Ange tidsgränsen för principen för batchning av inmatning till 10 sekunder genom att köra den andra appen med hanteringskommandon . Innan du kör appen ändrar du timeout-värdet till 00:00:10.
I frågemiljön skapar du en måltabell med namnet MyStormEvents i databasen genom att köra följande fråga:
Det kan ta några minuter innan de nya principinställningarna för batchbearbetning sprids till batchhanteraren.
Ladda ned stormevent.csv exempeldatafil. Filen innehåller 1 000 stormhändelser.
Not
I följande exempel förutsätts en trivial matchning mellan kolumnerna i inmatade data och schemat för måltabellen.
Om inmatade data inte matchar tabellschemat så måste du använda en inmatningsmappning för att justera datakolumnerna med tabellschemat.
Köa en fil för inmatning och fråga efter resultaten
I önskad IDE eller textredigerare skapar du ett projekt eller en fil med namnet grundläggande inmatning med den konvention som är lämplig för det språk du föredrar. Placera stormevent.csv-filen på samma plats som din app.
Obs
I följande exempel använder du två klienter, en för att fråga klustret och den andra för att mata in data i klustret. För språk där klientbiblioteket har stöd för detta delar båda klienterna samma autentisering med användaruppmaning, vilket resulterar i en enda användaruppmaning i stället för en för varje klient.
Lägg till följande kod:
Skapa en klientapp som ansluter till klustret och skriver ut antalet rader i MyStormEvents tabell. Du använder det här antalet som baslinje för jämförelse med antalet rader efter varje metod för inmatning. Ersätt platshållarna <your_cluster_uri> och <your_database> med klustrets URI respektive databasnamn.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Not
För Node.js appar använder du InteractiveBrowserCredentialNodeOptions i stället för InteractiveBrowserCredentialInBrowserOptions.
Not
Java SDK har för närvarande inte stöd för att båda klienterna delar samma autentisering för användarfrågor, vilket resulterar i en användarprompt för varje klient.
Skapa en anslutningssträngsbyggare som definierar URI:n för datainmatning och använder samma autentiseringsuppgifter som kluster-URI:n där det är möjligt. Ersätt platshållaren <your_ingestion_uri> med URI för datainmatning.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Fråga efter antalet rader i tabellen efter att filen har matats in och visa den sista raden som har matats in.
Obs
Om du vill tillåta att inmatningen slutförs väntar du 30 sekunder innan du kör frågor mot tabellen. För C# väntar du 60 sekunder för att tillåta tid för att lägga till filen i inmatningskön asynkront.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Lagra minnesintern data för inmatning och sök efter resultaten
Du kan mata in data från minnet genom att skapa en ström som innehåller data och sedan köa den för inmatning.
Du kan till exempel ändra appen genom att ersätta inmatning från fil-kod på följande sätt:
Lägg till stream descriptor-paketet i importerna överst i filen.
När du kör appen bör du se ett resultat som liknar följande. Observera att efter inmatningen ökade antalet rader i tabellen med en.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Schemalägg en blob för inmatning och hämta resultaten
Du kan mata in data från Azure Storage-blobar, Azure Data Lake-filer och Amazon S3-filer.
Till exempel kan du modifiera appen genom att ersätta inmatningen från minneskoden med följande:
Börja med att ladda upp stormevent.csv-filen till ditt lagringskonto och generera en URI med läsbehörighet, till exempel med hjälp av en SAS-token för Azure-blobbar.
Lägg till blobdeskriptorpaketet till importerna överst i filen.
Skapa en blobbeskrivning med hjälp av blob-URI:n, ange inmatningsegenskaperna och mata sedan in data från bloben. Ersätt platshållaren <your_blob_uri> med blob-URI:n.
När du kör appen bör du se ett resultat som liknar följande. Observera att efter inmatningen ökade antalet rader i tabellen med 1 000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}