Получение данных из Apache Kafka в Azure Data Explorer
Apache Kafka — это распределенная платформа потоковой передачи для создания конвейеров потоковой передачи в режиме реального времени, которые надежно перемещают данные между системами или приложениями. Kafka Connect — это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Приемник Kusto Kafka служит соединителем из Kafka и не требует использования кода. Скачайте jar-файл соединителя приемника из репозитория Git или Концентратора соединителя Confluent.
В этой статье показано, как прием данных с помощью Kafka с помощью автономной установки Docker упростить настройку кластера Kafka и кластера соединителя Kafka.
Дополнительные сведения см. в разделе Репозиторий Git и Сведения о версиях для соединителя.
Необходимые компоненты
- Подписка Azure. Создайте бесплатную учетную запись Azure.
- Кластер и база данных Azure Data Explorer с политиками кэша и хранения по умолчанию.
- Azure CLI.
- Docker и Docker Compose.
Создание субъекта-службы Microsoft Entra
Субъект-служба Microsoft Entra можно создать с помощью портал Azure или программы, как показано в следующем примере.
Этот субъект-служба — это удостоверение, используемое соединителем для записи данных таблицы в Kusto. Вы предоставляете этому субъекту-службе разрешения для доступа к ресурсам Kusto.
Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.
az login
Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Создайте субъект-службу. В этом примере принципал службы называется
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Из возвращаемых данных JSON скопируйте
appId
password
данные иtenant
для дальнейшего использования.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Вы создали приложение Microsoft Entra и субъект-службу.
Создание целевой таблицы
В среде запроса создайте таблицу
Storms
с помощью следующей команды:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Создайте соответствующее сопоставление таблицы
Storms_CSV_Mapping
для загруженных данных с помощью следующей команды:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Создайте политику пакетного приема в таблице для настраиваемой задержки приема в очереди.
Совет
Политика пакетной обработки приема — это оптимизатор производительности, включающий три параметра. Первое условие удовлетворяет приему в таблицу.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Используйте субъект-службу из создания субъекта-службы Microsoft Entra, чтобы предоставить разрешение на работу с базой данных.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Запуск лаборатории
Следующая лаборатория предназначена для создания данных, настройки соединителя Kafka и потоковой передачи этих данных. Затем вы можете просмотреть полученные данные.
Клонировать репозиторий git
Клонировать репозиторий git лаборатории.
Создайте локальный каталог на вашем компьютере.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Клонирование репозитория.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Содержание клонированного репозитория
Выполните следующую команду, чтобы вывести список содержимого клонированного репозитория:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Вот результат этого поиска:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Просмотрите файлы в клонированном репозитория
В следующих разделах описываются важные части файлов в дереве файлов.
adx-sink-config.json
Этот файл содержит файл свойств приемника Kusto, в котором обновляются определенные сведения о конфигурации:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Замените значения следующих атрибутов в соответствии с настройкой: aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(имя базы данных) kusto.ingestion.url
и kusto.query.url
.
Соединитель — Dockerfile
В этом файле есть команды для создания образа докера для экземпляра соединителя. Он включает загрузку соединителя из каталога выпуска репозитория git.
Каталог Storm-events-producer
В этом каталоге находится программа Go, которая считывает локальный файл StormEvents.csv и публикует данные в теме Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Запуск контейнеров
В терминале запустите контейнеры:
docker-compose up
Приложение-производитель начинает отправку событий в
storm-events
раздел. Вы должны увидеть журналы, похожие на следующие журналы:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Чтобы проверить журналы, выполните следующую команду в отдельном терминале:
docker-compose logs -f | grep kusto-connect
Запуск соединителя
Используйте REST-вызов Kafka Connect для запуска соединителя.
В отдельном терминале запустите задачу приемника с помощью следующей команды:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Чтобы проверить статус, запустите следующую команду в отдельном терминале:
curl http://localhost:8083/connectors/storm/status
Соединитель запускает процессы приема в очередь.
Примечание.
Если у вас возникли проблемы с подключением к соединителю создайте вопрос.
Управляемое удостоверение
По умолчанию соединитель Kafka использует метод приложения для проверки подлинности во время приема. Для проверки подлинности с помощью управляемого удостоверения:
Назначьте кластер управляемому удостоверению и предоставьте учетным записям хранения разрешения на чтение. Дополнительные сведения см. в разделе приема данных с помощью проверки подлинности управляемого удостоверения.
В файле adx-sink-config.json установите
aad.auth.strategy
managed_identity
и убедитесь, чтоaad.auth.appid
он имеет идентификатор клиента управляемого удостоверения (приложения).Используйте маркер службы метаданных частного экземпляра вместо субъекта-службы Microsoft Entra.
Примечание.
При использовании управляемого удостоверения appId
и tenant
выводится из контекста сайта вызова и password
не требуется.
Запросить и просмотреть данные
Подтвердите получение данных
После поступления данных в
Storms
таблицу подтвердите передачу данных, проверив количество строк:Storms | count
Убедитесь, что в процессе приема нет сбоев:
.show ingestion failures
Как только вы увидите данные, попробуйте выполнить несколько запросов.
Запрос данных
Чтобы увидеть все записи, выполните следующий запрос:
Storms | take 10
Используйте
where
иproject
для фильтрации определенных данных:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Используйте оператор
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Дополнительные примеры запросов и рекомендации см. в документации по написанию запросов в KQL и язык запросов Kusto документации.
Reset
Для сброса выполните следующие действия.
- Остановка контейнеров (
docker-compose down -v
) - Delete (
drop table Storms
) - Повторное создание таблицы
Storms
- Повторное создание сопоставления таблиц
- Перезапуск контейнеров (
docker-compose up
)
Очистка ресурсов
Чтобы удалить ресурсы Azure Data Explorer, используйте az kusto cluster delete (kusto extension) или az kusto database delete (kusto extension):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
Вы также можете удалить кластер и базу данных с помощью портал Azure. Дополнительные сведения см. в статье "Удаление кластера Azure Data Explorer" и удаление базы данных в Azure Data Explorer.
Настройка соединителя Kafka Sink
Настройте соединитель Kafka Sink для работы с политикой пакетной обработки приема данных:
- Настройте предельный размер
flush.size.bytes
для Kafka Sink начиная с 1 МБ с шагом приращения 10 МБ или 100 МБ. - При использовании Kafka Sink данные агрегируются дважды. Данные на стороне соединителя агрегируются в соответствии с параметрами очистки и на стороне службы в соответствии с политикой пакетной обработки. Если время пакетной обработки слишком короткое, поэтому данные не могут быть приняты соединителем и службой, необходимо увеличить время пакетной обработки. Установите размер пакетной обработки в 1 ГБ. При необходимости его можно увеличить или уменьшить с шагом 100 МБ. Например, если размер очистки составляет 1 МБ, а размер политики пакетной обработки составляет 100 МБ, соединитель приемника Kafka объединяет данные в пакет размером 100 МБ. Затем этот пакет будет приемлен службой. Если время политики пакетной обработки составляет 20 секунд, а соединитель Приемника Kafka сбрасывает 50 МБ в 20-секундный период, служба выполняет прием пакета размером 50 МБ.
- Вы можете изменять масштаб, добавляя экземпляры и разделы Kafka. Увеличьте значение
tasks.max
до нужного числа разделов. Создайте раздел, если у вас достаточно данных для создания большого двоичного объекта, размер которого равен значению параметраflush.size.bytes
. Если большой двоичный объект меньше, пакет обрабатывается при достижении предельного времени, поэтому секция не получает достаточно пропускной способности. Большое количество разделов приводит к увеличению времени на обработку.
Связанный контент
- Узнайте больше об Архитектуре больших данных.
- Узнайте, как загрузить образцы данных в формате JSON в Azure Data Explorer.
- Дополнительные сведения см. в лабораториях Kafka: