为 Azure 数据资源管理器创建事件中心数据连接
- 项目
在 Azure 数据资源管理器中,可以从事件中心(一个大数据流式处理平台和事件引入服务)引入数据。 事件中心每秒可以准实时处理数百万个事件。
在本文中,你将连接到事件中心并将数据引入 Azure 数据资源管理器。 有关从事件中心引入数据的概述,请参阅 Azure 事件中心数据连接。
先决条件
- Azure 订阅。 创建免费 Azure 帐户。
- Azure 数据资源管理器群集和数据库。 创建群集和数据库。
- 一个目标表。 创建表或使用现有表。
- 表的引入映射。
- 包含要引入的数据的事件中心。
创建事件中心数据连接
在本部分中,你将在事件中心和 Azure 数据资源管理器 表之间建立连接。 只要建立了此连接,数据就会从事件中心传输到目标表。 如果事件中心移动到其他资源或订阅,则需要更新或重新创建连接。
在 Azure 门户中,转到你的群集并选择“数据库”。 然后,选择包含目标表的数据库。
在左侧菜单中,选择“数据引入”。 然后,在顶部栏中选择“添加数据连接”。
在窗体中填写以下信息,然后选择“创建”。
设置 建议的值 字段说明 数据连接名称 test-hub-connection 要在 Azure 数据资源管理器中创建的连接的名称。 订阅 事件中心资源所在的订阅 ID。 事件中心命名空间 唯一的命名空间名称 先前选择的用于标识命名空间的名称。 事件中心 test-hub 你创建的事件中心。 使用者组 test-group 在创建的事件中心定义的使用者组。 事件系统属性 选择相关属性 事件中心系统属性。 如果每个事件消息有多个记录,则系统属性将添加到第一个记录中。 添加系统属性时,创建或更新表架构和映射以包括所选属性。 压缩 无 事件中心消息有效负载的压缩类型。 支持的压缩类型:None、Gzip。 托管标识(建议) 系统分配 由数据资源管理器群集用于从事件中心进行读取访问的托管标识。 建议使用托管标识来控制对事件中心的访问。
注意:
创建数据连接时:
* 系统分配的标识会自动创建(如果不存在)
* 为托管标识自动分配“Azure 事件中心数据接收方”角色,并将其添加到数据资源管理器群集。 建议验证是否已分配该角色,以及是否已将该标识添加到群集。注意
如果现有的数据连接未使用托管标识,我们建议将其更新为使用托管标识。
以下步骤将指导你通过 Azure 数据资源管理器 Web UI 中的引入向导创建事件中心连接。
注意
在 Azure 数据资源管理器 Web UI 的“数据”选项卡中,从“从事件中心引入数据”卡中选择“引入”。
“引入数据”窗口随即打开,其中的“目标”选项卡处于选中状态。 系统会自动填充“群集”和“数据库”字段。 可以从下拉菜单中选择其他群集或数据库。
在“表”下,选择“新建表”并输入新表的名称。 或者,使用现有表。
选择“下一步: 源”。
在“源类型”下,选择“事件中心” 。
在“数据连接”下,填写以下字段并选择“下一步: 架构”。
设置 建议的值 字段说明 订阅 事件中心资源所在的订阅 ID。 事件中心命名空间 标识你的命名空间的名称。 事件中心 要使用的事件中心。 数据连接名称 TestDataConnection 标识你的数据连接的名称。 使用者组 在你的事件中心内定义的使用者组。 压缩 事件中心消息有效负载的压缩类型。 事件系统属性 选择相关属性 事件中心系统属性。 如果每个事件消息有多个记录,则系统属性将添加到第一个记录中。 添加系统属性时,创建或更新表架构和映射以包括所选属性。 事件检索开始日期 协调世界时 (UTC) 数据连接将检索在“事件检索开始日期”之后创建的事件中心事件。 只能检索按照事件中心保留期保留的事件。 如果未指定“事件检索开始日期”,则默认时间是创建数据连接的时间。 设置引入策略。 如果为群集启用了流式处理,你可以选择“流式处理引入”。 如果未为群集启用流式处理,请设置“批处理时间”。 对于事件中心,建议的批处理时间为 30 秒。
选择“数据格式”。 对于 CSV 格式的数据,请选择“忽略第一条记录”以忽略文件的标题行。 对于 JSON 格式的数据,请选择“忽略数据格式错误”以引入 JSON 格式的数据,或者将其保留为未选中状态以引入 multijson 格式的数据。 选择“嵌套级别”以确定表列数据划分。
如果你在预览窗口中看到的数据不完整,则可能需要更多数据来创建包含所有必要数据字段的表。 使用以下命令从事件中心提取新数据:
- 丢弃显示的数据并提取新数据:丢弃显示的数据并搜索新事件。
- 提取更多数据:除已找到的事件外,还搜索更多事件。
注意
若要查看数据的预览,事件中心必须正在发送事件。
在完成时选择“下一步:摘要”。
在“从建立的事件中心持续引入”窗口中,当建立成功完成时,所有步骤将标有绿色打勾标记。
创建用于身份验证的 Azure AD 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。
运行以下代码。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID var clientSecret = "PlaceholderClientSecret"; //Client Secret var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; var authClient = ConfidentialClientApplicationBuilder.Create(clientId) .WithAuthority($"https://login.microsoftonline.com/{tenantId}") .WithClientSecret(clientSecret) .Build(); var result = authClient.AcquireTokenForClient(new[] { "https://management.core.windows.net/" }).ExecuteAsync().Result; var credentials = new TokenCredentials(result.AccessToken, result.TokenType); var kustoManagementClient = new KustoManagementClient(credentials) { SubscriptionId = subscriptionId }; var resourceGroupName = "testrg"; //The cluster and database that are created as part of the Prerequisites var clusterName = "mykustocluster"; var databaseName = "mykustodatabase"; var eventHubConnectionName = "myeventhubconnect"; //The event hub that is created as part of the Prerequisites var eventHubResourceId = "/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>"; var consumerGroup = "$Default"; var location = "Central US"; //The table and column mapping are created as part of the Prerequisites var tableName = "StormEvents"; var mappingRuleName = "StormEvents_CSV_Mapping"; var dataFormat = EventHubDataFormat.CSV; var compression = "None"; var databaseRouting = "Multi"; var eventHubConnectionData = new EventHubDataConnection( eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression, databaseRouting: databaseRouting ); await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, eventHubConnectionName, eventHubConnectionData);
设置 建议的值 字段说明 tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。 subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。 clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。 clientSecret PlaceholderClientSecret 可以访问租户中资源的应用程序的客户端密码。 resourceGroupName testrg 包含群集的资源组的名称。 clusterName mykustocluster 群集的名称。 databaseName mykustodatabase 群集中目标数据库的名称。 eventHubConnectionName myeventhubconnect 所需的数据连接名称。 tableName StormEvents 目标数据库中目标表的名称。 mappingRuleName StormEvents_CSV_Mapping 与目标表相关的列映射的名称。 dataFormat csv 消息的数据格式。 eventHubResourceId 资源 ID 包含要引入的数据的事件中心的资源 ID。 consumerGroup $Default 事件中心的使用者组。 location 美国中部 数据连接资源的位置。 compression “Gzip”或“None” 数据压缩的类型。 databaseRouting 多或单 连接的数据库路由。 如果将该值设置为“单个”,则数据连接会按 databaseName 设置中指定的内容被路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。
安装所需的库。
pip install azure-common pip install azure-mgmt-kusto
创建用于身份验证的 Azure AD 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。
运行以下代码。
from azure.mgmt.kusto import KustoManagementClient from azure.mgmt.kusto.models import EventHubDataConnection from azure.identity import ClientSecretCredential #Directory (tenant) ID tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Application ID client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Client Secret client_secret = "xxxxxxxxxxxxxx" subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" credentials = ServicePrincipalCredentials( client_id=client_id, secret=client_secret, tenant=tenant_id ) kusto_management_client = KustoManagementClient(credentials, subscription_id) resource_group_name = "myresourcegroup" #The cluster and database that are created as part of the Prerequisites cluster_name = "mycluster" database_name = "mydatabase" data_connection_name = "myeventhubconnect" #The event hub that is created as part of the Prerequisites event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/myresourcegroup/providers/Microsoft.EventHub/namespaces/myeventhubnamespace/eventhubs/myeventhub""; consumer_group = "$Default" location = "Central US" #The table and column mapping that are created as part of the Prerequisites table_name = "mytable" mapping_rule_name = "mytablemappingrule" data_format = "csv" database_routing = "Multi" #Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python poller = kusto_management_client.data_connections.create_or_update( resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name, parameters=EventHubDataConnection( event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location, table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing ) ) poller.wait() print(poller.result())
设置 建议的值 字段说明 tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。 subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。 client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。 client_secret xxxxxxxxxxxxxx 可以访问租户中资源的应用程序的客户端密码。 resource_group_name myresourcegroup 包含群集的资源组的名称。 cluster_name mycluster 群集的名称。 database_name mydatabase 群集中目标数据库的名称。 data_connection_name myeventhubconnect 所需的数据连接名称。 table_name mytable 目标数据库中目标表的名称。 mapping_rule_name mytablemappingrule 与目标表相关的列映射的名称。 data_format csv 消息的数据格式。 event_hub_resource_id 资源 ID 包含要引入的数据的事件中心的资源 ID。 consumer_group $Default 事件中心的使用者组。 location 美国中部 数据连接资源的位置。 databaseRouting 多或单 连接的数据库路由。 如果将该值设置为“单个”,则数据连接会按 databaseName 设置中指定的内容被路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。
以下示例显示用于添加事件中心数据连接的 Azure 资源管理器模板。 可以使用此窗体在 Azure 门户中编辑和部署模板。
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"namespaces_eventhubns_name": {
"type": "string",
"defaultValue": "eventhubns",
"metadata": {
"description": "Specifies the Event Hubs Namespace name."
}
},
"EventHubs_eventhubdemo_name": {
"type": "string",
"defaultValue": "eventhubdemo",
"metadata": {
"description": "Specifies the event hub name."
}
},
"consumergroup_default_name": {
"type": "string",
"defaultValue": "$Default",
"metadata": {
"description": "Specifies the consumer group of the event hub."
}
},
"Clusters_kustocluster_name": {
"type": "string",
"defaultValue": "kustocluster",
"metadata": {
"description": "Specifies the name of the cluster"
}
},
"databases_kustodb_name": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Specifies the name of the database"
}
},
"tables_kustotable_name": {
"type": "string",
"defaultValue": "kustotable",
"metadata": {
"description": "Specifies the name of the table"
}
},
"mapping_kustomapping_name": {
"type": "string",
"defaultValue": "kustomapping",
"metadata": {
"description": "Specifies the name of the mapping rule"
}
},
"dataformat_type": {
"type": "string",
"defaultValue": "csv",
"metadata": {
"description": "Specifies the data format"
}
},
"databaseRouting_type": {
"type": "string",
"defaultValue": "Single",
"metadata": {
"description": "The database routing for the connection. If you set the value to **Single**, the data connection will be routed to a single database in the cluster as specified in the *databaseName* setting. If you set the value to **Multi**, you can override the default target database using the *Database* EventData property."
}
},
"dataconnections_kustodc_name": {
"type": "string",
"defaultValue": "kustodc",
"metadata": {
"description": "Name of the data connection to create"
}
},
"subscriptionId": {
"type": "string",
"defaultValue": "[subscription().subscriptionId]",
"metadata": {
"description": "Specifies the subscriptionId of the event hub"
}
},
"resourceGroup": {
"type": "string",
"defaultValue": "[resourceGroup().name]",
"metadata": {
"description": "Specifies the resourceGroup of the event hub"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
"apiVersion": "2022-02-01",
"name": "[concat(parameters('Clusters_kustocluster_name'), '/', parameters('databases_kustodb_name'), '/', parameters('dataconnections_kustodc_name'))]",
"location": "[parameters('location')]",
"kind": "EventHub",
"properties": {
"managedIdentityResourceId": "[resourceId('Microsoft.Kusto/clusters', parameters('clusters_kustocluster_name'))]",
"eventHubResourceId": "[resourceId(parameters('subscriptionId'), parameters('resourceGroup'), 'Microsoft.EventHub/namespaces/eventhubs', parameters('namespaces_eventhubns_name'), parameters('EventHubs_eventhubdemo_name'))]",
"consumerGroup": "[parameters('consumergroup_default_name')]",
"tableName": "[parameters('tables_kustotable_name')]",
"mappingRuleName": "[parameters('mapping_kustomapping_name')]",
"dataFormat": "[parameters('dataformat_type')]",
"databaseRouting": "[parameters('databaseRouting_type')]"
}
}
]
}
删除事件中心数据连接
若要从 Azure 门户中删除事件中心连接,请执行以下操作:
- 转到你的群集。 在左侧菜单中选择“数据库”。 然后选择包含目标表的数据库。
- 在左侧菜单中选择“数据连接”。 然后选中相关事件中心数据连接旁边的复选框。
- 在顶部菜单栏中选择“删除”。
在最后一个向导页中,选择“管理数据连接”卡。 如果你已退出向导,请按照上一个选项卡中的说明通过 Azure 门户删除数据连接。
若要删除事件中心连接,请运行以下命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
若要删除事件中心连接,请运行以下命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)