Compartir a través de


Integración de Azure Data Explorer y Apache Flink®

Azure Data Explorer es una plataforma de análisis de macrodatos totalmente administrada y de alto rendimiento que facilita el análisis de grandes volúmenes de datos casi en tiempo real.

ADX ayuda a los usuarios a analizar grandes volúmenes de datos de aplicaciones de streaming, sitios web, dispositivos IoT, etc. La integración de Apache Flink con ADX le ayuda a procesar datos en tiempo real y a analizarlos en ADX.

Prerrequisitos

  1. Crear clúster de Flink.

  2. Crear ADX con base de datos y tabla según sea necesario.

  3. Agregue permisos de ingestor para la identidad administrada en Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Ejecute un programa de ejemplo que defina el URI del clúster de Kusto (identificador uniforme de recursos), la base de datos y la identidad administrada usadas, y la tabla en la que necesita escribir.

  5. Clone el proyecto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Creación de la tabla en ADX mediante el comando siguiente

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Actualice el archivo FlinkKustoSinkSample.java con el URI correcto del clúster de Kusto, la base de datos y la identidad administrada usada.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Después, compila el proyecto utilizando "mvn clean package"

  8. Busque el archivo JAR denominado "samples-java-1.0-SNAPSHOT-shaded.jar" en la carpeta "sample-java/target" y cargue este archivo JAR en la interfaz de usuario de Flink y envíe el trabajo.

  9. Consulta de la tabla Kusto para comprobar la salida

    captura de pantalla que muestra la consulta de la tabla Kusto para comprobar la salida.

    No hay ningún retraso al escribir los datos en la tabla Kusto por parte de Flink.

Referencia