Dela via


Integrering av Azure Data Explorer och Apache Flink®

Azure Data Explorer är en fullständigt hanterad stordataanalysplattform med höga prestanda som gör det enkelt att analysera stora mängder data nästan i realtid.

ADX hjälper användare att analysera stora mängder data från strömmande program, webbplatser, IoT-enheter osv. Genom att integrera Apache Flink med ADX kan du bearbeta realtidsdata och analysera dem i ADX.

Förutsättningar

  1. Skapa Flink-kluster.

  2. Skapa ADX med databas och tabell efter behov.

  3. Lägg till ingestor-behörigheter för den hanterade identiteten i Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Kör ett exempelprogram som definierar Kusto-klustrets URI (Uniform Resource Identifier), databas och hanterad identitet som används och den tabell som den behöver skriva till.

  5. Klona projektet flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Skapa tabellen i ADX med hjälp av följande kommando

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Uppdatera FlinkKustoSinkSample.java fil med rätt Kusto-kluster-URI, databas och den hanterade identitet som används.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Skapa projektet senare med hjälp av "mvn clean package"

  8. Leta upp JAR-filen med namnet "samples-java-1.0-SNAPSHOT-shaded.jar" under mappen "sample-java/target" och ladda sedan upp JAR-filen i Flink-användargränssnittet och skicka jobbet.

  9. Sök i Kusto-tabellen för att verifiera utdata

    skärmbilden visar hur man kör en fråga på Kusto-tabellen för att verifiera utdata.

    Det finns ingen fördröjning i att skriva data till Kusto-tabellen från Flink.

Hänvisning