Intégration d’Azure Data Explorer et Apache Flink®
Azure Data Explorer est une plateforme d’analytique Big Data entièrement managée et hautement performante qui facilite l’analyse de volumes élevés de données en quasi-temps réel.
ADX aide les utilisateurs à analyser de grands volumes de données à partir d’applications de streaming, de sites web, d’appareils IoT, etc. L’intégration d’Apache Flink à ADX vous aide à traiter les données en temps réel et à les analyser dans ADX.
Conditions préalables
- Créer un cluster Apache Flink sur HDInsight sur AKS
- Créer un Azure Data Explorer
Étapes d’utilisation d’Azure Data Explorer comme récepteur dans Flink
Créer ADX avec des de base de données et une table selon les besoins.
Ajoutez des autorisations d’ingestion pour l’identité managée dans Kusto.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Exécutez un exemple de programme définissant l’URI du cluster Kusto (Uniform Resource Identifier), la base de données et l’identité managée utilisées, et la table dans laquelle il doit écrire.
Clonez le projet flink-connector-kusto : https://github.com/Azure/flink-connector-kusto.git
Créer la table dans ADX à l’aide de la commande suivante
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Mettez à jour FlinkKustoSinkSample.java fichier avec l’URI de cluster Kusto approprié, la base de données et l’identité managée utilisée.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
Compilez ensuite le projet en utilisant la commande “mvn clean package”
Recherchez le fichier JAR nommé « samples-java-1.0-SNAPSHOT-shaded.jar » sous le dossier « sample-java/target », puis chargez ce fichier JAR dans l’interface utilisateur Flink et envoyez le travail.
Interroger la table Kusto pour vérifier le résultat
Il n’y a aucun délai d’écriture des données dans la table Kusto à partir de Flink.
Référence
- Site Web Apache Flink
- Apache, Apache Flink, Flink et les noms de projets open source associés sont marques de commerce du Apache Software Foundation (ASF).