Подключение к данным Центров событий (предварительная версия)
Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Azure Synapse Data Explorer предлагает возможности непрерывного приема данных из управляемых клиентами Центров событий.
Конвейер приема центров событий передает события в Azure Synapse Data Explorer несколькими шагами. Сначала вы создаете центры событий в портал Azure. Затем вы создадите целевую таблицу в Azure Synapse Data Explorer, в которой данные в определенном формате будут приняты с использованием заданных свойств приема. Подключение к Центрам событий должно поддерживать маршрутизацию событий. Данные внедряются с выбранными свойствами в соответствии с сопоставлением свойств системы событий. Создайте подключение к Центрам событий для создания центров событий и отправки событий. Этим процессом можно управлять с помощью портала Azure, программно с помощью C# или Pythonили с помощью шаблона Azure Resource Manager.
Общие сведения о приеме данных в Azure Synapse Data Explorer см. в разделе Обзор приема данных в Azure Synapse Data Explorer.
Формат данных
Данные считываются из Центров событий в виде объектов EventData .
См. раздел Поддерживаемые форматы.
Примечание.
Центр событий не поддерживает формат RAW.
Данные можно сжимать с помощью алгоритма сжатия
GZip
. УкажитеCompression
в свойствах сжатия.- Сжатые форматы (Avro, Parquet, ORC) не поддерживают сжатие данных.
- Сжатые данные не поддерживают пользовательское кодирование и внедренные системные свойства.
Свойства приема
Свойства приема указывают процессу приема, куда направлять данные и как их обрабатывать. Вы можете указать свойства приема для приема событий с помощью EventData.Properties. Можно задать следующие свойства:
Свойство | Description |
---|---|
Таблицу | Имя (с учетом регистра) существующей целевой таблицы. Переопределяет набор Table на панели Data Connection . |
Формат | Формат данных. Переопределяет набор Data format на панели Data Connection . |
IngestionMappingReference | Имя существующего сопоставления приема, которое будет использоваться. Переопределяет набор Column mapping на панели Data Connection . |
Сжатие | Сжатие данных, None (по умолчанию), или сжатие GZip . |
Кодировка | Кодировка данных, по умолчанию — UTF8. Может быть любой из поддерживаемых кодировок .NET. |
Теги | Список тегов, которые нужно связать с принятыми данными, в формате строки массива JSON. При использовании тегов может понизиться производительность. |
Примечание.
Принимаются только события, поставленные в очередь после создания подключения к данным.
Маршрутизация событий
При настройке подключения Центров событий к кластеру Azure Synapse Data Explorer необходимо указать целевые свойства таблицы (имя таблицы, формат данных, сжатие и сопоставление). Маршрутизация по умолчанию для данных также указывается как static routing
.
Вы также можете указать свойства целевой таблицы для каждого события, используя свойства события. Соединение будет динамически маршрутизировать данные, как указано в EventData.Properties, переопределяя статические свойства для этого события.
В следующем примере задайте сведения о центрах событий и отправьте данные метрик погоды в таблицу WeatherMetrics
.
Данные имеют формат json
. Свойство mapping1
предварительно определено для таблицы WeatherMetrics
.
Предупреждение
В этом примере для простоты примера используется проверка подлинности строка подключения для подключения к Центрам событий. Однако жесткое программирование строка подключения в скрипте требует очень высокой степени доверия к приложению и несет риски безопасности.
Для долгосрочных безопасных решений используйте один из следующих вариантов:
- Проверка подлинности без пароля
- Сохраните строка подключения в Azure Key Vault и используйте этот метод, чтобы получить его в коде.
var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 };
var data = JsonConvert.SerializeObject(metric);
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");
// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();
Сопоставление свойств системы событий
Системные свойства хранят свойства, заданные службой Центров событий, в то время событие задается. Подключение Центров событий Azure Synapse Data Explorer внедряет выбранные свойства в целевую базу данных в таблице.
Примечание.
- Системные свойства поддерживаются для
json
и табличных форматов (csv
,tsv
и т. д.) и не поддерживаются для сжатых данных. При использовании неподдерживаемого формата данные по-прежнему будут приняты, но свойства будут игнорироваться. - Для табличных данных системные свойства поддерживаются только для сообщений о событиях с одной записью.
- Для данных JSON системные свойства также поддерживаются для сообщений о событиях с несколькими записями. В таких случаях системные свойства добавляются только к первой записи сообщения о событии.
- При сопоставлении в формате
csv
свойства добавляются в начало записи в порядке, определенном в таблице Системные свойства. - При сопоставлении в формате
json
свойства добавляются в соответствии с именами свойств, указанных в таблице Системные свойства.
Свойства системы
Центры событий предоставляют следующие системные свойства:
Свойство | Тип данных | Description |
---|---|---|
x-opt-enqueued-time | datetime | Время поставки событий в очередь в формате UTC |
x-opt-sequence-number | длинный | Логический порядковый номер события в потоке секционирования Центров событий |
x-opt-offset | строка | Смещение события из потока секционирования Центров событий. Идентификатор смещения является уникальным в пределах секции потока Центров событий |
x-opt-publisher | строка | Имя издателя, если сообщение было отправлено в конечную точку издателя |
x-opt-partition-key | строка | Ключ соответствующей секции, в которой хранится событие |
Если вы выбрали Свойства системы событий в разделе Источник данных таблицы, вы должны включить свойства в схему таблицы и сопоставление.
Примеры схемы сопоставления
Пример схемы сопоставления таблицы
Если данные содержат три столбца (Timespan
, Metric
и Value
) и включены свойства x-opt-enqueued-time
и x-opt-offset
, создайте или измените схему таблицы с помощью следующей команды:
.create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)
Пример сопоставления CSV-файла
Выполните следующие команды, чтобы добавить данные в начало записи. Запишите порядковые номера.
.create table TestTable ingestion csv mapping "CsvMapping1"
'['
' { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
' { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
' { "column" : "Value", "Properties":{"Ordinal":"4"}},'
' { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
' { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
']'
Пример сопоставления JSON
Данные добавляются с помощью сопоставления свойств системы. Выполните следующие команды.
.create table TestTable ingestion json mapping "JsonMapping1"
'['
' { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
' { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
' { "column" : "Value", "Properties":{"Path":"$.value"}},'
' { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
' { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
']'
Подключение Центров событий
Примечание.
Для обеспечения максимальной производительности создавайте все ресурсы в том же регионе, что и кластер Azure Synapse Data Explorer.
Создание центров событий
Если у вас еще нет, создайте центры событий. Подключение к Центрам событий можно управлять с помощью портал Azure, программно с помощью C# или Python или с помощью шаблона Azure Resource Manager.
Примечание.
- Так как число секций неизменно, вам следует продумать масштаб заранее.
- Группа потребителей должна быть уникальной для каждого потребителя. Создайте группу потребителей, предназначенную для подключения к Azure Synapse Data Explorer.
Отправка событий
Ознакомьтесь с примером приложения , которое создает данные и отправляет его в Центры событий.
Пример создания примеров данных см. в разделе "Прием данных из Центров событий" в Azure Synapse Data Explorer
Настройка решения для геоизбыточного аварийного восстановления
Центры событий предлагают решение для геокатастасного восстановления .
Azure Synapse Data Explorer не поддерживает Alias
пространства имен Центров событий. Чтобы реализовать геоизбыточное восстановление в решении, создайте два подключения к данным Центров событий: один для основного пространства имен и один для дополнительного пространства имен. Azure Synapse Data Explorer будет прослушивать оба подключения Центров событий.
Примечание.
Ответственность за реализацию отработки отказа из основного пространства имен в дополнительное несет пользователь.
Следующие шаги
- Прием данных из Центров событий в Azure Synapse Data Explorer
- Создание подключения к данным Центров событий для Azure Synapse Data Explorer с помощью C#
- Создание подключения к данным Центров событий для Azure Synapse Data Explorer с помощью Python
- Создание подключения к данным Центров событий для Azure Synapse Data Explorer с помощью шаблона Azure Resource Manager