共用方式為


整合 Azure 數據總管和 Apache Flink®

Azure 數據總管是完全受控、高效能、巨量數據分析平臺,可讓您輕鬆地近乎即時地分析大量數據。

ADX 可協助使用者分析來自串流應用程式、網站、IoT 裝置等大量數據。Apache Flink 與 ADX 整合可協助您處理實時數據,並在 ADX 中進行分析。

先決條件

  1. 建立 Flink 叢集

  2. 視需要建立具有資料庫 和數據表的 ADX。

  3. 在 Kusto 中新增受控識別的引入器許可權。

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. 執行定義 Kusto 叢集 URI(統一資源識別符)、資料庫和受控識別所使用的範例程式,以及它必須寫入的數據表。

  5. 複製 flink-connector-kusto 專案:https://github.com/Azure/flink-connector-kusto.git

  6. 使用下列命令在ADX中建立資料表

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 使用正確的 Kusto 叢集 URI、資料庫和使用的受控識別來更新FlinkKustoSinkSample.java檔案。

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    稍後使用 「mvn clean package」 建置專案

  8. 在 'sample-java/target' 資料夾下找出名為 'samples-java-1.0-SNAPSHOT-shaded.jar' 的 JAR 檔案,然後在 Flink UI 中上傳此 JAR 檔案並提交作業。

  9. 查詢 Kusto 資料表以確認輸出

    螢幕快照顯示查詢 Kusto 資料表以確認輸出。

    從 Flink 將數據寫入 Kusto 資料表並無延遲。

參考