从 Apache Flink 获取数据
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
Flink 连接器是可在任何 Flink 群集上运行的开源项目。 它实现了用于从 Flink 群集移动数据的数据接收器。 使用 Apache Flink 的连接器,可以构建面向数据驱动型方案(如机器学习 (ML)、提取-转换-加载 (ETL) 和 Log Analytics)的可缩放快速应用程序。
在本文中,您可以学习如何使用 Flink 连接器将数据从 Flink 发送到您的表。 您将创建表和数据映射,指示 Flink 将数据发送到表中,最后验证结果。
先决条件
- Azure 数据资源管理器群集和数据库。 在 Microsoft Fabric 中实时智能创建群集和数据库 或 KQL 数据库。
- 数据库中的目标表。 请参阅在 Azure 数据资源管理器中创建表或在实时智能中创建表
- Apache Flink 群集。 创建群集。
- Maven 3.x
获取 Flink 连接器
对于使用 Maven 管理依赖项的 Flink 项目,可通过将其添加为依赖项来集成 Flink Connector Core Sink For Azure Data Explorer:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
对于不使用 Maven 管理依赖项的项目,请克隆 Azure Data Explorer Connector For Apache Flink 的存储库并在本地对其进行构建。 此方法允许您使用命令 mvn clean install -DskipTests
手动将连接器添加至本地 Maven 存储库。
身份验证
您可从 Flink 进行身份验证,以使用 Microsoft Entra ID 应用程序。
此服务主体将是连接器用于将数据写入到 Kusto 中的表的标识。 你稍后将授予此服务主体访问 Kusto 资源所需的权限。
通过 Azure CLI 登录到你的 Azure 订阅。 然后在浏览器中进行身份验证。
az login
选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。
az account set --subscription YOUR_SUBSCRIPTION_GUID
创建服务主体。 在此示例中,服务主体名为
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
从返回的 JSON 数据中复制
appId
、password
、tenant
供将来使用。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
现已创建了 Microsoft Entra 应用程序和服务主体。
授予应用程序用户对数据库的权限:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
对表授予应用程序引入者或管理员权限。 所需的权限取决于所选的数据写入方法。 引入者权限足以满足 SinkV2,而 WriteAndSink 则需要管理员权限。
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
有关授权的详细信息,请参阅 Kusto 基于角色的访问控制。
从 Flink 写入数据
要从 Flink 写入数据:
导入需要的选项:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
使用您的应用程序进行身份验证。
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
配置接收器参数,例如数据库和表:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
您可以添加更多选项,如下表所述:
选项 说明 默认值 IngestionMappingRef 参考现有的引入映射。 FlushImmediately 立即刷新数据可能导致性能问题。 建议不要使用此方法。 BatchIntervalMs 控制刷新数据的频率。 30 秒 BatchSize 刷新前设置缓冲记录的批处理大小。 1,000 条记录 ClientBatchSizeLimit 指定引入前聚合数据的大小(以 MB 为单位)。 300 MB PollForIngestionStatus 如果为 True,则连接器会在数据刷新后轮询引入状态。 false DeliveryGuarantee 确定传递保证语义。 要实现一次性语义,请使用 WriteAheadSink。 AT_LEAST_ONCE 使用以下方法之一写入流数据:
- SinkV2:这是一个无状态选项,用于在检查点刷新数据,确保至少一次一致性。 建议使用此选项进行大容量数据引入。
- WriteAheadSink:此方法向 KustoSink 发出数据。 它与 Flink 的检查点系统集成,并提供精确的一次保证。 数据存储在 AbstractStateBackend 中,只有在检查点完成后才提交。
下面的示例使用 SinkV2。 若要使用 WriteAheadSink,请使用
buildWriteAheadSink
方法代替build
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
完整代码应如下所示:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
验证数据是否被引入
配置连接后,数据将发送至表。 以通过运行 KQL 查询来验证数据是否被引入。
运行以下查询以验证数据是否引入到表中:
<MyTable> | count
运行以下查询以查看数据:
<MyTable> | take 100