Прием данных с помощью пакета SDK для Java Kusto
Azure Data Explorer — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Клиентская библиотека Java может использоваться для приема данных, выполнения команд управления и запроса данных в кластерах Azure Data Explorer.
Из этой статьи вы узнаете, как принимать данные с помощью библиотеки Java для Azure Data Explorer. Во-первых, вы создадите таблицу и выполните сопоставление данных в тестовом кластере. Затем вы поместите в очередь прием данных из хранилища BLOB-объектов в кластер с помощью пакета SDK для Java и проверите результаты.
Необходимые компоненты
- Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данных.
- Git.
- JDK 1.8 или более поздней версии.
- Maven.
- Создайте регистрацию приложения и предоставьте ей разрешения на доступ к базе данных. Сохраните идентификатор клиента и секрет клиента для последующего использования.
Просмотр кода
Это необязательный раздел. Ознакомьтесь с приведенными ниже фрагментами кода, чтобы узнать, как работает код. Чтобы пропустить этот раздел, перейдите на страницу Запуск приложения.
Проверка подлинности
Программа использует учетные данные проверки подлинности Microsoft Entra с помощью ConnectionStringBuilder.
Создание
com.microsoft.azure.kusto.data.Client
для запросов и управления.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Создание и использование
com.microsoft.azure.kusto.ingest.IngestClient
для постановки приема данных в очередь Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Команды управления
Команды управления, такие как .drop
и .create
, выполняются путем вызова execute
com.microsoft.azure.kusto.data.Client
объекта.
Например, таблица StormEvents
создается следующим образом:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Прием данных
Прием данных помещается в очередь с помощью файла из существующего контейнера хранилища Blob-объектов Azure.
- Используйте
BlobSourceInfo
, чтобы указать путь к хранилищу BLOB-объектов. - Используйте
IngestionProperties
для определения таблицы, базы данных, имени сопоставления и типа данных. В следующем примере используется тип данныхCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Процесс приема данных начинается в отдельном потоке, а поток main
ожидает завершения потока приема. В этом процессе используется CountdownLatch. API приема (IngestClient#ingestFromBlob
) не является асинхронным. Цикл while
используется для опроса текущего состояния через каждые 5 секунд и ожидает перехода приема из состояния Pending
в другое состояние. Конечным состоянием может быть Succeeded
, Failed
или PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Совет
Существуют и другие методы для асинхронной обработки приема данных для различных приложений. Например, можно использовать CompletableFuture
для создания конвейера, определяющего действие после приема, например запрос таблицы или обработка исключений, переданных в IngestionStatus
.
Выполнение приложения
Общие
Когда вы запускаете пример кода, выполняются следующие действия:
- Удаление таблицы:
StormEvents
удаление таблицы (если она существует). - Создание таблицы:
StormEvents
создание таблицы. - Создание сопоставления:
StormEvents_CSV_Mapping
создание сопоставления. - Прием файла: CSV-файл (в хранилище Blob-объектов Azure) помещается в очередь для приема данных.
Ниже приведен пример кода из App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Совет
Чтобы попробовать различные комбинации операций, вы можете раскомментировать или прокомментировать соответствующие методы в App.java
.
Выполнение приложения
Клонируйте образец кода с GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Задайте сведения о субъекте-службе со следующими сведениями в качестве переменных среды, используемых программой:
- Конечная точка кластера
- Имя базы данных
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Выполните сборку:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Результат должен быть аналогичен приведенному ниже:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Подождите несколько минут, пока процесс приема данных завершится. После успешного завершения вы увидите следующее сообщение журнала:Ingestion completed successfully
. Вы можете выйти из программы на этом этапе и перейти к следующему шагу, не влияя на процесс приема данных, который уже поставлен в очередь.
Проверить
Подождите в пределах 10 минут, пока не будет запланирован прием данных в очереди, и загрузите данные в Azure Data Explorer.
Войдите в https://dataexplorer.azure.com и подключитесь к кластеру.
Выполните следующую команду, чтобы получить количество записей в таблице
StormEvents
:StormEvents | count
Устранение неполадок
Чтобы просмотреть ошибки приема данных в течение последних четырех часов, выполните следующую команду в базе данных:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Выполните следующую команду, чтобы узнать состояние всех операций приема данных за последние четыре часа:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Очистка ресурсов
Если вы не планируете использовать созданные ресурсы, выполните следующую команду в базе данных, чтобы удалить таблицу StormEvents
.
.drop table StormEvents