Прием данных с помощью Apache Flink в Azure Data Explorer
Apache Flink — это платформа и распределенный обработчик обработки для вычислений с отслеживанием состояния по несвязанным и привязанным потокам данных.
Соединитель Flink — это проект открытый код, который может выполняться в любом кластере Flink. Он реализует приемник данных для перемещения данных из кластера Flink. С помощью соединителя Для Apache Flink можно создавать быстрые и масштабируемые приложения, предназначенные для сценариев на основе данных, например машинное обучение (ML), Extract-Transform-Load (ETL) и Log Analytics.
Из этой статьи вы узнаете, как использовать соединитель Flink для отправки данных из Flink в таблицу. Вы создаете сопоставление таблиц и данных, прямое Flink для отправки данных в таблицу, а затем проверяете результаты.
Необходимые компоненты
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данных или базу данных KQL в аналитике в режиме реального времени в Microsoft Fabric.
- Целевая таблица в базе данных. См . статью "Создание таблицы" в Azure Data Explorer или создание таблицы в аналитике в режиме реального времени
- Кластер Apache Flink. Создание кластера.
- Maven 3.x
Получение соединителя Flink
Для проектов Flink, использующих Maven для управления зависимостями, интегрируйте приемник Flink Connector Core для Azure Data Explorer , добавив его в качестве зависимости:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
Для проектов, которые не используют Maven для управления зависимостями, клонируйте репозиторий для соединителя Azure Data Explorer для Apache Flink и создайте его локально. Этот подход позволяет вручную добавить соединитель в локальный репозиторий Maven с помощью команды mvn clean install -DskipTests
.
Вы можете пройти проверку подлинности из Flink, используя приложение идентификатора Microsoft Entra или управляемое удостоверение.
Этот субъект-служба будет удостоверением, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставьте этому субъекту-службе разрешения для доступа к ресурсам Kusto.
Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.
az login
Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Создайте субъект-службу. В этом примере принципал службы называется
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Из возвращаемых данных JSON скопируйте
appId
password
данные иtenant
для дальнейшего использования.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Вы создали приложение Microsoft Entra и субъект-службу.
Предоставьте пользователю приложения разрешения на базу данных:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Предоставьте приложению разрешения ingestor или администратора в таблице. Необходимые разрешения зависят от выбранного метода записи данных. Разрешения Ingestor достаточно для SinkV2, в то время как WriteAndSink требует разрешений администратора.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Дополнительные сведения об авторизации см. в разделе "Управление доступом на основе ролей Kusto".
Запись данных из Flink
Запись данных из Flink:
Импортируйте необходимые параметры:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Используйте приложение или управляемое удостоверение для проверки подлинности.
Для проверки подлинности приложения:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Для проверки подлинности управляемого удостоверения:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Настройте параметры приемника, такие как база данных и таблица:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Дополнительные параметры можно добавить, как описано в следующей таблице:
Вариант Описание Значение по умолчанию IngestionMappingRef Ссылается на существующее сопоставление приема. FlushImmediately Немедленно сбрасывает данные и может привести к проблемам с производительностью. Этот метод не рекомендуется. BatchIntervalMs Определяет частоту очистки данных. 30 секунд BatchSize Задает размер пакета для буферизации записей перед очисткой. 1000 записей ClientBatchSizeLimit Задает размер в МБ агрегированных данных перед приемом. 300 МБ PollForIngestionStatus Если значение true, соединитель опрашивает состояние приема после очистки данных. false DeliveryGuarantee Определяет семантику гарантии доставки. Для достижения точной семантики используйте WriteAheadSink. AT_LEAST_ONCE Запись потоковых данных с помощью одного из следующих методов:
- SinkV2: это параметр без отслеживания состояния, который очищает данные на контрольной точке, обеспечивая по крайней мере один раз согласованность. Рекомендуется использовать этот параметр для приема больших объемов данных.
- WriteAheadSink: этот метод выдает данные в KustoSink. Она интегрирована с системой контрольных точек Flink и предлагает ровно один раз гарантии. Данные хранятся в abstractStateBackend и фиксируются только после завершения контрольной точки.
В следующем примере используется ПриемникV2. Чтобы использовать WriteAheadSink, используйте
buildWriteAheadSink
метод вместоbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Полный код должен выглядеть примерно так:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Убедитесь, что данные приема
После настройки подключения данные отправляются в таблицу. Вы можете убедиться, что данные будут приняты, выполнив запрос KQL.
Выполните следующий запрос, чтобы проверить прием данных в таблицу:
<MyTable> | count
Выполните следующий запрос, чтобы просмотреть данные:
<MyTable> | take 100