Kusto dokáže zpracovat hromadný příjem dat optimalizací a dávkováním přijatých dat prostřednictvím svého správce dávkování. Manažer dávkování agreguje přijatá data předtím, než dosáhnou cílové tabulky, což umožňuje efektivnější zpracování a zvýšený výkon. Dávkování se obvykle provádí ve skupinách po 1 GB nezpracovaných dat, po 1 000 jednotlivých souborech, nebo je ve výchozím nastavení časový limit 5 minut. Zásady dávkování je možné aktualizovat na úrovni databáze a tabulky, což obvykle snižuje dobu dávkování a snižuje latenci. Další informace o dávkování příjmu najdete v tématu Zásady dávkového příjmu a Změnit zásady dávkového příjmu na úrovni tabulky programově.
Poznámka
Dávkování také bere v úvahu různé faktory, jako je cílová databáze a tabulka, uživatel, který spouští příjem dat, a různé vlastnosti spojené s příjmem dat, jako jsou speciální značky.
Pomocí jedné z následujících metod vytvořte MyStormEvents tabulku a vzhledem k tomu, že se ingestuje jenom malé množství dat, nastavte časový limit zásad dávkování příjmu na 10 sekund:
Vytvořte cílovou tabulku s názvem MyStormEvents ve vaší databázi spuštěním první aplikace v příkazů pro správu.
Nastavte časový limit zásady dávkování ingesce na 10 sekund tak, že spustíte druhou aplikaci v rámci příkazů pro správu. Před spuštěním aplikace změňte hodnotu časového limitu na 00:00:10.
V prostředí dotazu vytvořte cílovou tabulku s názvem MyStormEvents v databázi spuštěním následujícího dotazu:
Rozšíření nového nastavení zásad dávkování do správce dávek může trvat několik minut.
Stáhněte si ukázkový datový soubor stormevent.csv. Soubor obsahuje 1 000 záznamů bouřkových událostí.
Poznámka
Následující příklady předpokládají triviální shodu mezi sloupci přijatých dat a schématem cílové tabulky.
Pokud ingestované data triviálně neodpovídají schématu tabulky, musíte k zarovnání sloupců dat se schématem tabulky použít mapování příjmu dat.
Zařadíte soubor do fronty pro příjem dat a odešlete dotaz na výsledky.
Ve vámi preferovaném integrovaném vývojovém prostředí (IDE) nebo textovém editoru vytvořte projekt nebo soubor s názvem základní příjem dat pomocí konvence vhodné pro váš preferovaný jazyk. Umístěte stormevent.csv soubor do stejného umístění jako vaše aplikace.
Poznámka
V následujících příkladech používáte dva klienty, jeden na dotazování vašeho clusteru a druhý na vložení dat do vašeho clusteru. Pro jazyky, ve kterých ji klientská knihovna podporuje, sdílejí oba klienti stejný ověřovací program výzvy uživatele, což vede k zobrazení výzvy jednoho uživatele místo jednoho pro každého klienta.
Přidejte následující kód:
Vytvořte klientskou aplikaci, která se připojí ke clusteru, a vytiskne počet řádků v tabulce MyStormEvents. Tento počet použijete jako základní hodnotu pro porovnání s počtem řádků po každé metodě ingestu. Nahraďte zástupné symboly <your_cluster_uri> a <your_database> identifikátorem URI clusteru a názvem databáze.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Poznámka
Používejte pro Node.js aplikace InteractiveBrowserCredentialNodeOptions místo InteractiveBrowserCredentialInBrowserOptions.
Poznámka
Sada Java SDK v současné době nepodporuje sdílení ověřovacího nástroje pro uživatelskou výzvu mezi klienty, což vede k zobrazení výzvy pro každého klienta zvlášť.
Vytvořte objekt tvůrce připojovacích řetězců, který definuje identifikátor URI příjmu dat, pokud je to možné, pomocí sdílení stejných přihlašovacích údajů pro ověření jako identifikátor URI clusteru. Nahraďte zástupný symbol <your_ingestion_uri> adresou URI pro příjem dat.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Po ingestování souboru zadejte dotaz na počet řádků v tabulce a zobrazte poslední přijatý řádek.
Poznámka
Pokud chcete umožnit dokončení příjmu dat, počkejte 30 sekund před dotazováním tabulky. Počkejte 60 sekund v jazyce C#, abyste umožnili asynchronní přidání souboru do fronty příjmu dat.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Měl by se zobrazit výsledek podobný následujícímu:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Fronta dat v paměti pro příjem dat a dotazování výsledků
Data z paměti můžete ingestovat vytvořením datového proudu obsahujícího data a následným řazením do fronty pro příjem dat.
Například můžete upravit aplikaci tak, že nahradíte importování z kódu souboru takto:
Přidejte balíček popisovače streamu do importů v horní části souboru.
Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o jeden.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Vytvoření fronty objektu blob pro příjem dat a dotazování výsledků
Můžete ingestovat data z objektů blob služby Azure Storage, souborů Azure Data Lake a souborů Amazon S3.
Aplikaci můžete například upravit tak, že ingestujete z paměti kód následujícím kódem:
Začněte tím, že nahrajete soubor stormevent.csv do účtu úložiště a vygenerujete identifikátor URI s oprávněními ke čtení, například pomocí tokenu SAS pro objekty blob Azure.
Přidejte balíček popisovače objektů blob do importů v horní části souboru.
Pomocí identifikátoru URI objektu blob vytvořte popisovač objektů blob, nastavte vlastnosti příjmu dat a pak ingestujte data z objektu blob. Nahraďte zástupný symbol <your_blob_uri> identifikátorem URI blobu.
Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o 1 000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}