Integración de Azure Data Explorer y Apache Flink®
Azure Data Explorer es una plataforma de análisis de macrodatos totalmente administrada y de alto rendimiento que facilita el análisis de grandes volúmenes de datos casi en tiempo real.
ADX ayuda a los usuarios a analizar grandes volúmenes de datos de aplicaciones de streaming, sitios web, dispositivos IoT, etc. La integración de Apache Flink con ADX le ayuda a procesar datos en tiempo real y a analizarlos en ADX.
Prerrequisitos
- creación de un clúster de Apache Flink en HDInsight en AKS
- Creación de de Azure Data Explorer
Pasos para usar Azure Data Explorer como receptor en Flink
Crear ADX con base de datos y tabla según sea necesario.
Agregue permisos de ingestor para la identidad administrada en Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Ejecute un programa de ejemplo que defina el URI del clúster de Kusto (identificador uniforme de recursos), la base de datos y la identidad administrada usadas, y la tabla en la que necesita escribir.
Clone el proyecto flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git
Creación de la tabla en ADX mediante el comando siguiente
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Actualice el archivo FlinkKustoSinkSample.java con el URI correcto del clúster de Kusto, la base de datos y la identidad administrada usada.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Después, compila el proyecto utilizando "mvn clean package"
Busque el archivo JAR denominado "samples-java-1.0-SNAPSHOT-shaded.jar" en la carpeta "sample-java/target" y cargue este archivo JAR en la interfaz de usuario de Flink y envíe el trabajo.
Consulta de la tabla Kusto para comprobar la salida
No hay ningún retraso al escribir los datos en la tabla Kusto por parte de Flink.
Referencia
- Sitio web de Apache Flink
- Los nombres de los proyectos de código abierto asociados Apache, Apache Flink, y Flink son marcas comerciales de la Apache Software Foundation (ASF).