Compartir a través de


Integración de Azure Data Explorer y Apache Flink®

Azure Data Explorer es una plataforma de análisis de macrodatos totalmente administrada y de alto rendimiento que facilita el análisis de grandes volúmenes de datos casi en tiempo real.

ADX ayuda a los usuarios a analizar grandes volúmenes de datos de aplicaciones de streaming, sitios web, dispositivos IoT, etc. La integración de Apache Flink con ADX le ayuda a procesar datos en tiempo real y a analizarlos en ADX.

Requisitos previos

  1. Creación de clústeres de Flink.

  2. Creación de ADX con una base de datos y tabla según sea necesario.

  3. Agregue permisos de ingesta para la identidad administrada en Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Ejecute un programa de ejemplo que defina el URI del clúster de Kusto (identificador uniforme de recursos), la base de datos y la identidad administrada usadas, y la tabla en la que necesita escribir.

  5. Clone el proyecto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Creación de la tabla en ADX mediante el siguiente comando

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Actualice el archivo FlinkKustoSinkSample.java con el URI de clúster de Kusto correcto, la base de datos y la identidad administrada usada.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Compilación posterior del proyecto mediante “mvn clean package”

  8. Busque el archivo JAR denominado "samples-java-1.0-SNAPSHOT-shaded.jar" en la carpeta "sample-java/target", y cargue este archivo JAR en la interfaz de usuario de Flink y envíe el trabajo.

  9. Consulta de la tabla Kusto para comprobar la salida

    screenshot shows query the Kusto table to verify the output.

    No hay ningún retraso al escribir los datos en la tabla de Kusto desde Flink.

Referencia