移行ガイド: Elasticsearch から Azure Data Explorer へ
このガイドでは、Logstash を使用して Elasticsearch データを Azure Data Explorer に移行する方法学習します。
このガイドでは、移行するデータは、次のデータ スキーマを持つ vehicle という名前の Elasticsearch インデックス内に存在します。
{
"Manufacturer": "string",
"Model": "string",
"ReleaseYear": "int",
"ReleaseDate": "datetime"
}
前提条件
Elasticsearch データを Azure Data Explorer に移行するには、次のものが必要です。
- Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 空きクラスター 作成することも 完全なクラスターを作成 することもできます。 最適な方法を決定するには、 機能の比較を確認。
- Azure Data Explorer クラスターにアクセスするためのアプリ ID と委任されたアクセス許可。 詳細については、「 Microsoft Entra アプリの作成」を参照してください。 Logstash パイプラインを構成するには、アプリ ID、シークレット、テナント ID が必要です。
- Logstash バージョン 6 以降のインストール手順。
移行前
前提条件を満たしたら、環境のトポロジを検出し、Azure Cloud 移行の実現可能性を評価する準備が整いました。
Azure Data Explorer クラスターでターゲット スキーマを作成する
クエリと分析のためにデータを適切に取り込んで構造化するには、Azure Data Explorer クラスターにテーブル スキーマとマッピングを作成する必要があります。
テーブルのスキーマと移行するデータが一致している必要があります。 インジェスト マッピングは、ELK のソース列とテーブル内のターゲット列のマッピングを確立するために重要です。
クラスターでテーブル スキーマとインジェスト マッピングを作成するには:
Azure Data Explorer Web UI にサインインします。
クラスターに 接続を追加します。
移行データのテーブル スキーマを作成するデータベースを選択します。
データベース クエリ ウィンドウで次のコマンドを実行して、テーブル スキーマを作成します。
.create tables Vehicle ( Manufacturer: string, Model: string, ReleaseYear: int, ReleaseDate: datetime )
次のコマンドを実行して、インジェスト マッピングを作成します。
.create table Vehicle ingestion json mapping 'VechicleMapping' '[' ' {"column":"Manufacturer", "path":"$.manufacturer"},' ' {"column":"Model", "path":"$.model"},' ' {"column":"ReleaseYear", "path":"$.releaseYear"},' ' {"column":"ReleaseDate", "path":"$.releaseDate"}' ']'
移行のために Logstash を準備する
Azure Data Explorer クラスターにデータを移行する場合は、 Logstash パイプラインを適切に設定することが重要です。 パイプラインにより、データが正しく書式設定され、ターゲット テーブルに転送されます。
複数の Elasticsearch クラスターまたはインデックスからデータを移動する必要がある場合は、パイプライン構成ファイルに複数の入力セクションを作成できます。 これを実現するには、Elasticsearch クラスターまたはインデックスごとに 1 つの入力セクションを定義し、必要に応じてタグを使用して分類します。 次に、出力セクションの条件付きステートメントでこれらのタグを使用して、これらのデータセットを特定の Azure Data Explorer クラスター テーブルに転送できます。
Logstash パイプラインを設定するには:
コマンド シェルで Logstash ルート ディレクトリに移動し、次のコマンドを実行して、 Logstash 出力プラグインをインストールします。 プラグインの詳細については、「 Logstash からのデータのインレスト」を参照してください。
bin/logstash-plugin install logstash-output-kusto
次の設定を使用して、Logstash パイプライン構成ファイルを作成します。
input { elasticsearch { hosts => "http://localhost:9200" index => "vehicle" query => '{ "query": { "range" : { "releaseDate": { "gte": "2019-01-01", "lte": "2023-12-31" }}}}' user => "<elasticsearch_username>" password => "<elasticsearch_password>" ssl => true ca_file => "<certification_file>" } } filter { ruby { code => "event.set('[@metadata][timebucket]', Time.now().to_i/10)" } } output { kusto { path => "/tmp/region1/%{+YYYY-MM-dd}-%{[@metadata][timebucket]}.txt" ingest_url => "https://ingest-<azure_data_explorer_cluster_name>.<region>.kusto.windows.net" app_id => "<app_id>" app_key => "<app_secret>" app_tenant => "<app_tenant_id>" database => "<your_database>" table => "Vehicle" // The table schema you created earlier json_mapping => "vehicleMapping" // The ingestion mapping you created earlier } }
入力パラメーター。
パラメーター名 説明 ホスト Elasticsearch クラスターの URL。 インデックス 移行するインデックスの名前。 クエリ インデックスから特定のデータを取得するオプションのクエリ。 ユーザー Elasticsearch クラスターに接続するユーザー名。 password Elasticsearch クラスターに接続するためのパスワード。 tags データのソースを識別するための省略可能なタグ。 たとえば、elasticsearch セクションで tags => ["vehicle"]
を指定し、kusto セクションをラップするif "vehicle" in [tags] { ... }
を使用してフィルター処理します。ssl SSL 証明書が必要かどうかを指定します。 ca_file 認証に渡す証明書ファイル。 フィルター パラメーター
Ruby フィルターは、Elasticsearch データ ファイルの一意のタイムスタンプを 10 秒ごとに設定することで、重複データがクラスターに取り込まれるのを防ぎます。 これは、データを一意のタイムスタンプでファイルにチャンクし、データが移行のために適切に処理されるようにするベスト プラクティスです。
出力パラメーター
パラメーター名 説明 path Logstash プラグインは、イベントをクラスターに送信する前に一時ファイルに書き込みます。 このパラメーターは、一時ファイルが保存される場所へのパスと、クラスターへのアップロードをトリガーするファイルローテーションの時間式を記述します。 ingest_url インジェスト関連の通信用のクラスター エンドポイント。 app_id、app_key、および app_tenant クラスターに接続するために必要な資格情報。 取り込み特権を持つアプリケーションを使用していることを確認します。 詳細については、「前提条件」を参照してください。 database イベントを配置するデータベースの名前。 テーブル イベントを配置するターゲット テーブルの名前。 json_mapping マッピングは、受信イベントの json 文字列を正しい行形式にマップするために使用されます (どの ELK プロパティがどのテーブル スキーマ列に入れるかを定義します)。
移行
移行前の手順の準備が完了したら、次の手順は移行プロセスを実行することです。 発生する可能性のある問題に対処できるように、データ移行プロセス中にパイプラインを監視して、円滑に実行されるようにすることが重要です。
データを移行するには、コマンド シェルで Logstash ルート ディレクトリに移動し、次のコマンドを実行します。
bin/logstash -f <your_pipeline>.conf
画面に情報が出力されます。
移行後
移行が完了したら、移行後の一連のタスクを実行してデータを検証し、すべてが可能な限りスムーズかつ効率的に機能していることを確認する必要があります。
特定のインデックスのデータ検証プロセスは、通常、次のアクティビティで構成されます。
データの比較: Azure Data Explorer クラスター内の移行されたデータと Elasticsearch の元のデータを比較します。 これを行うには、ELK スタックの Kibana などのツールを使用して、両方の環境でデータのクエリと視覚化を行うことができます。
クエリの実行: Azure Data Explorer クラスター内の移行されたデータに対して一連のクエリを実行して、データが正確かつ完全であることを確認します。 これには、異なるフィールド間のリレーションシップをテストするクエリの実行と、データの整合性をテストするクエリが含まれます。
不足しているデータの確認: クラスター内の移行されたデータと Elasticsearch のデータを比較して、不足しているデータ、重複するデータ、またはその他のデータの不整合を確認します。
パフォーマンスを検証する: クラスター内の移行されたデータのパフォーマンスをテストし、Elasticsearch のデータのパフォーマンスと比較します。 これには、クエリの実行やデータの視覚化を含めて応答時間をテストし、クラスター内のデータがパフォーマンスのために最適化されていることを確認できます。
重要
移行されたデータまたはクラスターに変更が加えられた場合は、データ検証プロセスを繰り返して、データが正確で完全であることを確認します。
クラスター内のデータを検証するために実行できるクエリの例を次に示します。
Elasticsearch で、次のクエリを実行して次のクエリを取得します。
// Gets the total record count of the index GET vehicle/_count // Gets the total record count of the index based on a datetime query GET vehicle/_count { "query": { "range" : { "releaseDate": { "gte": "2021-01-01", "lte": "2021-12-31" } } } } // Gets the count of all vehicles that has manufacturer as "Honda". GET vehicle/_count { "query": { "bool" : { "must" : { "term" : { "manufacturer" : "Honda" } } } } } // Get the record count where a specific property doesn't exist. // This is helpful especially when some records don't have NULL properties. GET vehicle/_count { "query": { "bool": { "must_not": { "exists": { "field": "description" } } } } }
データベース クエリ ウィンドウで、次の対応するクエリを実行します。
// Gets the total record count in the table Vehicle | count // Gets the total record count where a given property is NOT empty/null Vehicle | where isnotempty(Manufacturer) // Gets the total record count where a given property is empty/null Vehicle | where isempty(Manufacturer) // Gets the total record count by a property value Vehicle | where Manufacturer == "Honda" | count
両方のクエリ セットの結果を比較して、クラスター内のデータが正確で完全であることを確認します。