整合 Azure 數據總管和 Apache Flink®
Azure 數據總管是完全受控、高效能、巨量數據分析平臺,可讓您輕鬆地近乎即時地分析大量數據。
ADX 可協助使用者分析來自串流應用程式、網站、IoT 裝置等大量數據。Apache Flink 與 ADX 整合可協助您處理實時數據,並在 ADX 中進行分析。
先決條件
在 Flink 中使用 Azure 資料總管作為匯出端的步驟
視需要建立具有資料庫 和數據表的 ADX。
在 Kusto 中新增受控識別的引入器許可權。
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
執行定義 Kusto 叢集 URI(統一資源識別符)、資料庫和受控識別所使用的範例程式,以及它必須寫入的數據表。
複製 flink-connector-kusto 專案:https://github.com/Azure/flink-connector-kusto.git
使用下列命令在ADX中建立資料表
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
使用正確的 Kusto 叢集 URI、資料庫和使用的受控識別來更新FlinkKustoSinkSample.java檔案。
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
稍後使用 「mvn clean package」 建置專案
在 'sample-java/target' 資料夾下找出名為 'samples-java-1.0-SNAPSHOT-shaded.jar' 的 JAR 檔案,然後在 Flink UI 中上傳此 JAR 檔案並提交作業。
查詢 Kusto 資料表以確認輸出
從 Flink 將數據寫入 Kusto 資料表並無延遲。
參考
- Apache Flink 網站
- Apache、Apache Flink、Flink 以及相關的開放原始碼專案名稱是 Apache Software Foundation(ASF) 的商標。