Kusto는 일괄 처리 관리자를 통해 수집된 데이터를 최적화하고 일괄 처리하여 대량 데이터 유입을 처리할 수 있습니다. 일괄 처리 관리자는 수집된 데이터를 대상 테이블에 도달하기 전에 집계하여 보다 효율적인 처리와 향상된 성능을 제공합니다. 일괄 처리는 일반적으로 1GB의 원시 데이터, 1,000개의 개별 파일 또는 기본적으로 5분이 초과되는 대량으로 수행됩니다. 일괄 처리 정책은 일반적으로 일괄 처리 시간을 줄이고 대기 시간을 줄이기 위해 데이터베이스 및 테이블 수준에서 업데이트할 수 있습니다. 수집 일괄 처리에 대한 자세한 내용은 수집 일괄 처리 정책 및 프로그래밍 방식으로 테이블 수준 수집 일괄 처리 정책 변경을 참조하세요.
메모
또한 일괄 처리는 대상 데이터베이스 및 테이블, 수집을 실행하는 사용자 및 수집과 관련된 다양한 속성(예: 특수 태그)과 같은 다양한 요소를 고려합니다.
새 일괄 처리 정책 설정이 일괄 처리 관리자로 전파되는 데 몇 분 정도 걸릴 수 있습니다.
stormevent.csv 샘플 데이터 파일을 다운로드합니다. 파일에는 1,000개의 폭풍 관련 사건 기록이 포함됩니다.
메모
다음 예제에서는 수집된 데이터의 열과 대상 테이블의 스키마 간에 사소한 일치를 가정합니다.
수집된 데이터가 테이블 스키마와 사소하게 일치하지 않는 경우 수집 매핑을 사용하여 데이터 열을 테이블 스키마와 정렬해야 합니다.
파일을 큐에 추가하고 결과를 쿼리합니다.
선호하는 IDE 또는 텍스트 편집기에서 기본 설정 언어에 적합한 규칙을 사용하여 기본 수집 프로젝트 또는 파일을 만듭니다.
stormevent.csv 파일을 앱과 동일한 위치에 배치합니다.
메모
다음 예제에서는 두 개의 클라이언트를 사용합니다. 하나는 클러스터를 쿼리하고 다른 하나는 클러스터로 데이터를 수집합니다. 클라이언트 라이브러리가 지원하는 언어의 경우 두 클라이언트가 동일한 사용자 프롬프트 인증자를 공유하므로 각 클라이언트에 대해 하나의 사용자 프롬프트 대신 단일 사용자 프롬프트가 생성됩니다.
다음 코드를 추가합니다.
클러스터에 연결하고 MyStormEvents 테이블의 행 수를 출력하는 클라이언트 앱을 만듭니다. 각 수집 방법을 적용한 후에 얻은 행 수와 비교하기 위한 기본 기준으로 이 개수를 사용합니다.
<your_cluster_uri> 및 <your_database> 자리 표시자를 각각 클러스터 URI 및 데이터베이스 이름으로 바꾸십시오.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
메모
Node.js 앱의 경우 InteractiveBrowserCredentialInBrowserOptions대신 InteractiveBrowserCredentialNodeOptions 사용합니다.
메모
Java SDK는 현재 동일한 사용자 프롬프트 인증자를 공유하는 두 클라이언트를 지원하지 않으므로 각 클라이언트에 대한 사용자 프롬프트가 생성됩니다.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
수집을 위해 메모리 내 데이터 큐에 대기하고 결과를 쿼리합니다.
데이터를 포함하는 스트림을 만든 다음 수집을 위해 대기하여 메모리에서 데이터를 수집할 수 있습니다.
앱을 실행할 때 다음과 유사한 결과가 표시됩니다. 데이터 수집 후, 테이블의 행 수가 한 줄 증가한 것을 확인하세요.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
수집을 위해 Blob 큐에 대기하고 결과를 쿼리합니다.
Azure Storage Blob, Azure Data Lake 파일 및 Amazon S3 파일에서 데이터를 수집할 수 있습니다.
예를 들어, 앱에서 메모리에서 수집을 다음의 코드로 교체하여 수정할 수 있습니다.
먼저 스토리지 계정에 stormevent.csv 파일을 업로드하고 읽기 권한이 있는 URI를 생성합니다(예: Azure Blob에 대한 SAS 토큰 사용).
앱을 실행할 때 다음과 유사한 결과가 표시됩니다. 수집이 완료된 후, 테이블의 행 수가 1,000개 증가했습니다.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}