Usługa Kusto jest w stanie obsługiwać masowe pobieranie danych przez optymalizację i przetwarzanie wsadowe pozyskanych danych za pośrednictwem menedżera przetwarzania wsadowego. Menedżer wsadowy agreguje pozyskane dane przed dotarciem do tabeli docelowej, co pozwala na bardziej efektywne przetwarzanie i zwiększoną efektywność. Przetwarzanie wsadowe jest zazwyczaj realizowane w partiach po 1 GB danych pierwotnych, 1000 pojedynczych plików lub z domyślnym limitem czasu wynoszącym 5 minut. Zasady dzielenia na partie można aktualizować na poziomach bazy danych i tabeli, co zwykle zmniejsza czas dzielenia na partie i zmniejsza opóźnienie. Aby uzyskać więcej informacji na temat grupowania pozyskiwania, zobacz zasady grupowania pozyskiwania i programową zmianę zasad pozyskiwania danych na poziomie tabeli .
Notatka
Przetwarzanie wsadowe uwzględnia również różne czynniki, takie jak docelowa baza danych i tabela, użytkownik uruchamiający proces ładowania danych oraz różne właściwości związane z tym procesem, takie jak specjalne tagi.
Z tego artykułu dowiesz się, jak wykonywać następujące działania:
Użyj jednej z następujących metod, aby utworzyć tabelę MyStormEvents i, ponieważ przesyłana jest tylko niewielka ilość danych, ustaw limit czasu zasad przesyłania danych na 10 sekund.
Utwórz tabelę docelową o nazwie MyStormEvents w bazie danych, uruchamiając pierwszą aplikację w poleceniach zarządzania .
Ustaw limit czasu zasady dzielenia na partie przy pozyskiwaniu na 10 sekund, uruchamiając drugą aplikację w poleceniach zarządzających . Przed uruchomieniem aplikacji zmień wartość limitu czasu na 00:00:10.
W środowisku zapytań utwórz tabelę docelową o nazwie MyStormEvents w bazie danych, uruchamiając następujące zapytanie:
Propagowanie nowych ustawień zasad grupowania do menedżera wsadowego może potrwać kilka minut.
Pobierz przykładowy plik danych stormevent.csv. Plik zawiera 1000 rekordów zdarzeń burzowych.
Notatka
W poniższych przykładach przyjęto założenie, że trywialne dopasowanie występuje między kolumnami pozyskanych danych a schematem tabeli docelowej.
Jeśli pozyskane dane nie pasują w oczywisty sposób do schematu tabeli, należy użyć mapowania wczytywania danych, aby wyrównać kolumny danych ze schematem tabeli.
Kolejkowanie pliku do wczytywania i zapytań o wyniki
W preferowanym środowisku IDE lub edytorze tekstów utwórz projekt lub plik o nazwie podstawowego ingestowania zgodnie z konwencjami preferowanego języka. Umieść plik stormevent.csv w tej samej lokalizacji co aplikacja.
Notatka
W poniższych przykładach używasz dwóch klientów, jeden do wykonywania zapytań względem klastra, a drugi do pozyskiwania danych do klastra. W przypadku języków, w których biblioteka klienta go obsługuje, oba klienci współdzielą ten sam monit o uwierzytelnienia użytkownika, co powoduje wyświetlenie jednego monitu użytkownika zamiast jednego dla każdego klienta.
Dodaj następujący kod:
Utwórz aplikację kliencką, która łączy się z klastrem i wyświetla liczbę wierszy w tabeli MyStormEvents. Użyjesz tej liczby jako punktu odniesienia do porównania z liczbą wierszy po każdej metodzie przetwarzania. Zastąp symbole zastępcze <your_cluster_uri> i <your_database> odpowiednio swoim identyfikatorem URI klastra i nazwą bazy danych.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Notatka
W przypadku aplikacji Node.js użyj InteractiveBrowserCredentialNodeOptions zamiast InteractiveBrowserCredentialInBrowserOptions.
Notatka
Java SDK nie obsługuje obecnie sytuacji, w której dwóch klientów korzysta z tego samego autentykatora monitu użytkownika, co skutkuje pojawieniem się monitu dla każdego klienta z osobna.
Utwórz obiekt konstruktora parametrów połączenia, który definiuje identyfikator URI pozyskiwania danych, jeśli to możliwe, przy użyciu współużytkowania tych samych poświadczeń uwierzytelniania co identyfikator URI klastra. Zastąp symbol zastępczy <your_ingestion_uri> identyfikatorem URI pozyskiwania danych.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Wykonaj zapytanie dotyczące liczby wierszy w tabeli po pozyskaniu pliku i wyświetl ostatni wiersz pozyskany.
Notatka
Aby umożliwić ukończenie przetwarzania danych, poczekaj 30 sekund przed wykonaniem zapytania do tabeli. W przypadku języka C# poczekaj 60 sekund, aby umożliwić asynchronicznie dodanie pliku do kolejki pozyskiwania.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Powinien zostać wyświetlony wynik podobny do następującego:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Umieść dane w pamięci w kolejkę na potrzeby przetwarzania i zapytaj o wyniki
Dane można pozyskiwać z pamięci, tworząc strumień zawierający dane, a następnie kolejkując je do pozyskiwania.
Na przykład można zmodyfikować aplikację, zastępując pobieranie danych z pliku kodem w następujący sposób:
Dodaj pakiet deskryptora strumienia do importów w górnej części pliku.
Po uruchomieniu aplikacji powinien zostać wyświetlony wynik podobny do poniższego. Zwróć uwagę, że po załadowaniu liczba wierszy w tabeli wzrosła o jeden.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Ustaw obiekt blob w kolejce do pozyskiwania i zapytaj o wyniki
Dane można pozyskiwać z obiektów blob usługi Azure Storage, plików usługi Azure Data Lake i plików Amazon S3.
Na przykład można zmodyfikować aplikację, zastępując kod przyjmowania danych z pamięci następującym kodem:
Zacznij od przekazania pliku stormevent.csv na konto przechowywania i tworząc identyfikator URI z uprawnieniami do odczytu, na przykład przy użyciu tokenu SAS dla obiektów blob platformy Azure.
Dodaj pakiet deskryptora obiektów blob do importów w górnej części pliku.
Utwórz deskryptor bloba przy użyciu identyfikatora URI bloba, ustaw właściwości ładowania, a następnie załaduj dane z bloba. Zastąp symbol zastępczy <your_blob_uri> identyfikatorem URI obiektu blob.
Po uruchomieniu aplikacji powinien zostać wyświetlony wynik podobny do poniższego. Zwróć uwagę, że po załadowaniu danych liczba wierszy w tabeli wzrosła o 1 000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}