你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 C# 为 Azure Synapse 数据资源管理器创建事件中心数据连接(预览版)
Azure Synapse 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Azure Synapse 数据资源管理器提供从事件中心、IoT 中心和写入 blob 容器的 blob 的引入(数据加载)。
在本文中,将使用 C# 为 Azure Synapse 数据资源管理器创建事件中心数据连接。
先决条件
Azure 订阅。 创建免费 Azure 帐户。
使用 Synapse Studio 或 Azure 门户创建数据资源管理器池
创建数据资源管理器数据库。
在 Synapse Studio 的左窗格中,选择“数据”。
选择“+”(添加新资源)>“数据资源管理器池”,并使用以下信息:
设置 建议值 说明 池名称 contosodataexplorer 要使用的数据资源管理器池的名称 名称 TestDatabase 该数据库名称在群集中必须是唯一的。 默认保留期 365 保证数据可供查询的时间跨度(以天为单位)。 时间跨度从引入数据时算起。 默认缓存期 31 使频繁查询的数据在 SSD 存储或 RAM(而不是更长期的存储)中保持可用的时间跨度(以天为单位)。 选择“创建”以创建数据库。 创建过程通常需要不到一分钟的时间。
注意
如果 Synapse 工作区在启用了数据外泄保护的情况下适用托管虚拟网络,则将数据从事件中心引入数据资源管理器池将不起作用。
- Visual Studio 2019,下载并使用免费的 Visual Studio 2019 Community Edition。 在安装 Visual Studio 的过程中,请确保启用“Azure 开发”。
在测试群集上创建表
创建与 StormEvents.csv
文件中的数据架构匹配的名为 StormEvents
的表。
提示
下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
定义引入映射
将传入的 CSV 数据映射到创建表时使用的列名称。 在该表上预配 CSV 列映射对象。
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
安装 C# NuGet
身份验证
若要运行以下示例,你需要可以访问资源的 Microsoft Entra 应用程序和服务主体。 若要创建免费的 Microsoft Entra 应用程序并在订阅级别添加角色分配,请参阅创建 Microsoft Entra 应用程序。 还需要目录(租户)ID、应用程序 ID 和客户端密码。
添加事件中心数据连接
以下示例演示如何以编程方式添加事件中心数据连接。 有关使用 Azure 门户添加事件中心数据连接的信息,请参阅连接到事件中心。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
设置 | 建议的值 | 字段说明 |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 租户 ID。 也称为目录 ID。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 用于创建资源的订阅 ID。 |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 可以访问租户中资源的应用程序的客户端 ID。 |
clientSecret | xxxxxxxxxxxxxx | 可以访问租户中资源的应用程序的客户端密码。 |
resourceGroupName | testrg | 包含群集的资源组的名称。 |
clusterName | mykustocluster | 群集的名称。 |
databaseName | mykustodatabase | 群集中目标数据库的名称。 |
dataConnectionName | myeventhubconnect | 所需的数据连接名称。 |
tableName | StormEvents | 目标数据库中目标表的名称。 |
mappingRuleName | StormEvents_CSV_Mapping | 与目标表相关的列映射的名称。 |
dataFormat | csv | 消息的数据格式。 |
eventHubResourceId | 资源 ID | 包含要引入的数据的事件中心的资源 ID。 |
consumerGroup | $Default | 事件中心的使用者组。 |
location | 美国中部 | 数据连接资源的位置。 |
compression | “Gzip”或“None” | 数据压缩的类型。 |
生成数据
请参阅可生成数据并将其发送到事件中心的示例应用。
事件可以包含一个或多个记录(直到达到事件的大小限制)。 在下面的示例中,我们将发送两个事件,每个事件有五个追加的记录:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
清理资源
若要删除数据连接,请使用以下命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);