共用方式為


使用 Kusto .NET SDK 內嵌數據

.NET 有兩個 用戶端連結庫:內嵌連結庫資料庫。 如需 .NET SDK 的詳細資訊,請參閱 關於 .NET SDK。 這些連結庫可讓您將數據擷取(載入)到叢集,並從您的程式代碼查詢數據。 在本文中,您會先在測試叢集中建立數據表和數據對應。 接著,您會將擷取佇列至叢集,並驗證結果。

必要條件

  • Microsoft帳戶或Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
  • 叢集和資料庫。 建立叢集和資料庫

安裝內嵌連結庫

Install-Package Microsoft.Azure.Kusto.Ingest

新增驗證和建構 連接字串

驗證

若要驗證應用程式,SDK 會使用您的Microsoft Entra 租使用者標識符。 若要尋找您的租使用者標識碼,請使用下列URL,以您的網域 取代YourDomain

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

例如,如果您的網域是 contoso.com,則URL為: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/。 按兩下此網址以查看結果;第一行如下所示。

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在此情況下,租使用者識別碼為 6babcaad-604b-40ac-a9d7-9fd97c0b779f

此範例會使用互動式Microsoft Entra 用戶驗證來存取叢集。 您也可以使用 Microsoft Entra 應用程式驗證搭配憑證或應用程式密碼。 在執行此程序代碼之前,請務必為 和 clusterUri 設定正確的值tenantId

SDK 提供方便的方式,將驗證方法設定為 連接字串 的一部分。 如需 連接字串 的完整檔,請參閱 連接字串

注意

SDK 的目前版本不支援 .NET Core 上的互動式用戶驗證。 如有需要,請改用 Microsoft Entra 使用者名稱/密碼或應用程式驗證。

建構 連接字串

現在您可以建構 連接字串。 您將在稍後的步驟中建立目的地數據表和對應。

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

設定來源檔案資訊

設定來源檔案的路徑。 此範例會使用裝載於 Azure Blob 儲存體 上的範例檔案。 StormEvents 範例數據集包含來自國家環境資訊中心的天氣相關數據

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

在測試叢集上建立數據表

建立名為 StormEvents 的數據表,其符合檔案中 StormEvents.csv 數據的架構。

提示

下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定義擷取對應

將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱。 在該數據表上布建 CSV 數據行對應物件

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定義數據表的批處理原則

批處理傳入數據會優化數據分區大小,此大小是由擷取批處理原則控制。 使用 擷取批處理原則管理命令來修改原則。 使用此原則可降低緩慢抵達數據的延遲。

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

建議您定義擷 Raw Data Size 取數據的值,並以累加方式將大小縮減為 250 MB,同時檢查效能是否改善。

您可以使用 Flush Immediately 屬性來略過批處理,雖然不建議進行大規模擷取,因為它可能會導致效能不佳。

將訊息排入佇列以擷取

將訊息排入佇列,以從 Blob 記憶體提取數據並擷取數據。 系統會建立與擷取叢集的連線,並建立另一個客戶端來處理該端點。

提示

下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

驗證數據已內嵌至數據表

等候五到十分鐘,等候佇列擷取排程擷取,並將數據載入您的叢集。 然後執行下列程式代碼以取得數據表中的 StormEvents 記錄計數。

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

執行疑難解答查詢

登入 https://dataexplorer.azure.com 併連線到您的叢集。 在資料庫中執行下列命令,以查看過去四小時內是否有任何擷取失敗。 在執行之前,請先取代資料庫名稱。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

執行下列命令,以檢視過去四小時內所有擷取作業的狀態。 在執行之前,請先取代資料庫名稱。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清除資源

如果您打算遵循我們的其他文章,請保留您所建立的資源。 如果沒有,請在資料庫中執行下列命令來清除 StormEvents 數據表。

.drop table StormEvents