Поделиться через


Прием данных с помощью Apache Flink в Azure Data Explorer

Apache Flink — это платформа и распределенный обработчик обработки для вычислений с отслеживанием состояния по несвязанным и привязанным потокам данных.

Соединитель Flink — это проект открытый код, который может выполняться в любом кластере Flink. Он реализует приемник данных для перемещения данных из кластера Flink. С помощью соединителя Для Apache Flink можно создавать быстрые и масштабируемые приложения, предназначенные для сценариев на основе данных, например машинное обучение (ML), Extract-Transform-Load (ETL) и Log Analytics.

Из этой статьи вы узнаете, как использовать соединитель Flink для отправки данных из Flink в таблицу. Вы создаете сопоставление таблиц и данных, прямое Flink для отправки данных в таблицу, а затем проверяете результаты.

Необходимые компоненты

Для проектов Flink, использующих Maven для управления зависимостями, интегрируйте приемник Flink Connector Core для Azure Data Explorer , добавив его в качестве зависимости:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Для проектов, которые не используют Maven для управления зависимостями, клонируйте репозиторий для соединителя Azure Data Explorer для Apache Flink и создайте его локально. Этот подход позволяет вручную добавить соединитель в локальный репозиторий Maven с помощью команды mvn clean install -DskipTests.

Вы можете пройти проверку подлинности из Flink, используя приложение идентификатора Microsoft Entra или управляемое удостоверение.

Этот субъект-служба будет удостоверением, используемым соединителем для записи данных таблицы в Kusto. Позже вы предоставьте этому субъекту-службе разрешения для доступа к ресурсам Kusto.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращаемых данных JSON скопируйте appIdpasswordданные и tenant для дальнейшего использования.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

  1. Предоставьте пользователю приложения разрешения на базу данных:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Предоставьте приложению разрешения ingestor или администратора в таблице. Необходимые разрешения зависят от выбранного метода записи данных. Разрешения Ingestor достаточно для SinkV2, в то время как WriteAndSink требует разрешений администратора.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Дополнительные сведения об авторизации см. в разделе "Управление доступом на основе ролей Kusto".

Запись данных из Flink:

  1. Импортируйте необходимые параметры:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Используйте приложение или управляемое удостоверение для проверки подлинности.

    Для проверки подлинности приложения:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Для проверки подлинности управляемого удостоверения:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Настройте параметры приемника, такие как база данных и таблица:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Дополнительные параметры можно добавить, как описано в следующей таблице:

    Вариант Описание Значение по умолчанию
    IngestionMappingRef Ссылается на существующее сопоставление приема.
    FlushImmediately Немедленно сбрасывает данные и может привести к проблемам с производительностью. Этот метод не рекомендуется.
    BatchIntervalMs Определяет частоту очистки данных. 30 секунд
    BatchSize Задает размер пакета для буферизации записей перед очисткой. 1000 записей
    ClientBatchSizeLimit Задает размер в МБ агрегированных данных перед приемом. 300 МБ
    PollForIngestionStatus Если значение true, соединитель опрашивает состояние приема после очистки данных. false
    DeliveryGuarantee Определяет семантику гарантии доставки. Для достижения точной семантики используйте WriteAheadSink. AT_LEAST_ONCE
  2. Запись потоковых данных с помощью одного из следующих методов:

    • SinkV2: это параметр без отслеживания состояния, который очищает данные на контрольной точке, обеспечивая по крайней мере один раз согласованность. Рекомендуется использовать этот параметр для приема больших объемов данных.
    • WriteAheadSink: этот метод выдает данные в KustoSink. Она интегрирована с системой контрольных точек Flink и предлагает ровно один раз гарантии. Данные хранятся в abstractStateBackend и фиксируются только после завершения контрольной точки.

    В следующем примере используется ПриемникV2. Чтобы использовать WriteAheadSink, используйте buildWriteAheadSink метод вместо build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Полный код должен выглядеть примерно так:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Убедитесь, что данные приема

После настройки подключения данные отправляются в таблицу. Вы можете убедиться, что данные будут приняты, выполнив запрос KQL.

  1. Выполните следующий запрос, чтобы проверить прием данных в таблицу:

    <MyTable>
    | count
    
  2. Выполните следующий запрос, чтобы просмотреть данные:

    <MyTable>
    | take 100