Logstash から Azure Data Explorer にデータを取り込む
重要
このコネクタは、Microsoft Fabric のリアルタイム インテリジェンスで使用できます。 次の例外を除き、この記事の手順を使用してください。
- 必要に応じて、「KQL データベースを作成する」の手順に従ってデータベースを作成する。
- 必要に応じて、「空のテーブルを作成する」の手順に従ってテーブルを作成する。
- 「URI をコピーする」の手順に従って、クエリまたはインジェスト URI を取得する。
- KQL クエリセットでクエリを実行する。
Logstash は、多くのソースからデータを同時に取り込み、変換したデータを任意の "一時退避場所" に送信する、オープン ソースのサーバー側データ処理パイプラインです。 この記事では、そのデータを Azure Data Explorer に送信します。これは、ログとテレメトリ データのための高速で非常にスケーラブルなデータ探索サービスです。 最初にテスト クラスターにテーブルとデータのマッピングを作成し、次にデータをテーブルに送信して結果を検証するように Logstash に指示します。
Note
このコネクタでは現在、json データ形式のみがサポートされています。
前提条件
- Microsoft アカウントまたは Microsoft Entra ユーザー ID。 Azure サブスクリプションは不要です。
- Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します。
- Logstash バージョン 6 以降のインストール手順。
テーブルの作成
クラスターとデータベースが用意できたら、次はテーブルを作成します。
データベース クエリ ウィンドウで次のコマンドを実行して、テーブルを作成します。
.create table logs (timestamp: datetime, message: string)
次のコマンドを実行して、新しいテーブル
logs
が作成されたこと、およびそれが空であることを確認します。logs | count
マッピングを作成する
マッピングは、受信データをターゲット テーブルのスキーマに変換するために Azure Data Explorer によって使用されます。 次のコマンドでは、path
で示される受信 json からプロパティを抽出して column
に出力する、basicmsg
という名前の新しいマッピングを作成しています。
クエリ ウィンドウで次のコマンドを実行します。
.create table logs ingestion json mapping 'basicmsg' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"message","path":"$.message"}]'
Logstash 出力プラグインをインストールする
Logstash 出力プラグインは、Azure Data Explorer と通信してデータをサービスに送信します。 詳細については、「Logstash プラグイン」を参照してください。
コマンド シェルで Logstash ルート ディレクトリに移動し、次のコマンドを実行してプラグインをインストールします。
bin/logstash-plugin install logstash-output-kusto
サンプル データセットを生成するように Logstash を構成する
Logstash では、エンドツーエンドのパイプラインをテストするために使用できるサンプル イベントを生成できます。 既に Logstash を使用していて、ご自分のイベント ストリームにアクセスできる場合は、次のセクションに進んでください。
Note
独自のデータを使用している場合は、前の手順で定義したテーブル オブジェクトとマッピング オブジェクトを変更してください。
(vi を使用して) 必要なパイプライン設定が含まれる新しいテキスト ファイルを編集します。
vi test.conf
1,000 件のテスト イベントを生成することを Logstash に指示する、次の設定を貼り付けます。
input { stdin { } generator { message => "Test Message 123" count => 1000 } }
この構成には、stdin
入力プラグインも含まれています。これを使用すると、自分でメッセージをさらに作成することができます (それらをパイプラインに送信するには、必ず Enter キーを使用してください)。
Azure Data Explorer にデータを送信するように Logstash を構成する
次の設定を、前の手順で使用したのと同じ構成ファイルに貼り付けます。 すべてのプレースホルダーは、お客様の設定に対応する値に置き換えてください。 詳細については、「Microsoft Entra アプリケーションを作成する」を参照してください。
output {
kusto {
path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm-ss}.txt"
ingest_url => "https://ingest-<cluster name>.kusto.windows.net/"
app_id => "<application id>"
app_key => "<application key/secret>"
app_tenant => "<tenant id>"
database => "<database name>"
table => "<target table>" # logs as defined above
json_mapping => "<mapping name>" # basicmsg as defined above
}
}
パラメーター名 | 説明 |
---|---|
path | Logstash プラグインでは、イベントを Azure Data Explorer に送信する前に、それらを一時ファイルに書き込みます。 このパラメーターには、ファイルを書き込む場所のパスと、Azure Data Explorer サービスへのアップロードをトリガーするためのファイル ローテーションの時間表現が含まれます。 |
ingest_url | インジェストに関連する通信の Kusto エンドポイント。 |
app_id、app_key、および app_tenant | Azure Data Explorer に接続するために必要な資格情報。 取り込み特権を備えたアプリケーションを必ず使用してください。 |
database | イベントを配置するデータベースの名前。 |
テーブル | イベントを配置するターゲット テーブルの名前。 |
json_mapping | マッピングは、受信イベント json 文字列を適切な行形式にマッピングするために使用されます (どのプロパティをどの列に入力するかを定義します)。 |
Logstash を実行する
これで、Logstash を実行して設定をテストする準備が整いました。
コマンド シェルで Logstash ルート ディレクトリに移動し、次のコマンドを実行します。
bin/logstash -f test.conf
画面に情報が表示され、サンプル構成によって生成された 1,000 件のメッセージが表示されます。 この時点で、さらにメッセージを手動で入力することもできます。
数分後、次の Data Explorer クエリを実行して、お客様が定義したテーブル内のメッセージを確認します。
logs | order by timestamp desc
Ctrl + C キーを押して、Logstash を終了します
リソースをクリーンアップする
お客様のデータベースで次のコマンドを実行して、logs
テーブルをクリーンアップします。
.drop table logs