次の方法で共有


Kusto .NET SDK を使用してデータを取り込む

.NET には、取り込みライブラリデータ ライブラリの 2 つのクライアント ライブラリがあります。 .NET SDK の詳細については、.NET SDKについての記事を参照してください。 これらのライブラリを使用すると、クラスターにデータを取り込み (読み込み)、コードからデータのクエリを行うことができます。 この記事ではまず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、クラスターに対するインジェストをキューに入れて、結果を検証します。

前提条件

取り込みライブラリをインストールする

Install-Package Microsoft.Azure.Kusto.Ingest

認証の追加と接続文字列の作成

認証

アプリケーションを認証するために、SDK は Microsoft Entra テナント ID を使用します。 テナント ID を検索するには、次の URL を使用し、ドメインを YourDomain に置き換えます。

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

たとえば、ドメインが contoso.com の場合、URL は https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/ になります。 結果を表示するには、この URL をクリックします。最初の行は次のとおりです。

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

この場合のテナント ID は 6babcaad-604b-40ac-a9d7-9fd97c0b779f です。

この例では、対話型の Microsoft Entra ユーザー認証を使用してクラスターにアクセスします。 証明書またはアプリケーション シークレットを使用した Microsoft Entra アプリケーション認証を使用することもできます。 このコードを実行する前に、tenantIdclusterUriに正しい値を設定してください。

SDK には、接続文字列の一部として認証方法を設定する便利な方法が用意されています。 接続文字列の完全なドキュメントについては、接続文字列に関するページを参照してください。

Note

現在のバージョンの SDK では、.NET Core での対話型のユーザー認証はサポートされていません。 必要な場合は、代わりに Microsoft Entra のユーザー名/パスワードまたはアプリケーション認証を使用してください。

接続文字列を作成する

これで、接続文字列を作成できるようになりました。 ターゲット テーブルとマッピングは後のステップで作成します。

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

ソース ファイルの情報を設定する

ソース ファイルのパスを設定します。 この例では、Azure Blob Storage でホストされているサンプル ファイルを使います。 StormEvents サンプル データセットには、National Centers for Environmental Information から入手した気象関連データが含まれています。

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

テスト クラスターにテーブルを作成する

StormEvents.csv ファイル内のデータのスキーマと一致する、StormEvents という名前のテーブルを作成します。

ヒント

次のコード スニペットは、ほぼすべての呼び出しに対してクライアントのインスタンスを作成します。 これは、各スニペットを個別に実行できるようにするためです。 実稼働環境では、クライアント インスタンスは再入可能であり、必要な限り保持する必要があります。 複数のデータベースを使用する場合でも、URI ごとに 1 つのクライアントインスタンスで十分です (データベースはコマンド レベルで指定できます)。

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

インジェストのマッピングを定義する

受信した CSV データを、テーブル作成時に使用される列名にマップします。 CSV 列マッピング オブジェクトをそのテーブルにプロビジョニングします。

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

テーブルのバッチ処理ポリシーを定義する

バッチ処理の受信データにより、データのシャード サイズが最適化されます。これは、インジェスト バッチ処理ポリシーにより制御されます。 インジェスト バッチ処理ポリシー管理コマンドを使用してポリシーを変更します。 このポリシーを使用すると、低速なデータの待機時間を短縮できます。

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

取り込まれたデータの Raw Data Size の値を定義して、サイズを 250 MB に段階的に減らしながら、パフォーマンスが向上するかどうかを確認することをお勧めします。

Flush Immediately プロパティを使用してバッチ処理をスキップすることもできますが、これはパフォーマンスが低下する可能性があるため、大規模なインジェストにはお勧めしません。

取り込みのためにメッセージをキューに入れる

BLOB ストレージからデータをプルし、そのデータを取り込むために、メッセージをキューに入れます。 インジェスト クラスターへの接続が確立され、そのエンドポイントを使用するために別のクライアントが作成されます。

ヒント

次のコード スニペットは、ほぼすべての呼び出しに対してクライアントのインスタンスを作成します。 これは、各スニペットを個別に実行できるようにするためです。 実稼働環境では、クライアント インスタンスは再入可能であり、必要な限り保持する必要があります。 複数のデータベースを使用する場合でも、URI ごとに 1 つのクライアントインスタンスで十分です (データベースはコマンド レベルで指定できます)。

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

データがテーブルに取り込まれたことを確認する

キューに登録されたインジェストで取り込みがスケジュールされ、クラスターにデータが読み込まれるまで 5 から 10 分待ちます。 その後、次のコードを実行して、StormEvents テーブル内のレコードの数を取得します。

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

トラブルシューティングのクエリを実行する

https://dataexplorer.azure.com にサインインして、クラスターに接続します。 データベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

リソースをクリーンアップする

他の記事に進む場合は、作成したリソースをそのままにします。 行わない場合は、データベースで次のコマンドを実行して、StormEvents テーブルをクリーンアップします。

.drop table StormEvents