Partager via


Intégration d’Azure Data Explorer et Apache Flink®

Azure Data Explorer est une plateforme d’analytique Big Data entièrement managée et hautement performante qui facilite l’analyse de volumes élevés de données en quasi-temps réel.

ADX aide les utilisateurs à analyser de grands volumes de données à partir d’applications de streaming, de sites web, d’appareils IoT, etc. L’intégration d’Apache Flink à ADX vous aide à traiter les données en temps réel et à les analyser dans ADX.

Conditions préalables

  1. Créer un cluster Flink.

  2. Créer ADX avec des de base de données et une table selon les besoins.

  3. Ajoutez des autorisations d’ingestion pour l’identité managée dans Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Exécutez un exemple de programme définissant l’URI du cluster Kusto (Uniform Resource Identifier), la base de données et l’identité managée utilisées, et la table dans laquelle il doit écrire.

  5. Clonez le projet flink-connector-kusto : https://github.com/Azure/flink-connector-kusto.git

  6. Créer la table dans ADX à l’aide de la commande suivante

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Mettez à jour FlinkKustoSinkSample.java fichier avec l’URI de cluster Kusto approprié, la base de données et l’identité managée utilisée.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Compilez ensuite le projet en utilisant la commande “mvn clean package”

  8. Recherchez le fichier JAR nommé « samples-java-1.0-SNAPSHOT-shaded.jar » sous le dossier « sample-java/target », puis chargez ce fichier JAR dans l’interface utilisateur Flink et envoyez le travail.

  9. Interroger la table Kusto pour vérifier le résultat

    Une capture d'écran montre la requête de la table Kusto pour vérifier la sortie.

    Il n’y a aucun délai d’écriture des données dans la table Kusto à partir de Flink.

Référence