Прием данных с помощью пакета SDK для .NET Kusto
Существует две клиентские библиотеки для .NET: библиотека приема и библиотека данных. Дополнительные сведения о пакете SDK .NET см. в разделе Сведения о пакете SDK .NET. Они позволяют принимать (загружать) данные в кластер и запрашивать данные из кода. В этой статье вы сначала создадите таблицу и сопоставление данных в тестовом кластере. Затем вы поставите в очередь прием данных в кластер и проверите результаты.
Необходимые компоненты
- Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
- Кластер и база данных. Создайте кластер и базу данных.
Установка библиотеки приема
Install-Package Microsoft.Azure.Kusto.Ingest
Добавление проверки подлинности и создание строки подключения
Проверка подлинности
Для проверки подлинности приложения пакет SDK использует идентификатор клиента Microsoft Entra. Чтобы найти идентификатор клиента, используйте следующий URL-адрес, заменив домен на имя_вашего_домена.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Например, если ваш домен называется contoso.com, URL-адрес будет следующим: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Щелкните этот URL-адрес, чтобы просмотреть результаты. Первая строка выглядит следующим образом:
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
В данном случае идентификатор клиента — 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
В этом примере для доступа к кластеру используется интерактивная проверка подлинности пользователей Microsoft Entra. Вы также можете использовать проверку подлинности приложения Microsoft Entra с секретом сертификата или приложения. Обязательно задайте правильные значения для tenantId
и clusterUri
перед выполнением этого кода.
Пакет SDK предоставляет удобный способ настройки метода проверки подлинности в рамках строка подключения. Полные сведения о строка подключения см. в строка подключения.
Примечание.
Текущая версия пакета SDK не поддерживает интерактивную проверку подлинности пользователя в .NET Core. При необходимости используйте вместо этого имя пользователя и пароль Microsoft Entra или проверку подлинности приложения.
Создание строки подключения
Теперь можно создать строка подключения. Целевую таблицу и сопоставление вы создадите позднее.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Определение данных исходного файла
Указание пути к исходному файлу. В этом примере используется пример файла, размещенный в хранилище BLOB-объектов Azure. Пример набора данных StormEvents содержит данные, связанные с погодой, из Национальных центров по экологической информации.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Создание таблицы в тестовом кластере
Создайте таблицу с именем StormEvents
, которая соответствует схеме данных в файле StormEvents.csv
.
Совет
Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента для каждого универсального кода ресурса достаточно даже при работе с несколькими базами данных (база данных может быть указана на уровне команд).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Определение сопоставления приема
Выполните сопоставление входящих данных CSV с именами столбцов, использованными при создании таблицы. Подготовка объекта сопоставления столбца CSV в таблице.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Определите политику пакетной обработки для таблицы.
Пакетная обработка входящих данных оптимизирует размер сегментов данных. Это процесс контролируется политикой пакетного приема данных. Измените политику с помощью команды управления политиками пакетной обработки приема. Используйте эту политику, чтобы сократить задержку медленно поступающих данных.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Мы рекомендуем определить значение Raw Data Size
для принимаемых данных и постепенно уменьшить размер до 250 МБ, при этом проверяя, повысилась ли производительность.
С помощью свойства Flush Immediately
пакетную обработку можно пропустить, хотя это не рекомендуется при приеме большого объема данных, так как это может привести к снижению производительности.
Отправка сообщения в очередь на прием
Поставьте сообщение в очередь, чтобы извлечь данные из хранилища BLOB-объектов и получить эти данные. Подключение устанавливается к кластеру приема, а для работы с этой конечной точкой создается другой клиент.
Совет
Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента для каждого универсального кода ресурса достаточно даже при работе с несколькими базами данных (база данных может быть указана на уровне команд).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Проверка получения данных в таблицу
Подождите пять–десять минут для приема в очереди, чтобы запланировать прием и загрузить данные в кластер. Затем выполните следующий код, чтобы получить количество записей в таблице StormEvents
.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Выполнение запросов по устранению неполадок
Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Очистка ресурсов
Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы очистить таблицу StormEvents
.
.drop table StormEvents