你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Python,为 Azure Synapse 数据资源管理器创建事件中心数据连接(预览版)

Azure Synapse 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure Synapse 数据资源管理器提供从事件中心、IoT 中心和写入 blob 容器的 blob 的引入(数据加载)。

本文使用 Python 为 Azure Synapse 数据资源管理器创建事件中心数据连接。

先决条件

  • Azure 订阅。 创建免费 Azure 帐户

  • 使用 Synapse StudioAzure 门户创建数据资源管理器池

  • 创建数据资源管理器数据库。

    1. 在 Synapse Studio 的左窗格中,选择“数据”。

    2. 选择“+”(添加新资源)>“数据资源管理器池”,并使用以下信息:

      设置 建议值 说明
      池名称 contosodataexplorer 要使用的数据资源管理器池的名称
      名称 TestDatabase 该数据库名称在群集中必须是唯一的。
      默认保留期 365 保证数据可供查询的时间跨度(以天为单位)。 时间跨度从引入数据时算起。
      默认缓存期 31 使频繁查询的数据在 SSD 存储或 RAM(而不是更长期的存储)中保持可用的时间跨度(以天为单位)。
    3. 选择“创建”以创建数据库。 创建过程通常需要不到一分钟的时间。

在测试群集上创建表

创建与 StormEvents.csv 文件中的数据架构匹配的名为 StormEvents 的表。

提示

下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

定义引入映射

将传入的 CSV 数据映射到创建表时使用的列名称。 在该表上预配 CSV 列映射对象

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

安装 Python 包

要为 Azure Synapse 数据资源管理器安装 Python 包,请打开其路径中包含 Python 的命令提示符。 运行以下命令:

pip install azure-common
pip install azure-mgmt-kusto

身份验证

若要运行以下示例,你需要可以访问资源的 Microsoft Entra 应用程序和服务主体。 若要创建免费的 Microsoft Entra 应用程序并在订阅级别添加角色分配,请参阅创建 Microsoft Entra 应用程序。 还需要目录(租户)ID、应用程序 ID 和客户端密码。

添加事件中心数据连接

以下示例演示如何以编程方式添加事件中心数据连接。 请参阅连接到事件中心,了解如何使用 Azure 门户添加事件中心数据连接。

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
设置 建议的值 字段说明
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。
client_secret xxxxxxxxxxxxxx 可以访问租户中资源的应用程序的客户端密码。
resource_group_name testrg 包含群集的资源组的名称。
cluster_name mykustocluster 群集的名称。
database_name mykustodatabase 群集中目标数据库的名称。
data_connection_name myeventhubconnect 所需的数据连接名称。
table_name StormEvents 目标数据库中目标表的名称。
mapping_rule_name StormEvents_CSV_Mapping 与目标表相关的列映射的名称。
data_format csv 消息的数据格式。
event_hub_resource_id 资源 ID 包含要引入的数据的事件中心的资源 ID。
consumer_group $Default 事件中心的使用者组。
location 美国中部 数据连接资源的位置。

清理资源

若要删除数据连接,请使用以下命令:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

后续步骤