Integración de Azure Data Explorer y Apache Flink®
Azure Data Explorer es una plataforma de análisis de macrodatos totalmente administrada y de alto rendimiento que facilita el análisis de grandes volúmenes de datos casi en tiempo real.
ADX ayuda a los usuarios a analizar grandes volúmenes de datos de aplicaciones de streaming, sitios web, dispositivos IoT, etc. La integración de Apache Flink con ADX le ayuda a procesar datos en tiempo real y a analizarlos en ADX.
Requisitos previos
Pasos para usar Azure Data Explorer como receptor en Flink
Creación de ADX con una base de datos y tabla según sea necesario.
Agregue permisos de ingesta para la identidad administrada en Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Ejecute un programa de ejemplo que defina el URI del clúster de Kusto (identificador uniforme de recursos), la base de datos y la identidad administrada usadas, y la tabla en la que necesita escribir.
Clone el proyecto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git
Creación de la tabla en ADX mediante el siguiente comando
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Actualice el archivo FlinkKustoSinkSample.java con el URI de clúster de Kusto correcto, la base de datos y la identidad administrada usada.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Compilación posterior del proyecto mediante “mvn clean package”
Busque el archivo JAR denominado "samples-java-1.0-SNAPSHOT-shaded.jar" en la carpeta "sample-java/target", y cargue este archivo JAR en la interfaz de usuario de Flink y envíe el trabajo.
Consulta de la tabla Kusto para comprobar la salida
No hay ningún retraso al escribir los datos en la tabla de Kusto desde Flink.
Referencia
- Sitio web de Apache Flink
- Apache, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de Apache Software Foundation (ASF).