Compartir a través de


Ingesta de datos con Apache Flink en Azure Data Explorer

Apache Flink es un marco y un motor de procesamiento distribuido para cálculos con estado mediante flujos de datos enlazados y no enlazados.

El conector Flink es un proyecto de código abierto que se puede ejecutar en cualquier clúster de Flink. Implementa el receptor de datos para mover datos desde un clúster de Flink. Con el conector de Apache Flink, puede crear aplicaciones rápidas y escalables orientadas a escenarios controlados por datos, como, por ejemplo, el aprendizaje automático (ML), la extracción, transformación y carga de datos (ETL), y el análisis de registros.

En este artículo, aprenderá a usar el conector de Flink para enviar datos de Flink a la tabla. Creará una tabla y una asignación de datos, hará que Flink envíe los datos a la tabla y, a continuación, validará los resultados.

Requisitos previos

Para los proyectos de Flink que usan Maven para administrar las dependencias, integre el receptor de núcleo del conector de Flink para Azure Data Explorer agregándolo como dependencia:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

En el caso de los proyectos que no usan Maven para administrar las dependencias, clone el repositorio del conector de Azure Data Explorer para Apache Flink y créelo localmente. Este enfoque le permite agregar manualmente el conector al repositorio local de Maven mediante el comando mvn clean install -DskipTests.

Puede autenticarse desde Flink en mediante una aplicación de Microsoft Entra ID o una identidad administrada.

Esta entidad de servicio será la identidad utilizada por el conector para escribir datos en la tabla de Kusto. Más adelante concederemos permisos para que esta entidad de servicio acceda a los recursos de Kusto.

  1. Inicie sesión en su suscripción de Azure a través de la CLI de Azure. A continuación, realice la autenticación en el explorador.

    az login
    
  2. Elija la suscripción para hospedar la entidad de servicio. Este paso es necesario si tiene varias suscripciones.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Cree la entidad de servicio. En este ejemplo, la entidad de servicio se llama my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. En los datos JSON devueltos, copie los valores appId, password y tenant para usarlos posteriormente.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Ha creado una aplicación de Microsoft Entra y una entidad de servicio.

  1. Conceda permisos de usuario a la aplicación en la base de datos:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Conceda a la aplicación permisos de ingesta o de administrador en la tabla. Los permisos necesarios dependen del método de escritura de datos elegido. Los permisos de ingesta son suficientes para SinkV2, mientras que WriteAndSink requiere permisos de administrador.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Para obtener más información sobre la autorización, consulte Control de acceso basado en roles de Kusto.

Para escribir datos desde Flink:

  1. Importe las opciones necesarias:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Use la aplicación o la identidad administrada para autenticarse.

    Para la autenticación de aplicación:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Para la autenticación de identidad administrada:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configure los parámetros del receptor, como la base de datos y la tabla:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Puede agregar más opciones, que se describen en la tabla siguiente:

    Opción Descripción Valor predeterminado
    IngestionMappingRef Hace referencia a una asignación de ingesta existente.
    FlushImmediately Vacía los datos inmediatamente y puede causar problemas de rendimiento. No se recomienda este método.
    BatchIntervalMs Controla la frecuencia con la que se vacían los datos. 30 segundos
    BatchSize Establece el tamaño del lote para los registros de almacenamiento en búfer antes de vaciarlos. 1000 registros
    ClientBatchSizeLimit Especifica el tamaño en MB de datos agregados antes de la ingesta. 300 MB
    PollForIngestionStatus Si es true, el conector sondea el estado de ingesta después del vaciado de datos. false
    DeliveryGuarantee Determina la semántica de garantía de entrega. Para lograr exactamente una semántica, use WriteAheadSink. AT_LEAST_ONCE
  2. Escriba datos de streaming con uno de los métodos siguientes:

    • SinkV2: esta es una opción sin estado que vacía los datos en el punto de control, lo que garantiza al menos una coherencia. Se recomienda esta opción para la ingesta de datos de gran volumen.
    • WriteAheadSink: este método emite datos a KustoSink. Se integra con el sistema de puntos de comprobación de Flink y ofrece exactamente una garantía. Los datos se almacenan en un AbstractStateBackend y se confirman solo después de completar un punto de control.

    En el ejemplo siguiente se usa SinkV2. Para usar WriteAheadSink, use el método buildWriteAheadSink en lugar de build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

El código de finalización debería tener este aspecto:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Compruebe la ingesta de los datos

Una vez configurada la conexión, los datos se envían a la tabla. Puede comprobar que los datos se ingieren ejecutando una consulta KQL.

  1. Ejecute la consulta siguiente para comprobar que los datos se ingieren en la tabla:

    <MyTable>
    | count
    
  2. Ejecute la siguiente consulta para ver los datos:

    <MyTable>
    | take 100