Вставка образца данных в формате JSON в Azure Data Explorer
Статья
В этой статье показано, как загрузить данные в формате JSON в базу данных Azure Data Explorer. Вы начнете с простых примеров необработанного и сопоставленного JSON, продолжите работу с многострочным JSON, а затем перейдете к более сложным схемам JSON, содержащим массивы и словари. В примерах подробно описан процесс приема форматированных данных JSON с помощью язык запросов Kusto (KQL), C#или Python.
Примечание.
Мы не рекомендуем использовать .ingest команды управления в рабочих сценариях. Вместо этого используйте соединитель данных или программным способом прием данных с помощью одной из клиентских библиотек Kusto.
Необходимые компоненты
Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
Azure Data Explorer поддерживает два формата файлов JSON.
json: разделенный строкой JSON. Каждая строка входных данных содержит ровно одну запись JSON. Этот формат поддерживает синтаксический анализ комментариев и одноцитированных свойств. Для получения дополнительной информации см. Строки JSON.
multijson: многострочный JSON. Парсер игнорирует разделители строк и считывает запись с предыдущей позиции до конца допустимого JSON.
Примечание.
При приеме с помощью интерфейса получения данных используется multijsonформат по умолчанию. Формат может обрабатывать многостроные записи JSON и массивы записей JSON. При обнаружении ошибки синтаксического анализа весь файл удаляется. Чтобы игнорировать недопустимые записи JSON, выберите параметр "Игнорировать ошибки формата данных"., который переключит формат json на (строки JSON).
Если вы используете формат строки JSON (json), строки, которые не представляют допустимые записи JSON, пропускаются во время синтаксического анализа.
Получение и сопоставление данных в формате JSON
При приеме данных в формате JSON необходимо указать формат с помощью свойства приема данных. Для приема данных JSON требуется сопоставление, которое сопоставляет исходную запись JSON с ее целевым столбцом. При приеме данных используйте свойство IngestionMapping с его свойством приема ingestionMappingReference (для предварительно определенного отображения) или свойством IngestionMappings. В этой статье будет использоваться свойство приема ingestionMappingReference, которое предварительно определено для таблицы, используемой для приема. В приведенных ниже примерах мы начнем с добавления записей JSON в виде необработанных данных в таблицу с одним столбцом. Затем мы воспользуемся сопоставлением для вставки каждого свойства в сопоставленный столбец.
Простой пример JSON
Следующий пример представляет собой простой JSON с плоской структурой. Данные содержат информацию о температуре и влажности, собираемую несколькими устройствами. Каждая запись помечается идентификатором и отметкой времени.
В этом примере вы загружаете записи JSON как необработанные данные в таблицу с одним столбцом. Обработка данных с использованием запросов и политики обновления выполняется после того, как данные были загружены.
Эта команда создает сопоставление и сопоставляет корневой путь JSON $ столбцу Event.
Прием данных в таблице RawEvents.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Используйте C# для приема данных в необработанном формате JSON.
Создание RawEvents таблицы.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
В этом сопоставлении, как определено схемой таблицы, записи timestamp будут вставлены в столбец Time как типы данных datetime.
Прием данных в таблице Events.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
В файле simple.json есть несколько записей JSON, разделенных строками. Формат — json, а отображение, используемое в команде приема данных, — это созданное вами FlatEventMapping.
Создайте новую таблицу со схемой, аналогичной входным данным JSON. Мы будем использовать эту таблицу для всех следующих примеров и команд вставки.
В этом сопоставлении, как определено схемой таблицы, записи timestamp будут вставлены в столбец Time как типы данных datetime.
Прием данных в таблице Events.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
В файле simple.json есть несколько записей JSON, разделенных строками. Формат — json, а отображение, используемое в команде приема данных, — это созданное вами FlatEventMapping.
Создайте новую таблицу со схемой, аналогичной входным данным JSON. Мы будем использовать эту таблицу для всех следующих примеров и команд вставки.
Файл simple.json содержит несколько записей JSON, разделенных строками. Формат — json, а отображение, используемое в команде приема данных, — это созданное вами FlatEventMapping.
Получение многострочных записей JSON
В этом примере вы загружаете многострочные записи JSON. Каждое свойство JSON сопоставляется с одним столбцом в таблице. В файле Multiined.json есть несколько записей JSON с отступом. multijson Формат указывает на чтение записей по структуре JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Прием данных в таблице Events.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Тип данных "массив" представляет собой упорядоченную коллекцию значений. Получение массива JSON выполняется политикой обновления. JSON загружается как есть в промежуточную таблицу. Политика обновления запускает предопределенную функцию для таблицы RawEvents, повторно вставляя результаты в целевую таблицу. Мы будем принимать данные со следующей структурой:
Создайте функцию update policy, которая расширяет коллекцию records, чтобы каждое значение в коллекции получало отдельную строку, используя оператор mv-expand. Мы будем использовать таблицу RawEvents как исходную, а Events — как целевую.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Схема, полученная функцией, должна соответствовать схеме целевой таблицы. Используйте оператор getschema для просмотра схемы.
EventRecordsExpand() | getschema
Добавьте политику обновления для целевой таблицы. Эта политика автоматически запускает запрос ко всем недавно полученным данным в промежуточной таблице RawEvents и вставляет результаты в таблицу Events. Определите политику нулевого хранения, чтобы избежать сохранения промежуточной таблицы.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Просмотрите данные в таблице Events.
Events
Создайте функцию обновления, которая расширяет коллекцию records, чтобы каждое значение в коллекции получало отдельную строку, используя оператор mv-expand. Мы будем использовать таблицу RawEvents как исходную, а Events — как целевую.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Примечание.
Схема, полученная функцией, должна соответствовать схеме целевой таблицы.
Добавьте политику обновления для целевой таблицы. Эта политика автоматически выполнит запрос ко всем недавно полученным данным в промежуточной таблице RawEvents и загрузит его результаты в таблицу Events. Определите политику нулевого хранения, чтобы избежать сохранения промежуточной таблицы.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Просмотрите данные в таблице Events.
Создайте функцию обновления, которая расширяет коллекцию records, чтобы каждое значение в коллекции получало отдельную строку, используя оператор mv-expand. Мы будем использовать таблицу RawEvents как исходную, а Events — как целевую.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Примечание.
Схема, полученная функцией, должна соответствовать схеме целевой таблицы.
Добавьте политику обновления для целевой таблицы. Эта политика автоматически выполнит запрос ко всем недавно полученным данным в промежуточной таблице RawEvents и загрузит его результаты в таблицу Events. Определите политику нулевого хранения, чтобы избежать сохранения промежуточной таблицы.