使用 Kusto .NET SDK 內嵌數據
.NET 有兩個 用戶端連結庫:內嵌連結庫 和 資料庫。 如需 .NET SDK 的詳細資訊,請參閱 關於 .NET SDK。 這些連結庫可讓您將數據擷取(載入)到叢集,並從您的程式代碼查詢數據。 在本文中,您會先在測試叢集中建立數據表和數據對應。 接著,您會將擷取佇列至叢集,並驗證結果。
必要條件
- Microsoft帳戶或Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
- 叢集和資料庫。 建立叢集和資料庫。
安裝內嵌連結庫
Install-Package Microsoft.Azure.Kusto.Ingest
新增驗證和建構 連接字串
驗證
若要驗證應用程式,SDK 會使用您的Microsoft Entra 租使用者標識符。 若要尋找您的租使用者標識碼,請使用下列URL,以您的網域 取代YourDomain。
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
例如,如果您的網域是 contoso.com,則URL為: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按兩下此網址以查看結果;第一行如下所示。
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
在此情況下,租使用者識別碼為 6babcaad-604b-40ac-a9d7-9fd97c0b779f
。
此範例會使用互動式Microsoft Entra 用戶驗證來存取叢集。 您也可以使用 Microsoft Entra 應用程式驗證搭配憑證或應用程式密碼。 在執行此程序代碼之前,請務必為 和 clusterUri
設定正確的值tenantId
。
SDK 提供方便的方式,將驗證方法設定為 連接字串 的一部分。 如需 連接字串 的完整檔,請參閱 連接字串。
注意
SDK 的目前版本不支援 .NET Core 上的互動式用戶驗證。 如有需要,請改用 Microsoft Entra 使用者名稱/密碼或應用程式驗證。
建構 連接字串
現在您可以建構 連接字串。 您將在稍後的步驟中建立目的地數據表和對應。
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
設定來源檔案資訊
設定來源檔案的路徑。 此範例會使用裝載於 Azure Blob 儲存體 上的範例檔案。 StormEvents 範例數據集包含來自國家環境資訊中心的天氣相關數據。
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
在測試叢集上建立數據表
建立名為 StormEvents
的數據表,其符合檔案中 StormEvents.csv
數據的架構。
提示
下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
定義擷取對應
將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱。 在該數據表上布建 CSV 數據行對應物件 。
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
定義數據表的批處理原則
批處理傳入數據會優化數據分區大小,此大小是由擷取批處理原則所控制。 使用 擷取批處理原則管理命令來修改原則。 使用此原則可降低緩慢抵達數據的延遲。
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
建議您定義擷 Raw Data Size
取數據的值,並以累加方式將大小縮減為 250 MB,同時檢查效能是否改善。
您可以使用 Flush Immediately
屬性來略過批處理,雖然不建議進行大規模擷取,因為它可能會導致效能不佳。
將訊息排入佇列以擷取
將訊息排入佇列,以從 Blob 記憶體提取數據並擷取數據。 系統會建立與擷取叢集的連線,並建立另一個客戶端來處理該端點。
提示
下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
驗證數據已內嵌至數據表
等候五到十分鐘,等候佇列擷取排程擷取,並將數據載入您的叢集。 然後執行下列程式代碼以取得數據表中的 StormEvents
記錄計數。
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
執行疑難解答查詢
登入 https://dataexplorer.azure.com 併連線到您的叢集。 在資料庫中執行下列命令,以查看過去四小時內是否有任何擷取失敗。 在執行之前,請先取代資料庫名稱。
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
執行下列命令,以檢視過去四小時內所有擷取作業的狀態。 在執行之前,請先取代資料庫名稱。
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
清除資源
如果您打算遵循我們的其他文章,請保留您所建立的資源。 如果沒有,請在資料庫中執行下列命令來清除 StormEvents
數據表。
.drop table StormEvents