Intégration d’Azure Data Explorer et d’Apache Flink®
Azure Data Explorer est une plateforme d’analytique Big Data très performante et complètement managée, qui facilite l’analyse de grands volumes de données quasiment en temps réel.
ADX aide les utilisateurs à analyser de gros volumes de données provenant d'applications de streaming, de sites Web, d'appareils IoT, etc. L’intégration d’Apache Flink avec ADX vous aide à traiter les données en temps réel et à les analyser dans ADX.
Prérequis
Étapes pour utiliser l’explorateur de données Azure comme récepteur dans Flink
Créez ADX avec la base de données et la table selon les besoins.
Ajoutez des autorisations d’ingesteur pour l’identité gérée dans Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Exécutez un exemple de programme définissant l'URI (Uniform Resource Identifier) du cluster Kusto, la base de données et l'identité gérée utilisées, ainsi que la table dans laquelle il doit écrire.
Clonez le projet flink-connector-kusto : https://github.com/Azure/flink-connector-kusto.git
Créez la table dans ADX à l'aide de la commande suivante
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Mettez à jour le fichier FlinkKustoSinkSample.java avec l’URI du cluster Kusto approprié, la base de données et l’identité gérée utilisée.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Construisez plus tard le projet en utilisant « mvn clean package »
Localisez le fichier JAR nommé « samples-java-1.0-SNAPSHOT-shaded.jar » dans le dossier « sample-java/target », puis téléchargez ce fichier JAR dans l'interface utilisateur Flink et soumettez le travail.
Interrogez la table Kusto pour vérifier la sortie
Il n'y a aucun délai dans l'écriture des données dans la table Kusto depuis Flink.
Référence
- Site web Apache Flink
- Apache, Apache Flink, Flink et les noms de projet open source associés sont des marques de commerce d’Apache Software Foundation (ASF).