Delen via


Integratie van Azure Data Explorer en Apache Flink®

Azure Data Explorer is een volledig beheerd, krachtige, big data analytics-platform waarmee u eenvoudig grote hoeveelheden gegevens in bijna realtime kunt analyseren.

ADX helpt gebruikers bij het analyseren van grote hoeveelheden gegevens uit streamingtoepassingen, websites, IoT-apparaten, enzovoort. Door Apache Flink te integreren met ADX, kunt u realtime gegevens verwerken en analyseren in ADX.

Voorwaarden

  1. Flink-cluster maken.

  2. ADX maken met database en tabel, indien nodig.

  3. Voeg ingestor-machtigingen toe voor de beheerde identiteit in Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Voer een voorbeeldprogramma uit waarin de Kusto-cluster-URI (Uniform Resource Identifier), de gebruikte database en beheerde identiteit worden gedefinieerd en de tabel waarnaar moet worden geschreven.

  5. Kloon het flink-connector-kusto-project: https://github.com/Azure/flink-connector-kusto.git

  6. Maak de tabel in ADX met behulp van de volgende opdracht

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Werk FlinkKustoSinkSample.java bestand bij met de juiste Kusto-cluster-URI, database en de beheerde identiteit die wordt gebruikt.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Bouw het project later met behulp van 'mvn clean package'

  8. Zoek het JAR-bestand met de naam 'samples-java-1.0-SNAPSHOT-shaded.jar' onder de map sample-java/target, upload dit JAR-bestand in de Flink UI en verzend de taak.

  9. Een query uitvoeren op de Kusto-tabel om de uitvoer te controleren

    schermopname toont een query op de Kusto-tabel om de uitvoer te controleren.

    Er is geen vertraging in het schrijven van de gegevens naar de Kusto-tabel van Flink.

Referentie