ASIM パーサーを作成する

完了

Advanced Security Information Model (ASIM) ユーザーは、クエリでテーブル名の代わりに統一パーサーを使用して、正規化された形式でデータを表示し、スキーマに関連するすべてのデータをクエリに含めます。 その結果、統一パーサーでは、ソース固有パーサーを使用して各ソースの具体的な詳細が処理されます。

Microsoft Sentinel では、多くのデータ ソースに対して組み込みのソース固有パーサーが用意されています。 次の状況では、これらのソース固有パーサーを変更または開発することができます。

デバイスから ASIM スキーマに適合するイベントが提供されるが、デバイスと関連スキーマに対応するソース固有パーサーが Microsoft Sentinel で使用できない場合。

デバイスで ASIM ソース固有パーサーを使用できるが、デバイスから ASIM パーサーで想定されているものとは異なる方法または形式でイベントが送信される場合。 次に例を示します。

ソース デバイスが標準以外の方法でイベントを送信するように構成されている。

デバイスのバージョンが ASIM パーサーでサポートされているものと異なる。

イベントが中間システムによって収集、変更、転送される場合がある。

カスタム パーサーの開発プロセス

次のワークフローでは、カスタム ASIM ソース固有パーサーを開発する大まかな手順について説明します。

  1. サンプル ログを収集します。

  2. ソースから送信されたイベントが表すスキーマを識別します。

  3. マップソース イベント フィールドが識別されたスキーマにマップします。

  4. ソースに対して 1 つ以上の ASIM パーサーを開発します。 ソースに関連するスキーマごとに、フィルター パーサーとパラメーターなしのパーサーを開発する必要があります。

  5. パーサーをテストします。

  6. パーサーを Microsoft Sentinel のワークスペースにデプロイします。

  7. 関連する ASIM 統一パーサーを更新して、新しいカスタム パーサーを参照します。

  8. また、プライマリ ASIM ディストリビューションにパーサーを コントリビュートすることもできます。 コントリビュートされたパーサーは、組み込みのパーサーとしてすべてのワークスペースで使用することもできます。

サンプル ログの収集

効果的な ASIM パーサーを構築するには、代表的なログ セットが必要です。ほとんどの場合、ソース システムを設定して Microsoft Sentinel に接続する必要があります。 ソース デバイスを使用できない場合は、クラウドの従量課金制サービスを使用して、開発とテストのために多数のデバイスをデプロイできます。

さらに、ログのベンダー ドキュメントとサンプルを見つけることは、ログ形式の範囲を広く確保する上で、開発を迅速化し、間違いを減らすのに役立ちます。

ログの代表的なセットには、次のものが含まれている必要があります。

  • 異なるイベント結果を持つイベント。
  • 応答アクションが異なるイベント。
  • ユーザー名、ホスト名と ID、値の正規化を必要とするその他のフィールドのさまざまな形式。

マッピング

パーサーを開発する前に、ソース イベントまたはイベントで使用可能な情報を、識別したスキーマにマップします。

  • すべての必須フィールド、できれば推奨フィールドもマップします。
  • ソースから入手できる情報を正規化されたフィールドにマップしてみてください。 選択したスキーマの一部として使用できない場合は、他のスキーマで使用可能なフィールドへのマッピングを検討してください。
  • ソースのフィールドの値を、ASIM で許可されている正規化された値にマップします。 元の値は、EventOriginalResultDetails などの別のフィールドに格納されます。

パーサーの開発

関連するスキーマごとに、フィルター処理とパラメーターなしのパーサーの両方を開発します。

カスタム パーサーは、Microsoft Sentinel の [ログ] ページで開発される KQL クエリです。 パーサー クエリには、次の 3 つの部分があります。

[フィルター] > [解析] > [準備] フィールド

関連レコードのフィルター処理

多くの場合、Microsoft Sentinel のテーブルには、複数の種類のイベントが含まれています。 次に例を示します。

  • Syslog テーブルには、複数のソースからのデータが含まれています。
  • カスタム テーブルには、複数のイベントの種類を提供し、さまざまなスキーマに適合する 1 つのソースの情報を含めることができます。

したがって、パーサーでは、まずターゲット スキーマに関連するレコードのみをフィルター処理する必要があります。

KQL でのフィルター処理は、where 演算子を使って行われます。 たとえば、Sysmon event 1 はプロセスの作成を報告するものであるため、ProcessEvent スキーマに正規化されます。 Sysmon event 1 イベントは Event テーブルの一部であるため、以下のフィルターを使用します。

Event | where Source == "Microsoft-Windows-Sysmon" and EventID == 1

重要

パーサーは時間でフィルター処理しないでください。 パーサーを使用するクエリでは、時間範囲が適用されます。

ウォッチリストを使用したソースの種類によるフィルター処理

特定のソースの種類をフィルター処理するための情報が、イベント自体には含まれていない場合もあります。

たとえば、Infoblox DNS イベントは Syslog メッセージとして送信され、他のソースから送信された Syslog メッセージとの区別が困難です。 このような場合、パーサーは、関連するイベントを定義するソースの一覧に依拠します。 この一覧は ASimSourceType ウォッチリストに保持されます。

パーサーで ASimSourceType ウォッチリストを使用するには:

  • パーサーの先頭に次の行を含めます。
let Sources_by_SourceType=(sourcetype:string){_GetWatchlist('ASimSourceType') | where SearchKey == tostring(sourcetype) | extend Source=column_ifexists('Source','') | where isnotempty(Source)| distinct Source };
  • ウォッチリストを使用するフィルターを、パーサーのフィルター処理セクションに追加します。 たとえば、Infoblox DNS パーサーでは、フィルター処理セクションに次の情報が含まれています。
| where Computer in (Sources_by_SourceType('InfobloxNIOS'))

パーサーでこのサンプルを使用するには:

  • Computer を、ソースのソース情報を含むフィールドの名前に置き換えます。 Syslog ベースのパーサーの場合、これは Computer のままで構いません。

  • InfobloxNIOS トークンを、使用するパーサーに適した任意の値に置き換えます。 選択した値を使用して ASimSourceType ウォッチリストを更新する必要があること、および、この種類のイベントを送信するソースの一覧をパーサーのユーザーに通知します。

パーサー パラメーターに基づくフィルター処理

フィルター パーサーを開発する場合は、関連するスキーマのリファレンス記事の説明に従って、パーサーがそのスキーマのフィルター パラメーターを確実に受け入れるようにします。 既存のパーサーを出発点として使用すると、パーサーに正しい関数シグネチャを確実に含めることができます。 ほとんどの場合、実際のフィルター処理コードも同じスキーマのフィルター パーサーに似ています。

フィルター処理を行う場合は、次の条件を確認します。

  • 物理フィールド を使用して解析する前にフィルター処理します。 フィルター処理された結果が十分に正確ではない場合は、解析後にテストを繰り返して結果を微調整します。 詳細については、フィルター処理の最適化に関する記事をご覧ください。
  • パラメーターが定義されておらず既定値が残っている場合は、フィルター処理を行いません。

次の例は、文字列パラメーター (既定値は通常 '*') およびリスト パラメーター (既定値は通常空のリスト) のフィルター処理を実装する方法を示しています。

srcipaddr=='*' or ClientIP==srcipaddr
array_length(domain_has_any) == 0 or Name has_any (domain_has_any)

フィルター処理の最適化

パーサーのパフォーマンスを確保するために、次のフィルター選択に関する推奨事項を確認してください。

  • 解析されたフィールドではなく、常に組み込みのフィルター処理を行います。 解析されたフィールドを使用してフィルター処理する方が簡単な場合もありますが、これはパフォーマンスに大きく影響します。
  • 最適化されたパフォーマンスを提供する演算子を使用します。 特に、==、has、startswith です。 また、contains や matches regex などの演算子を使うと、パフォーマンスが劇的に向上します。

フィルター処理のパフォーマンスに関する推奨事項に従うのは困難な場合もあります。 たとえば、has を使うと contains よりも精度は低下します。 他のケースでは、SyslogMessage のような組み込みフィールドの照合は、DvcAction のような抽出されたフィールドの比較よりも精度が低くなります。 このような場合は、組み込みフィールドに対してパフォーマンス最適化演算子を使用して事前にフィルター処理し、解析後により正確な条件を使用してフィルターを繰り返すことをお勧めします。

例として、次の Infoblox DNS パーサーのスニペットを参照してください。 パーサーは、まず SyslogMessage フィールドに単語 client があるかどうかを確認します。 ただし、この単語がメッセージ内の別の場所で使用されている可能性もあります。そのため、Log_Type フィールドを解析した後、パーサーでは単語 client が実際にフィールドの値だったことを再確認します。

Syslog | where ProcessName == "named" and SyslogMessage has "client"
…
      | extend Log_Type = tostring(Parser[1]),
      | where Log_Type == "client"

解析

クエリで関連するレコードを選択すると、そのレコードの解析が必要になる場合があります。 通常、1 つのテキスト フィールドで複数のイベント フィールドが伝達される場合は、解析が必要です。

解析を実行する KQL 演算子を、以下にパフォーマンスが最適化されている順に示します。 最初の方法では最適なパフォーマンスが得られますが、最後のパフォーマンスは最も最適化されていません。

演算子 説明
split 区切り値の文字列を解析します。
parse_csv CSV (コンマ区切り値) 行として書式設定された値の文字列を解析します。
parse パターンを使用して、任意の文字列から複数の値を解析します。これは、パフォーマンス向上のための単純なパターンでも、正規表現でもかまいません。
extract_all 正規表現を使用して、任意の文字列から単一の値を解析します。 parse が正規表現を使用している場合、extract_all はそれと同様のパフォーマンスを発揮します。
extract 正規表現を使用して、任意の文字列から単一の値を抽出します。 1 つの値が必要な場合、extract を使用すると、parse や extract_all よりもパフォーマンスが向上します。 しかし、同じソース文字列に対して extract の複数のアクティブ化を使用することは、1 つの parse または extract_all に比べて効率が悪いため、避ける必要があります。
parse_json JSON 形式で設定された文字列の値を解析します。 JSON の一部の値だけが必要な場合は、parse、extract、または extract_all を使用するとパフォーマンスが向上します。
parse_xml XML 形式で設定された文字列の値を解析します。 XML の一部の値だけが必要な場合は、parse、extract、または extract_all を使用するとパフォーマンスが向上します。

解析フェーズでは文字列の解析に加えて、以下のような元の値の処理が必要になる場合があります。

  • 書式設定と型変換。 抽出されたソース フィールドは、ターゲット スキーマ フィールドに合うように書式設定しなければならない場合があります。 たとえば、日付と時刻を表す文字列を datetime フィールドに変換することが必要な場合もあります。 このような場合は、todatetime や tohex などの関数が役立ちます。

  • 値の検索。 抽出されたソース フィールドの値は、ターゲット スキーマ フィールドに指定された値のセットにマッピングされる必要があります。 たとえば、いくつかのソースは DNS 応答コードを数値でレポートしますが、スキーマでは、より一般的なテキストの応答コードが要求されます。 関数 iff と case は、少数の値をマップする場合に役立ちます。

    たとえば、Microsoft DNS パーサーは、次のように、iff ステートメントを使用して、イベント ID と応答コードに基づいて EventResult フィールドを割り当てます。

    extend EventResult = iff(EventId==257 and ResponseCode==0 ,'Success','Failure')
    

    複数の値の場合は、同じ DNS パーサーで示されているように、datatable と lookup を使用します。

    let RCodeTable = datatable(ResponseCode:int,ResponseCodeName:string) [ 0, 'NOERROR', 1, 'FORMERR'....];
    ...
     | lookup RCodeTable on ResponseCode
     | extend EventResultDetails = case (
     isnotempty(ResponseCodeName), ResponseCodeName,
     ResponseCode between (3841 .. 4095), 'Reserved for Private Use',
     'Unassigned')
    

マッピング値

多くの場合、抽出された元の値を正規化する必要があります。 たとえば、ASIM では、MAC アドレスは区切り文字としてコロンを使用しますが、送信元はハイフンで区切られた MAC アドレスを送信できます。 値を変換するための主要な演算子は、上記の「解析」セクションで示したように、KQL 文字列、数値、日付関数の広範なセットと共に extend です。

case、iff、lookup ステートメントは、ターゲット フィールドで許可されている値に値のセットをマップする必要がある場合に使用します。

各ソース値がターゲット値にマップされる場合は、datatable 演算子を使用してマッピングを定義し、マップの lookup を実行します。 次に例を示します。

let NetworkProtocolLookup = datatable(Proto:real, NetworkProtocol:string)[
        6, 'TCP',
        17, 'UDP'
   ];
    let DnsResponseCodeLookup=datatable(DnsResponseCode:int,DnsResponseCodeName:string)[
      0,'NOERROR',
      1,'FORMERR',
      2,'SERVFAIL',
      3,'NXDOMAIN',
      ...
   ];
   ...
   | lookup DnsResponseCodeLookup on DnsResponseCode
   | lookup NetworkProtocolLookup on Proto

lookup は、マッピングに可能な値が 2 つしかない場合にも便利で効率的であることに注意してください。

それよりもマッピング条件が複雑になる場合は、iff または case 関数を使用します。 iff 関数を使用すると、次の 2 つの値が有効になります。

| extend EventResult = 
      iff(EventId==257 and ResponseCode==0,'Success','Failure’)

case 関数は、3 つ以上のターゲット値をサポートします。 次の例は、lookupcase を組み合わせる方法を示しています。 上記の lookup 例では、lookup 値が見つからない場合に DnsResponseCodeName フィールドに空の値が返されます。 次の case 例では、使用可能な場合は lookup 操作の結果を使用し、それ以外の場合は追加の条件を指定することで、これを拡張します。

| extend DnsResponseCodeName = 
      case (
        DnsResponseCodeName != "", DnsResponseCodeName,
        DnsResponseCode between (3841 .. 4095), 'Reserved for Private Use',
        'Unassigned'
      )

結果セットのフィールドを準備する

パーサーでは、正規化されたフィールドが確実に使用されるようにするために、結果セット内のフィールドを準備する必要があります。

結果セットのフィールドを準備するには、次の KQL 演算子を使用します。

演算子 説明 パーサーで使用する場合
project-rename フィールドの名前を変更します。 フィールドが実際のイベントに存在し、名前変更だけが必要な場合は、project-rename を使用します。 名前が変更されたフィールドは組み込みフィールドと同様に動作し、フィールドに対する操作のパフォーマンスがはるかに向上します。
project-away フィールドを削除します。 結果セットから削除したい特定のフィールドには project-away を使用します。 正規化されていない元のフィールドは、混乱を招くか、非常に大きくてパフォーマンスに影響を与える可能性がある場合を除き、結果セットから削除しないことをお勧めします。
project 以前に存在した、またはステートメントの一部として作成されたフィールドを選択し、その他のフィールドをすべて削除します。 パーサーでは、正規化されていない他のフィールドを削除してはいけないため、パーサーでの使用は推奨されません。 解析時に使用される一時的な値など、特定のフィールドを削除する必要がある場合は、project-away を使用して結果から削除します。
extend エイリアスを追加します。 計算フィールドの生成における役割とは別に、extend 演算子はエイリアスの作成にも使用されます。

解析バリアントを処理する

多くの場合、イベント ストリーム内のイベントには、さまざまな解析ロジックを必要とするバリアントが含まれます。 1 つのパーサーでさまざまなバリアントを解析するには、iff や case などの条件ステートメントを使用するか、union 構造体を使用します。

union を使用して複数のバリアントを処理するには、バリアントごとに個別の関数を作成し、union ステートメントを使用して結果を結合します。

let AzureFirewallNetworkRuleLogs = AzureDiagnostics
    | where Category == "AzureFirewallNetworkRule"
    | where isnotempty(msg_s);
let parseLogs = AzureFirewallNetworkRuleLogs
    | where msg_s has_any("TCP", "UDP")
    | parse-where
        msg_s with           networkProtocol:string 
        " request from "     srcIpAddr:string
        ":"                  srcPortNumber:int
    …
    | project-away msg_s;
let parseLogsWithUrls = AzureFirewallNetworkRuleLogs
    | where msg_s has_all ("Url:","ThreatIntel:")
    | parse-where
        msg_s with           networkProtocol:string 
        " request from "     srcIpAddr:string
        " to "               dstIpAddr:string
    …
union parseLogs,  parseLogsWithUrls…

イベントの重複や過剰な処理を回避するには、各関数が、ネイティブ フィールドを使用して、解析を意図したイベントのみをフィルター処理することによって開始されるようにします。 また、必要に応じて、組合の前にある各ブランチで project-away を使用します。

パーサーをデプロイする

パーサーを手動でデプロイするには、パーサーを Azure Monitor ログ ページにコピーし、クエリを関数として保存します。 このメソッドはテストする際に役立ちます。 詳細については、「関数を作成する」を参照してください。

多数のパーサーをデプロイするには、次のようにパーサー ARM テンプレートを使用することをお勧めします。

  1. 各スキーマの関連するテンプレートに基づいて YAML ファイルを作成し、そこにクエリを含めます。 最初に、スキーマとパーサーの種類 (フィルター処理またはパラメーターレス) に関連する YAML テンプレートを使用します。

  2. ASIM Yaml to ARM テンプレート コンバーターを使用して、YAML ファイルを ARM テンプレートに変換します。

  3. 更新プログラムをデプロイする場合は、ポータルまたは PowerShell の関数の削除ツールを使用して以前のバージョンの関数を削除します。

  4. Azure portal または PowerShell を使用して、テンプレートをデプロイします。

リンクされたテンプレートを使用して、複数のテンプレートを 1 つのデプロイ プロセスにまとめることもできます。