你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Python,为 Azure Synapse 数据资源管理器创建事件中心数据连接(预览版)
Azure Synapse 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure Synapse 数据资源管理器提供从事件中心、IoT 中心和写入 blob 容器的 blob 的引入(数据加载)。
本文使用 Python 为 Azure Synapse 数据资源管理器创建事件中心数据连接。
先决条件
Azure 订阅。 创建免费 Azure 帐户。
使用 Synapse Studio 或 Azure 门户创建数据资源管理器池
创建数据资源管理器数据库。
在 Synapse Studio 的左窗格中,选择“数据”。
选择“+”(添加新资源)>“数据资源管理器池”,并使用以下信息:
设置 建议值 说明 池名称 contosodataexplorer 要使用的数据资源管理器池的名称 名称 TestDatabase 该数据库名称在群集中必须是唯一的。 默认保留期 365 保证数据可供查询的时间跨度(以天为单位)。 时间跨度从引入数据时算起。 默认缓存期 31 使频繁查询的数据在 SSD 存储或 RAM(而不是更长期的存储)中保持可用的时间跨度(以天为单位)。 选择“创建”以创建数据库。 创建过程通常需要不到一分钟的时间。
在测试群集上创建表
创建与 StormEvents.csv
文件中的数据架构匹配的名为 StormEvents
的表。
提示
下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
定义引入映射
将传入的 CSV 数据映射到创建表时使用的列名称。 在该表上预配 CSV 列映射对象。
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
安装 Python 包
要为 Azure Synapse 数据资源管理器安装 Python 包,请打开其路径中包含 Python 的命令提示符。 运行以下命令:
pip install azure-common
pip install azure-mgmt-kusto
身份验证
若要运行以下示例,你需要可以访问资源的 Microsoft Entra 应用程序和服务主体。 若要创建免费的 Microsoft Entra 应用程序并在订阅级别添加角色分配,请参阅创建 Microsoft Entra 应用程序。 还需要目录(租户)ID、应用程序 ID 和客户端密码。
添加事件中心数据连接
以下示例演示如何以编程方式添加事件中心数据连接。 请参阅连接到事件中心,了解如何使用 Azure 门户添加事件中心数据连接。
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
设置 | 建议的值 | 字段说明 |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 租户 ID。 也称为目录 ID。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 用于创建资源的订阅 ID。 |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 可以访问租户中资源的应用程序的客户端 ID。 |
client_secret | xxxxxxxxxxxxxx | 可以访问租户中资源的应用程序的客户端密码。 |
resource_group_name | testrg | 包含群集的资源组的名称。 |
cluster_name | mykustocluster | 群集的名称。 |
database_name | mykustodatabase | 群集中目标数据库的名称。 |
data_connection_name | myeventhubconnect | 所需的数据连接名称。 |
table_name | StormEvents | 目标数据库中目标表的名称。 |
mapping_rule_name | StormEvents_CSV_Mapping | 与目标表相关的列映射的名称。 |
data_format | csv | 消息的数据格式。 |
event_hub_resource_id | 资源 ID | 包含要引入的数据的事件中心的资源 ID。 |
consumer_group | $Default | 事件中心的使用者组。 |
location | 美国中部 | 数据连接资源的位置。 |
清理资源
若要删除数据连接,请使用以下命令:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)