Udostępnij za pośrednictwem


Integracja usług Azure Data Explorer i Apache Flink®

Azure Data Explorer to w pełni zarządzana, wysokiej wydajności platforma do analizy danych big data, która ułatwia analizowanie dużych ilości danych niemal w czasie rzeczywistym.

Usługa ADX ułatwia użytkownikom analizowanie dużych ilości danych z aplikacji przesyłania strumieniowego, witryn internetowych, urządzeń IoT itp. Integracja platformy Apache Flink z usługą ADX pomaga przetwarzać dane w czasie rzeczywistym i analizować je w usłudze ADX.

Wymagania wstępne

  1. Utwórz klaster Flink.

  2. Utwórz usługę ADX z bazą danych i tabelą zgodnie z wymaganiami.

  3. Dodaj uprawnienia ingestor dla tożsamości zarządzanej w usłudze Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Uruchom przykładowy program definiujący identyfikator URI klastra Kusto (uniform Resource Identifier), bazę danych i używaną tożsamość zarządzaną oraz tabelę, do których musi zapisywać dane.

  5. Sklonuj projekt flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Utwórz tabelę w usłudze ADX przy użyciu następującego polecenia

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Zaktualizuj plik FlinkKustoSinkSample.java przy użyciu odpowiedniego identyfikatora URI klastra Kusto, bazy danych i używanej tożsamości zarządzanej.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Później skompiluj projekt przy użyciu "pakietu clean mvn"

  8. Znajdź plik JAR o nazwie "samples-java-1.0-SNAPSHOT-shaded.jar" w folderze "sample-java/target", a następnie przekaż ten plik JAR w interfejsie użytkownika Flink i prześlij zadanie.

  9. Wykonywanie zapytań względem tabeli Kusto w celu zweryfikowania danych wyjściowych

    screenshot shows query the Kusto table to verify the output.

    Nie ma opóźnienia podczas zapisywania danych w tabeli Kusto z funkcji Flink.

Odwołanie