你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 数据资源管理器和 Apache Flink® 的集成

Azure 数据资源管理器是一种完全托管的高性能、大数据分析平台,可让你轻松实时地分析大量数据。

ADX 可帮助用户分析来自流式处理应用程序、网站、IoT 设备等的大量数据。将 Apache Flink® 与 ADX 集成有助于在 ADX 中处理和分析实时数据。

先决条件

  1. 创建 Flink 群集

  2. 根据需要使用数据库和表创建 ADX

  3. 在 Kusto 中添加托管标识的引入者权限。

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. 运行定义 Kusto 群集 URI(统一资源标识符)、使用的数据库和托管标识以及需要写入的表的示例程序。

  5. 克隆 flink-connector-kusto 项目:https://github.com/Azure/flink-connector-kusto.git

  6. 使用以下命令在 ADX 中创建表

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. 使用正确的 Kusto 群集 URI、数据库和使用的托管标识更新 FlinkKustoSinkSample.java 文件。

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    稍后将使用“mvn clean package”生成项目

  8. 在“sample-java/target”文件夹下找到名为“samples-java-1.0-SNAPSHOT-shaded.jar”的 JAR 文件,然后在 Flink UI 中上传此 JAR 文件并提交作业。

  9. 查询 Kusto 表以验证输出

    screenshot shows query the Kusto table to verify the output.

    从 Flink 向 Kusto 表写入数据没有任何延迟。

参考