从 Cribl Stream 获取数据

Cribl Stream 是一个处理引擎,可安全地从任何源收集、处理和流式传输计算机事件数据。 它让你可以分析并处理任何目标的数据,以便以安全的方式进行分析和管理。

本文介绍了如何使用 Cribl Stream 来引入数据。

有关数据连接器的完整列表,请参阅数据连接器概述

先决条件

创建 Microsoft Entra 服务主体

Microsoft Entra 服务主体可以通过 Azure 门户或以编程方式进行创建,如下例所示。

此服务主体是连接器用来将数据写入 Kusto 中的表的标识。 授予此服务主体访问 Kusto 资源所需的权限。

  1. 通过 Azure CLI 登录到你的 Azure 订阅。 然后在浏览器中进行身份验证。

    az login
    
  2. 选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 创建服务主体。 在此示例中,服务主体名为 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 从返回的 JSON 数据中复制 appIdpasswordtenant 供将来使用。

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

现已创建了 Microsoft Entra 应用程序和服务主体。

创建目标表

为传入数据创建一个目标表,并创建一个引入映射,将引入的数据列映射到目标表中的列。

  1. 在查询编辑器中运行以下表创建命令,并将占位符 TableName 替换为目标表的名称:

    .create table <TableName> (_raw: string, _time: long, cribl_pipe: dynamic)
    
  2. 运行以下 create ingestion mapping 命令,将占位符 TableName 替换为目标表名称,将 TableNameMapping 替换为引入映射的名称

    .create table <TableName> ingestion csv mapping '<TableNameMapping>' 'CriblLogMapping' '[{"Name":"_raw","DataType":"string","Ordinal":"0","ConstValue":null},{"Name":"_time","DataType":"long","Ordinal":"1","ConstValue":null},{"Name":"cribl_pipe","DataType":"dynamic","Ordinal":"2","ConstValue":null}]'
    
  3. 根据创建 Microsoft Entra 服务主体授予服务主体数据库引入者角色,并提供使用数据库的权限。 有关详细信息,请参阅示例。 将占位符 DatabaseName 替换为目标数据库的名称,将 ApplicationID 替换为创建 Microsoft Entra 服务主体时保存的 AppId 值。

    .add database <DatabaseName> ingestors ('aadapp=<ApplicationID>') 'App Registration'
    

创建 Cribl Stream 目标

以下部分介绍如何创建一个将数据写入 Kusto 中的表的 Cribl Stream 目标。 每个表都需要一个单独的 Cribl Stream 目标连接器。

选择目标

若要将 Cribl Stream 连接到你的表:

  1. 在 Cribl 的顶部导航中,选择“管理”,然后选择一个辅助角色组。

  2. 选择“路由”>“QuickConnect (Stream)”>“添加目标”。

  3. 在“设置新的 QuickConnect 目标”窗口中,选择“Azure 数据资源管理器”,然后选择“立即添加”。

注意

Azure 数据资源管理器连接适用于 Azure 数据资源管理器和实时智能。

设置常规设置

在“新建数据资源管理器”窗口中,在“常规设置”中设置以下设置:

设置 说明
输出 ID <OutputID>,例如 KustoDestination 用于标识目标的名称。
引入模式 批处理(默认)或流式处理 引入模式的设置。 批处理允许表在短时间内引入大量数据时,从一个 Cribl 存储容器拉取成批的数据。 流式处理会将数据直接发送到目标 KQL 表。 流式处理对于引入较小的数据量很有用,例如实时发送关键警报。 流式处理可以实现比批处理更低的延迟。 如果引入模式设置为流式处理,你需要启用流式处理策略。 有关详细信息,请参阅流式处理引入策略
群集基 URI 基 URI 基 URI
引入服务 URI 引入 URI 选择批处理模式时显示。 引入 URI
数据库名称 <DatabaseName> 目标数据库的名称。
表名称 <TableName> 目标表的名称。
验证数据库设置 (默认值)或 验证保存或启动目标时你输入的服务主体应用凭据。 它会验证表名称,但添加映射对象打开时除外。 如果应用没有同时具备数据库查看者表查看者角色,则应禁用此设置。
添加映射对象 (默认值)。 仅在选择批处理模式而不是默认的数据映射文本字段时显示。 选择“是”将打开一个窗口,用于输入 JSON 对象形式的数据映射。
数据映射 创建目标表”步骤中定义的映射架构名称。 映射架构名称。 “添加映射对象”设置为“否”时的默认视图。
压缩 gzip(默认值) 数据格式设置为 Parquet 时,压缩不可用。
数据格式 JSON(默认值)、原始或 Parquet。 数据格式。 Parquet 仅在批处理模式下可用,仅在 Linux 上受支持。
背压行为 阻止(默认)或删除 选择在接收方施加背压时是要阻止还是删除事件。
标记 可选值 可选标记,用于筛选和分组 Cribl Stream 的“管理目标”页中的目标。 在标记名称之间使用制表符或硬回车。 这些标记不会添加到已处理的事件中。

完成后,选择“下一步”。

身份验证设置

在边栏中选择“身份验证设置”。 使用你在“创建 Microsoft Entra 服务主体”中保存的值以及你的基 URI,如下所示:

设置 说明
租户 ID <TenantID> 使用你在“创建 Microsoft Entra 服务主体”中保存的 tenant 值。
客户端 ID <ClientID> 使用你在“创建 Microsoft Entra 服务主体”中保存的 appId 值。
Scope <baseuri>/.default 使用基 URI 的值作为 baseuri
身份验证方法 客户端密码客户端密码(文本机密)证书 选项是客户端密码 使用你在“创建 Microsoft Entra 服务主体”中创建的 Microsoft Entra 应用程序的客户端密码作为“客户端密码”。 对于证书,你的证书使用你在“创建 Microsoft Entra 服务主体”中创建的 Microsoft Entra 应用程序已注册/将注册的公钥。

然后选择下一步

持久队列

引入模式设置为流式处理背压行为设置为持久队列时显示。

设置 说明
最大文件大小 1 MB(默认值) 关闭文件之前可达到的最大队列文件大小。 输入数字时,包含 KB 或 MB 等单位。
最大队列大小 5 GB(默认值) 在目标停止排队数据之前,队列可以在每个工作进程上使用的最大磁盘空间量。 具有 KB、MB 或 GB 等单位的正数必需值。 最大值为 1 TB。
队列文件路径 $CRIBL_HOME/state/queues(默认值) 永久性队列文件位置。 Cribl Stream 将 /<worker‑id>/<output‑id> 追加到此值。
压缩 无(默认值),Gzip 用于在关闭时压缩持久化的数据的压缩方法。
队列已满行为 阻止删除 选择当队列由于磁盘容量不足或已满而施加背压时要阻止还是删除事件。
严格排序 (默认值)或 如果设置为“是”,事件会基于先进先出的排序规则进行转发。 设置为“否”以在之前排队的事件之前发送新事件。
排放速率限制 (EPS) 0(默认值) 当“严格排序”设置为“否”时,将显示此选项,让你可以对从队列写入接收方设置限制速率(以每秒事件数为单位)。 限制已排队事件的排放速率可提升新的或活动连接的吞吐量。 “零”则表示禁用限制。
清除持久队列 NA 选择此项会删除当前在排队传送到你的目标的文件。 你需要确认此操作,因为排队的数据会被永久删除,而不会传送。

完成后选择“下一步”

处理设置

设置 说明
管道 <\defined_pipeline> 在使用此输出发送数据之前处理数据的可选管道。
系统字段 cribl_pipe(默认值)、cribl_hostcribl_inputcribl_outputcribl_routecribl_wp 在事件发送到目标之前自动添加到事件的字段列表。 支持通配符。

完成后选择“下一步”

Parquet 设置

当“数据格式”的选择为“Parquet”时显示。

选择 Parquet 将打开 Parquet 设置选项卡,以选择 Parquet 架构。

设置 说明
自动架构 “打开”或“关闭” 选择“开”以根据 Cribl Stream 写入的每个 Parquet 文件的事件生成 Parquet 架构。
Parquet 架构 下拉列表 当“自动架构”设置为“关”时显示,使你可以选择 parquet 架构。
Parquet 版本 1.0、2.4、2.6(默认值) 版本会确定支持的数据类型及其表示方式。
数据页版本 V1、V2(默认值) 数据页序列化格式。 如果 Parquet 读取器不支持 Parquet V2,请使用 V1。
组行限制 1000(默认值) 每个组可以包含的最大行数。
页面大小 1 MB(默认值) 页段的目标内存大小。 较低的值可以提高读取速度,而较高的值可以提升压缩。
记录无效行 “是”或“否” 如果选择“是”,并且“日志级别”设置为 debug,则输出由于数据格式不匹配而跳过的最多 20 个唯一行。
写入统计信息 (默认值)或 如果配置了 Parquet 统计信息查看工具,请选择“开”。
写入页索引 (默认值)或 如果 Parquet 读取器使用 Parquet 页面索引统计信息来启用页面跳过,请选择“开”。
写入页面校验和 “打开”或“关闭” 如果使用 Parquet 工具通过 Parquet 页面校验和检查数据完整性,请选择“开”。
元数据(可选)* 可作为键值对包含的目标文件元数据属性。

重试

当引入模式设置为流式处理时显示。

设置 说明
遵循重试间隔标头 “是”或“否” 是否遵循 Retry-After 标头。 启用后,只要标头指定延迟为 180 秒或更短,接收的 Retry-After 标头会优先于“重试”部分中的其他配置选项进行使用。 否则,将忽略 Retry-After 标头。
失败的 HTTP 请求的设置 HTTP 状态代码 如果无法连接,会自动重试的 HTTP 状态代码列表。 Cribl Stream 自动重试 429 个失败的请求。
重试超时 HTTP 请求 “打开”或“关闭” 设置后,将有更多重试行为设置可用。
预退避间隔 (ms) 1000 ms(默认值) 重试前的等待时间。
退避乘数 2 s(默认值) 用作指数退避算法的基数,以确定重试之间的间隔。
退避限制 (ms) 10,000 ms(默认值) 最终流式处理重试的最大退避间隔。 可能的值范围为 10,000 毫秒(10 秒)到 180,000 毫秒(3 分钟)。

完成后选择“下一步”

高级设置

从边栏中选择“高级设置”。 下面介绍了选择“批处理”时的高级设置:

设置 说明
立即刷新 (默认值)。 设置为“是”以替代 Kusto 中的数据聚合。 有关详细信息,请参阅 Kusto 引入库的最佳做法
成功时保留 Blob (默认值)。 设置为“是”以在引入完成后保留数据 Blob。
盘区标记 <\ExtentTag, ET2,...> 如果需要,为目标表的分区盘区设置标记。
通过标记值强制实施唯一性 选择“添加值”以指定一个 ingest-by 值列表,用于筛选传入的盘区,并放弃与列出的值匹配的盘区。 有关详细信息,请参阅盘区(数据分片)
报告级别 DoNotReportFailuresOnly(默认值)和 FailuresAndSuccesses 引入状态报告级别。
报告方法 Queue(默认值)、TableQueueAndTable(推荐)。 引入状态报告的目标。
附加字段 如果需要,添加更多配置属性以发送到引入服务。
暂存位置 /tmp(默认值) 在压缩文件并将其移动到最终目标之前缓冲文件的本地文件系统位置。 Cribl 建议使用一个稳定且高性能的位置。
文件名后缀表达式 .${C.env["CRIBL_WORKER_ID"]}.${__format}${__compression === "gzip" ? ".gz" : ""}(默认值) 用引号或反引号括起来的 JavaScript 表达式,用作输出文件名后缀。 format 可以是 JSONraw__compression 可以是 nonegzip。 六个字符的随机序列会追加到文件名的末尾,以防止它们被覆盖。
最大文件大小 (MB) 32 MB(默认值) 文件在关闭并移动到存储容器之前可以达到的最大未压缩输出文件大小。
最大文件打开时间(秒) 300 秒(默认值) 在文件关闭并移动到存储容器之前写入它的最大时间量(以秒为单位)。
最大文件空闲时间(秒) 30 秒(默认值) 在关闭非活动文件并将其移动到存储容器之前,保持它们打开的最大时间量(以秒为单位)。
最大打开文件数 100(默认值) 在关闭最早打开的文件并将其移动到存储容器之前,可同时保持打开的最大文件数。
最大并发文件部件数 1 (默认值) 可同时上传的最大文件部件数。 默认值为 1,最高为 10。 将该值设置为一会使其按顺序一次发送一个部件。
移除空暂存目录 (默认值)或 在 Cribl Stream 上打开时,将在移动文件后删除空暂存目录。 这可以防止孤立的空目录增加。 启用后,公开暂存清理期
暂存清理期 300(默认值) 启用“移除暂存目录”时删除空目录前的时间量(以秒为单位)。 当“移除空暂存目录”设置为“是”时显示。 最小值为 10 秒,最大值为 86,400 秒(每 24 小时)。
环境 当设为空(默认值)时,配置将在所有地方启用。 如果使用 GitOps,可以指定要在其中启用配置的 Git 分支。

完成后,选择“保存”。

连接配置

在打开的“连接配置”窗口中,选择“Passthru 连接”,然后选择“保存”。 连接器开始对数据进行排队。

确认数据引入

  1. 数据到达表后,通过检查行计数来确认数据传输:

    <Tablename> 
    | count
    
  2. 确认在过去五分钟内排队的引入:

    .show commands-and-queries 
    | where Database == "" and CommandType == "DataIngestPull" 
    | where LastUpdatedOn >= ago(5m)
    
  3. 确认引入进程中没有失败:

    • 对于批处理:
    .show ingestion failures
    
    • 对于流式处理:
    .show streamingingestion failures 
    | order by LastFailureOn desc
    
  4. 验证表中的数据:

    <TableName>
    | take 10
    

有关查询示例和指南,请参阅在 KQL 中写入查询Kusto 查询语言文档