你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
事件中心数据连接(预览版)
Azure 事件中心是大数据流式处理平台和事件引入服务。 Azure Synapse 数据资源管理器提供从客户管理的事件中心持续引入数据的功能。
Azure 事件中心引入管道通过几个步骤将事件传输到 Azure Synapse 数据资源管理器。 首先,在 Azure 门户中创建事件中心。 然后,在 Azure Synapse 数据资源管理器中创建一个目标表,特定格式的数据将通过给定的引入属性引入到该表中。 事件中心连接需要知道事件路由。 根据事件系统属性映射,使用选定的属性嵌入数据。 与事件中心建立连接,以创建事件中心并发送事件。 可以通过 Azure 门户使用 C# 或 Python 以编程方式管理此过程,也可以使用 Azure 资源管理器模板来这样做。
有关 Azure Synapse 数据资源管理器中的数据引入功能的常规信息,请参阅 Azure Synapse 数据资源管理器数据引入概述。
数据格式
将以 EventData 对象的形式从事件中心读取数据。
请参阅支持的格式。
注意
事件中心不支持 .raw 格式。
可使用
GZip
压缩算法来压缩数据。 指定引入属性中的Compression
。- 压缩格式(Avro、Parquet、ORC)不支持数据压缩。
- 压缩数据不支持自定义编码和嵌入式系统属性。
引入属性
引入属性会指示引入过程、数据路由到的位置以及数据处理方式。 可以使用 EventData.Properties 指定事件引入的引入属性。 可以设置以下属性:
属性 | 说明 |
---|---|
表 | 现有目标表的名称(区分大小写)。 替代“Data Connection ”窗格上设置的“Table ”。 |
格式 | 数据格式。 替代“Data Connection ”窗格上设置的“Data format ”。 |
IngestionMappingReference | 要使用的现有引入映射的名称。 替代“Data Connection ”窗格上设置的“Column mapping ”。 |
压缩 | 数据压缩。None (默认值)或 GZip 压缩。 |
编码 | 数据编码,默认值为 UTF8。 可以是 .NET 支持的任何编码。 |
Tags | 将要与引入的数据(格式设置为 JSON 数组字符串)关联的标记的列表。 使用标记时存在性能影响。 |
注意
只有创建数据连接后进入队列的事件才会被引入。
事件路由
当你设置事件中心与 Azure Synapse 数据资源管理器群集之间的连接时,请指定目标表属性(表名称、数据格式、压缩和映射)。 数据的默认路由也称为 static routing
。
还可以使用事件属性指定每个事件的目标表属性。 连接将按照 EventData.Properties 中指定的要求动态路由数据,替代此事件的静态属性。
在以下示例中,设置事件中心详细信息并将天气指标数据发送到表 WeatherMetrics
。
数据采用 json
格式。 mapping1
在 WeatherMetrics
表中预定义。
警告
方便起见,本示例使用连接字符串身份验证连接到事件中心。 然而,将连接字符串硬编码到脚本中需要对应用程序具有高度信任,并且存在安全风险。
对于安全的长期解决方案,请使用以下选项之一:
- 无密码身份验证
- 将连接字符串存储在 Azure Key Vault 中,并使用此方法在代码中检索此字符串。
var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 };
var data = JsonConvert.SerializeObject(metric);
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");
// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();
事件系统属性映射
系统属性在事件排队时存储由事件中心服务设置的属性。 Azure Synapse 数据资源管理器事件中心连接会将所选属性嵌入到在表中登陆的数据。
注意
系统属性
事件中心公开以下系统属性:
properties | 数据类型 | 说明 |
---|---|---|
x-opt-enqueued-time | datetime | 将事件排队时的 UTC 时间 |
x-opt-sequence-number | long | 事件中心分区流中的事件的逻辑序列号 |
x-opt-offset | string | 事件与事件中心分区流之间的偏移量。 偏移量标识符在事件中心流的分区中独一无二 |
x-opt-publisher | string | 发布服务器名称(如果消息已发送到发布服务器终结点) |
x-opt-partition-key | string | 存储了事件的相应分区的分区键 |
如果在表的“数据源”部分中选择了“事件系统属性”,则必须在表架构和映射中包含这些属性。
架构映射示例
表架构映射示例
如果数据包含三列(Timespan
、Metric
和 Value
)并且包含的属性是 x-opt-enqueued-time
和 x-opt-offset
,请使用以下命令创建或更改表架构:
.create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)
CSV 映射示例
运行以下命令,将数据添加到记录的开头。 记下序号值。
.create table TestTable ingestion csv mapping "CsvMapping1"
'['
' { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
' { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
' { "column" : "Value", "Properties":{"Ordinal":"4"}},'
' { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
' { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
']'
JSON 映射示例
使用系统属性映射添加数据。 运行以下命令:
.create table TestTable ingestion json mapping "JsonMapping1"
'['
' { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
' { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
' { "column" : "Value", "Properties":{"Path":"$.value"}},'
' { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
' { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
']'
事件中心连接
注意
为了获得最佳性能,请在 Azure Synapse 数据资源管理器群集所在的同一区域中创建所有资源。
创建事件中心
如果还没有事件中心,请创建一个事件中心。 可以通过 Azure 门户、使用 C# 或 Python 以编程方式或使用 Azure 资源管理器模板来管理到数据中心的连接。
注意
- 分区计数不可更改,因此在设置分区计数时应考虑长期规模。
- 使用者组对于每个使用者来说必须独一无二。 创建专用于 Azure Synapse 数据资源管理器连接的使用者组。
发送事件
请参阅用于生成数据并将其发送到事件中心的示例应用。
有关如何生成示例数据的示例,请参阅将数据从事件中心引入到 Azure Synapse 数据资源管理器
设置异地灾难恢复解决方案
事件中心提供异地灾难恢复解决方案。
Azure Synapse 数据资源管理器不支持 Alias
事件中心命名空间。 若要在解决方案中实现异地灾难恢复,请创建两个事件中心数据连接:一个用于主命名空间,另一个用于辅助命名空间。 Azure Synapse 数据资源管理器将侦听这两个事件中心连接。
注意
用户负责实现从主命名空间到辅助命名空间的故障转移。