Поделиться через


Получение данных из Apache Kafka в Azure Data Explorer

Apache Kafka — это распределенная платформа потоковой передачи для создания конвейеров потоковой передачи в режиме реального времени, которые надежно перемещают данные между системами или приложениями. Kafka Connect — это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Приемник Kusto Kafka служит соединителем из Kafka и не требует использования кода. Скачайте jar-файл соединителя приемника из репозитория Git или Концентратора соединителя Confluent.

В этой статье показано, как прием данных с помощью Kafka с помощью автономной установки Docker упростить настройку кластера Kafka и кластера соединителя Kafka.

Дополнительные сведения см. в разделе Репозиторий Git и Сведения о версиях для соединителя.

Необходимые компоненты

Создание субъекта-службы Microsoft Entra

Субъект-служба Microsoft Entra можно создать с помощью портал Azure или программы, как показано в следующем примере.

Этот субъект-служба — это удостоверение, используемое соединителем для записи данных таблицы в Kusto. Вы предоставляете этому субъекту-службе разрешения для доступа к ресурсам Kusto.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращаемых данных JSON скопируйте appIdpasswordданные и tenant для дальнейшего использования.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

Создание целевой таблицы

  1. В среде запроса создайте таблицу Storms с помощью следующей команды:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Создайте соответствующее сопоставление таблицы Storms_CSV_Mapping для загруженных данных с помощью следующей команды:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Создайте политику пакетного приема в таблице для настраиваемой задержки приема в очереди.

    Совет

    Политика пакетной обработки приема — это оптимизатор производительности, включающий три параметра. Первое условие удовлетворяет приему в таблицу.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Используйте субъект-службу из создания субъекта-службы Microsoft Entra, чтобы предоставить разрешение на работу с базой данных.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Запуск лаборатории

Следующая лаборатория предназначена для создания данных, настройки соединителя Kafka и потоковой передачи этих данных. Затем вы можете просмотреть полученные данные.

Клонировать репозиторий git

Клонировать репозиторий git лаборатории.

  1. Создайте локальный каталог на вашем компьютере.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Клонирование репозитория.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Содержание клонированного репозитория

Выполните следующую команду, чтобы вывести список содержимого клонированного репозитория:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Вот результат этого поиска:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Просмотрите файлы в клонированном репозитория

В следующих разделах описываются важные части файлов в дереве файлов.

adx-sink-config.json

Этот файл содержит файл свойств приемника Kusto, в котором обновляются определенные сведения о конфигурации:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Замените значения следующих атрибутов в соответствии с настройкой: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (имя базы данных) kusto.ingestion.urlи kusto.query.url.

Соединитель — Dockerfile

В этом файле есть команды для создания образа докера для экземпляра соединителя. Он включает загрузку соединителя из каталога выпуска репозитория git.

Каталог Storm-events-producer

В этом каталоге находится программа Go, которая считывает локальный файл StormEvents.csv и публикует данные в теме Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Запуск контейнеров

  1. В терминале запустите контейнеры:

    docker-compose up
    

    Приложение-производитель начинает отправку событий в storm-events раздел. Вы должны увидеть журналы, похожие на следующие журналы:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Чтобы проверить журналы, выполните следующую команду в отдельном терминале:

    docker-compose logs -f | grep kusto-connect
    

Запуск соединителя

Используйте REST-вызов Kafka Connect для запуска соединителя.

  1. В отдельном терминале запустите задачу приемника с помощью следующей команды:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Чтобы проверить статус, запустите следующую команду в отдельном терминале:

    curl http://localhost:8083/connectors/storm/status
    

Соединитель запускает процессы приема в очередь.

Примечание.

Если у вас возникли проблемы с подключением к соединителю создайте вопрос.

Управляемое удостоверение

По умолчанию соединитель Kafka использует метод приложения для проверки подлинности во время приема. Для проверки подлинности с помощью управляемого удостоверения:

  1. Назначьте кластер управляемому удостоверению и предоставьте учетным записям хранения разрешения на чтение. Дополнительные сведения см. в разделе приема данных с помощью проверки подлинности управляемого удостоверения.

  2. В файле adx-sink-config.json установите aad.auth.strategy managed_identity и убедитесь, что aad.auth.appid он имеет идентификатор клиента управляемого удостоверения (приложения).

  3. Используйте маркер службы метаданных частного экземпляра вместо субъекта-службы Microsoft Entra.

Примечание.

При использовании управляемого удостоверения appId и tenant выводится из контекста сайта вызова и password не требуется.

Запросить и просмотреть данные

Подтвердите получение данных

  1. После поступления данных в Storms таблицу подтвердите передачу данных, проверив количество строк:

    Storms 
    | count
    
  2. Убедитесь, что в процессе приема нет сбоев:

    .show ingestion failures
    

    Как только вы увидите данные, попробуйте выполнить несколько запросов.

Запрос данных

  1. Чтобы увидеть все записи, выполните следующий запрос:

    Storms
    | take 10
    
  2. Используйте where и project для фильтрации определенных данных:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Используйте оператор summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Снимок экрана: результаты диаграммы столбцов запроса Kafka.

Дополнительные примеры запросов и рекомендации см. в документации по написанию запросов в KQL и язык запросов Kusto документации.

Reset

Для сброса выполните следующие действия.

  1. Остановка контейнеров (docker-compose down -v)
  2. Delete (drop table Storms)
  3. Повторное создание таблицы Storms
  4. Повторное создание сопоставления таблиц
  5. Перезапуск контейнеров (docker-compose up)

Очистка ресурсов

Чтобы удалить ресурсы Azure Data Explorer, используйте az kusto cluster delete (kusto extension) или az kusto database delete (kusto extension):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Вы также можете удалить кластер и базу данных с помощью портал Azure. Дополнительные сведения см. в статье "Удаление кластера Azure Data Explorer" и удаление базы данных в Azure Data Explorer.

Настройка соединителя Kafka Sink

Настройте соединитель Kafka Sink для работы с политикой пакетной обработки приема данных:

  • Настройте предельный размер flush.size.bytes для Kafka Sink начиная с 1 МБ с шагом приращения 10 МБ или 100 МБ.
  • При использовании Kafka Sink данные агрегируются дважды. Данные на стороне соединителя агрегируются в соответствии с параметрами очистки и на стороне службы в соответствии с политикой пакетной обработки. Если время пакетной обработки слишком короткое, поэтому данные не могут быть приняты соединителем и службой, необходимо увеличить время пакетной обработки. Установите размер пакетной обработки в 1 ГБ. При необходимости его можно увеличить или уменьшить с шагом 100 МБ. Например, если размер очистки составляет 1 МБ, а размер политики пакетной обработки составляет 100 МБ, соединитель приемника Kafka объединяет данные в пакет размером 100 МБ. Затем этот пакет будет приемлен службой. Если время политики пакетной обработки составляет 20 секунд, а соединитель Приемника Kafka сбрасывает 50 МБ в 20-секундный период, служба выполняет прием пакета размером 50 МБ.
  • Вы можете изменять масштаб, добавляя экземпляры и разделы Kafka. Увеличьте значение tasks.max до нужного числа разделов. Создайте раздел, если у вас достаточно данных для создания большого двоичного объекта, размер которого равен значению параметра flush.size.bytes. Если большой двоичный объект меньше, пакет обрабатывается при достижении предельного времени, поэтому секция не получает достаточно пропускной способности. Большое количество разделов приводит к увеличению времени на обработку.