Sdílet prostřednictvím


Integrace Azure Data Exploreru a Apache Flinku®

Azure Data Explorer je plně spravovaná vysoce výkonná analytická platforma pro velké objemy dat, která umožňuje snadno analyzovat velké objemy dat téměř v reálném čase.

ADX pomáhá uživatelům analyzovat velké objemy dat ze streamovaných aplikací, webů, zařízení IoT atd. Integrace Apache Flinku s ADX pomáhá zpracovávat data v reálném čase a analyzovat je v ADX.

Předpoklady

  1. Vytvořte cluster Flink.

  2. Podle potřeby vytvořte ADX s databází a tabulkou.

  3. Přidejte oprávnění ingestoru pro spravovanou identitu v Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Spusťte ukázkový program definující identifikátor URI clusteru Kusto (Uniform Resource Identifier), použitou databázi a spravovanou identitu a tabulku, do které se musí zapisovat.

  5. Naklonujte projekt flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Vytvoření tabulky v ADX pomocí následujícího příkazu

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Aktualizujte soubor FlinkKustoSinkSample.java správným identifikátorem URI clusteru Kusto, databází a použitou spravovanou identitou.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Později sestavte projekt pomocí "čistého balíčku mvn".

  8. Vyhledejte soubor JAR s názvem samples-java-1.0-SNAPSHOT-shaded.jar ve složce sample-java/target a pak tento soubor JAR nahrajte do uživatelského rozhraní Flink a odešlete úlohu.

  9. Ověření výstupu dotazem na tabulku Kusto

    screenshot shows query the Kusto table to verify the output.

    Zápis dat do tabulky Kusto z Flinku nezpozdí.

Reference