Руководство по миграции: Elasticsearch в Azure Data Explorer
В этом руководстве вы узнаете , как перенести данные Elasticsearch в Azure Data Explorer с помощью Logstash.
В этом руководстве данные, которые необходимо перенести, содержатся в индексе Elasticsearch с именем транспортного средства , имеющего следующую схему данных:
{
"Manufacturer": "string",
"Model": "string",
"ReleaseYear": "int",
"ReleaseDate": "datetime"
}
Необходимые компоненты
Чтобы перенести данные Elasticsearch в Azure Data Explorer, вам потребуется:
- Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
- Кластер и база данных Azure Data Explorer. Можно создать бесплатный кластер или создать полный кластер. Чтобы решить, что лучше всего подходит для вас, проверьте сравнение функций.
- Идентификатор приложения и делегированные разрешения для доступа к кластеру Azure Data Explorer. Дополнительные сведения см. в статье "Создание приложения Microsoft Entra". Для настройки конвейера Logstash требуется идентификатор приложения, секрет и клиент.
- Инструкции по установке Logstash версии 6+.
Подготовка к миграции
После выполнения необходимых условий можно приступать к обнаружению топологии среды и оценке возможности вашей миграции в облако Azure.
Создание целевой схемы в кластере Azure Data Explorer
Чтобы правильно принять и структурировать данные для запроса и анализа, необходимо создать схему таблицы и сопоставление в кластере Azure Data Explorer.
Схема таблицы и переносимых данных должна соответствовать. Сопоставление приема важно для установления сопоставления исходных столбцов в ELK с целевыми столбцами в таблице.
Чтобы создать схему таблицы и сопоставление приема в кластере:
Войдите в веб-интерфейс Azure Data Explorer.
Выберите базу данных, в которой нужно создать схему таблицы для данных миграции.
Выполните следующую команду в окне запроса базы данных, чтобы создать схему таблицы.
.create tables Vehicle ( Manufacturer: string, Model: string, ReleaseYear: int, ReleaseDate: datetime )
Выполните следующую команду, чтобы создать сопоставление приема.
.create table Vehicle ingestion json mapping 'VechicleMapping' '[' ' {"column":"Manufacturer", "path":"$.manufacturer"},' ' {"column":"Model", "path":"$.model"},' ' {"column":"ReleaseYear", "path":"$.releaseYear"},' ' {"column":"ReleaseDate", "path":"$.releaseDate"}' ']'
Подготовка Logstash к миграции
При переносе данных в кластер Azure Data Explorer важно правильно настроить конвейер Logstash. Конвейер гарантирует правильность форматирования и передачи данных в целевую таблицу.
Если необходимо переместить данные из нескольких кластеров Elasticsearch или индексов, можно создать несколько входных разделов в файле конфигурации конвейера. Для этого можно определить один входной раздел для каждого кластера Elasticsearch или индекса и классифицировать их с помощью тегов, если вы хотите. Затем эти теги можно использовать в условных инструкциях в разделе выходных данных для направления этих наборов данных в определенные таблицы кластера Azure Data Explorer.
Чтобы настроить конвейер Logstash, выполните приведенные действия.
В командной оболочке перейдите к корневому каталогу Logstash и выполните следующую команду, чтобы установить подключаемый модуль вывода Logstash. Дополнительные сведения о подключаемом модуле см. в разделе "Прием данных из Logstash".
bin/logstash-plugin install logstash-output-kusto
Создайте файл конфигурации конвейера Logstash, используя следующие параметры:
input { elasticsearch { hosts => "http://localhost:9200" index => "vehicle" query => '{ "query": { "range" : { "releaseDate": { "gte": "2019-01-01", "lte": "2023-12-31" }}}}' user => "<elasticsearch_username>" password => "<elasticsearch_password>" ssl => true ca_file => "<certification_file>" } } filter { ruby { code => "event.set('[@metadata][timebucket]', Time.now().to_i/10)" } } output { kusto { path => "/tmp/region1/%{+YYYY-MM-dd}-%{[@metadata][timebucket]}.txt" ingest_url => "https://ingest-<azure_data_explorer_cluster_name>.<region>.kusto.windows.net" app_id => "<app_id>" app_key => "<app_secret>" app_tenant => "<app_tenant_id>" database => "<your_database>" table => "Vehicle" // The table schema you created earlier json_mapping => "vehicleMapping" // The ingestion mapping you created earlier } }
Входные параметры
Наименование параметра Description Хозяева URL-адрес кластера Elasticsearch. index Имя индекса для миграции. query Необязательный запрос для получения определенных данных из индекса. user Имя пользователя для подключения к кластеру Elasticsearch. пароль Пароль для подключения к кластеру Elasticsearch. теги Необязательные теги для идентификации источника данных. Например, укажите tags => ["vehicle"]
в разделе elasticsearch, а затем фильтруйте с помощьюif "vehicle" in [tags] { ... }
упаковки раздела kusto.ssl Указывает, требуется ли SSL-сертификат. ca_file Файл сертификата, который необходимо передать для проверки подлинности. Параметры фильтра
Фильтр ruby предотвращает прием повторяющихся данных в кластер, задав уникальную метку времени для файлов данных Elasticsearch каждые 10 секунд. Это рекомендуемая практика, которая блокирует данные в файлы с уникальной меткой времени, обеспечивая правильную обработку данных для миграции.
Выходные параметры
Наименование параметра Description path Подключаемый модуль Logstash записывает события во временные файлы перед отправкой в кластер. Этот параметр описывает путь к тому, где сохраняются временные файлы, а также выражение времени для смены файлов для активации отправки в кластер. ingest_url Конечная точка кластера для взаимодействия, связанного с приемом данных. app_id, app_key и app_tenant Учетные данные, необходимые для подключения к кластеру. Убедитесь, что вы используете приложение с привилегиями приема. Дополнительные сведения см. в разделе Необходимые условия. database Имя базы данных для размещения событий. table Имя целевой таблицы для размещения событий. json_mapping Сопоставление используется для сопоставления входящей строки json события с правильным форматом строки (определяет, какое свойство ELK переходит в столбец схемы таблицы).
Миграция
После подготовки предварительных шагов миграции следующий шаг — выполнить процесс миграции. Важно отслеживать конвейер во время процесса миграции данных, чтобы убедиться, что он работает гладко и позволяет решить любые проблемы, которые могут возникнуть.
Чтобы перенести данные, в командной оболочке перейдите в корневой каталог Logstash и выполните следующую команду:
bin/logstash -f <your_pipeline>.conf
На экране должны отображаться сведения.
После миграции
После завершения миграции необходимо пройти ряд задач после миграции, чтобы проверить данные и убедиться, что все работает так же гладко и эффективно, как это возможно.
Процесс проверки данных для определенного индекса обычно состоит из следующих действий:
Сравнение данных. Сравнение перенесенных данных в кластере Azure Data Explorer с исходными данными в Elasticsearch. Это можно сделать с помощью такого инструмента, как Kibana в стеке ELK, который позволяет запрашивать и визуализировать данные в обеих средах.
Выполнение запроса. Выполните ряд запросов к перенесенным данным в кластере Azure Data Explorer, чтобы убедиться, что данные являются точными и полными. Это включает выполнение запросов, которые проверяют связи между различными полями и запросы, которые проверяют целостность данных.
Проверьте отсутствие данных: сравните перенесенные данные в кластере с данными в Elasticsearch, чтобы проверить отсутствие данных, дублирование данных или другие несоответствия данных.
Проверьте производительность. Проверьте производительность перенесенных данных в кластере и сравните его с производительностью данных в Elasticsearch. Это может включать выполнение запросов и визуализацию данных для проверки времени отклика и обеспечения оптимизации данных в кластере для повышения производительности.
Внимание
Повторите процесс проверки данных, если все изменения вносятся в перенесенные данные или кластер, чтобы убедиться, что данные по-прежнему точны и завершены.
Ниже приведены некоторые примеры запросов, которые можно запустить для проверки данных в кластере:
В Elasticsearch выполните следующие запросы, чтобы получить следующую команду:
// Gets the total record count of the index GET vehicle/_count // Gets the total record count of the index based on a datetime query GET vehicle/_count { "query": { "range" : { "releaseDate": { "gte": "2021-01-01", "lte": "2021-12-31" } } } } // Gets the count of all vehicles that has manufacturer as "Honda". GET vehicle/_count { "query": { "bool" : { "must" : { "term" : { "manufacturer" : "Honda" } } } } } // Get the record count where a specific property doesn't exist. // This is helpful especially when some records don't have NULL properties. GET vehicle/_count { "query": { "bool": { "must_not": { "exists": { "field": "description" } } } } }
В окне запроса базы данных выполните следующий соответствующий запрос:
// Gets the total record count in the table Vehicle | count // Gets the total record count where a given property is NOT empty/null Vehicle | where isnotempty(Manufacturer) // Gets the total record count where a given property is empty/null Vehicle | where isempty(Manufacturer) // Gets the total record count by a property value Vehicle | where Manufacturer == "Honda" | count
Сравните результаты обоих наборов запросов, чтобы убедиться, что данные в кластере точны и полны.