你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将数据从 Apache Kafka 引入到 Azure 数据资源管理器中

Apache Kafka 是一个分布式流式处理平台,可用于构建实时流式处理数据管道,在系统或应用程序之间可靠地移动数据。 Kafka Connect 是一个工具,用于在 Apache Kafka 和其他数据系统之间以可缩放且可靠的方式流式传输数据。 Kusto Kafka 接收器充当来自 Kafka 的连接器,并且不需要使用代码。 从 Git 存储库Confluent 连接器中心下载接收器连接器 jar。

本文演示了如何通过 Kafka 并使用自包含的 Docker 安装程序来引入数据,从而简化 Kafka 群集和 Kafka 连接器群集设置。

有关详细信息,请参阅连接器 Git 存储库版本具体信息

先决条件

创建 Microsoft Entra 服务主体

Microsoft Entra 服务主体可以通过 Azure 门户或通过编程方式进行创建,如以下示例所示。

此服务主体是连接器用于将数据写入到 Kusto 中的表的标识。 授予此服务主体访问 Kusto 资源所需的权限。

  1. 通过 Azure CLI 登录到你的 Azure 订阅。 然后在浏览器中进行身份验证。

    az login
    
  2. 选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 创建服务主体。 在此示例中,服务主体名为 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 从返回的 JSON 数据中复制 appIdpasswordtenant 供将来使用。

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

现已创建了 Microsoft Entra 应用程序和服务主体。

创建目标表

  1. 使用以下命令从查询环境中创建一个名为 Storms 的表:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. 使用以下命令为引入的数据创建对应的表映射 Storms_CSV_Mapping

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. 针对可配置的排队引入延迟在表中创建引入批处理策略

    提示

    引入批处理策略是一个性能优化器,包含三个参数。 满足第一个条件将触发到表的引入。

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. 使用创建 Microsoft Entra 服务主体中的服务主体来授予使用数据库的权限。

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

运行实验室

以下实验室旨在为你提供开始创建数据、设置 Kafka 连接器以及流式传输此数据的体验。 然后,你可以查看引入的数据。

克隆 git 存储库

克隆实验室的 git 存储库

  1. 在计算机上创建一个本地目录。

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. 克隆存储库。

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

克隆的存储库的内容

运行以下命令以列出克隆的存储库的内容:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

此搜索的该结果是:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

查看克隆的存储库中的文件

以下各部分介绍了文件树中的文件的重要部分。

adx-sink-config.json

此文件包含 Kusto 接收器属性文件,将在其中更新特定配置详细信息:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

根据你的设置替换以下属性的值:aad.auth.authorityaad.auth.appidaad.auth.appkeykusto.tables.topics.mapping(数据库名称)、kusto.ingestion.urlkusto.query.url

连接器 - Dockerfile

此文件包含用于为连接器实例生成 docker 映像的命令。 它包括 git 存储库版本目录中的连接器下载。

Storm-events-producer 目录

此目录包含一个用于读取本地“StormEvents.csv”文件并将数据发布到 Kafka 主题的 Go 程序。

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

启动容器

  1. 在终端启动容器:

    docker-compose up
    

    生成者应用程序开始向 storm-events 主题发送事件。 你应当会看到类似于以下日志的日志:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. 若要检查日志,请在单独的终端中运行以下命令:

    docker-compose logs -f | grep kusto-connect
    

启动连接器

使用 Kafka Connect REST 调用来启动连接器。

  1. 在单独的终端中,使用以下命令启动接收器任务:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. 若要检查状态,请在单独的终端中运行以下命令:

    curl http://localhost:8083/connectors/storm/status
    

连接器开始对引入进程进行列队。

注意

如果有日志连接器问题,请创建问题

托管的标识

默认情况下,Kafka 连接器在引入期间使用应用程序方法进行身份验证。 若要使用托管标识进行身份验证,请执行以下操作:

  1. 为群集分配托管标识,并授予存储帐户读取权限。 有关详细信息,请参阅使用托管标识身份验证引入数据

  2. adx-sink-config.json 文件中,将 aad.auth.strategy 设置为 managed_identity,并确保将 aad.auth.appid 设置为托管标识客户端(应用程序)ID。

  3. 使用专用实例元数据服务令牌,而不是 Microsoft Entra 服务主体

注意

使用托管标识时,appIdtenant 将从调用站点的上下文推断,并且不需要 password

查询和查看数据

确认数据引入

  1. 数据到达 Storms 表后,通过检查行计数来确认数据的传输:

    Storms 
    | count
    
  2. 确认引入进程中没有失败:

    .show ingestion failures
    

    看到数据后,请尝试一些查询。

查询数据

  1. 若要查看所有记录,请运行以下查询

    Storms
    | take 10
    
  2. 使用 whereproject 来筛选特定数据:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. 使用 summarize 运算符:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    已连接的 Kafka 查询柱形图结果的屏幕截图。

如需更多查询示例和指导,请参阅在 KQL 中编写查询Kusto 查询语言文档

重置

若要进行重置,请执行以下步骤:

  1. 停止容器 (docker-compose down -v)
  2. 删除 (drop table Storms)
  3. 重新创建 Storms
  4. 重新创建表映射
  5. 重启容器 (docker-compose up)

清理资源

若要删除 Azure 数据资源管理器资源,请使用 az kusto cluster delete(kusto 扩展)az kusto database delete(kusto 扩展)

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

还可以通过 Azure 门户删除群集和数据库。 有关详细信息,请参阅删除 Azure 数据资源管理器群集在 Azure 数据资源管理器中删除数据库

优化 Kafka 接收器连接器

优化 Kafka 接收器连接器以使用引入批处理策略

  • 优化 Kafka 接收器 flush.size.bytes 大小限制,从 1 MB 开始,以 10 MB 或 100 MB 的增量增加。
  • 使用 Kafka 接收器时,数据将聚合两次。 在连接器端上,数据根据刷新设置进行聚合,而在服务端上,数据根据批处理策略进行聚合。 如果批处理时间太短,使连接器和服务都无法引入数据,则必须增加批处理时间。 将批处理大小设置为 1 GB,并根据需要以 100 MB 的增量增加或减少。 例如,如果刷新大小为 1 MB,批处理策略大小为 100 MB,则 Kafka 接收器连接器会将数据聚合为 100 MB 的批。 然后,服务将引入该批处理。 如果批处理策略时间为 20 秒,并且 Kafka 接收器连接器在 20 秒期间刷新 50 MB,则服务将引入 50 MB 的批。
  • 可通过添加实例和 Kafka 分区进行缩放。 将 tasks.max 增加到分区的数目。 如果有足够的数据生成 blob,则创建一个分区,其大小与 flush.size.bytes 设置的大小相同。 如果 blob 较小,批处理会在达到时间限制时进行处理,因此分区不会收到足够的吞吐量。 大量的分区意味着会产生更多处理开销。