Общие шаблоны запросов в Azure Stream Analytics
Запросы в Azure Stream Analytics выражаются на языке запросов на основе SQL. Эти языковые конструкции описаны в справочнике по языку запросов Stream Analytics.
Конструктор запросов может выражать простую сквозную логику для перемещения данных событий из входного потока в хранилище выходных данных. Либо же конструктор может выполнять комплексное сопоставление шаблонов и темпоральный анализ для вычисления агрегатов по различным временным окнам, как описано в руководстве Создание решения IoT с помощью Stream Analytics. Вы можете объединить данные из нескольких источников, чтобы сгруппировать события потоковой передачи и выполнять поиск по статическим эталонным данным, что позволит повысить информативность значений событий. Вы можете также записать данные на несколько выходов.
В этой статье описаны решения для нескольких стандартных шаблонов запросов на основе реальных сценариев.
Поддерживаемые форматы данных
Azure Stream Analytics поддерживает обработку событий в форматах CSV, JSON и Avro. Форматы JSON и Avro могут содержать сложные типы, такие как вложенные объекты (записи) или массивы. Дополнительные сведения о работе с этими сложными типами данных см. в разделе Анализ данных JSON и AVRO.
Отправка данных на несколько выходов
Для вывода данных в различные приемники выходных данных можно использовать несколько инструкций SELECT. Например, одна инструкция SELECT может выводить оповещение на основе порогового значения, а другое может выводить события в хранилище BLOB-объектов.
Рассмотрим следующие входные данные:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
И вам нужны следующие два выходных данных из запроса:
ArchiveOutput:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
AlertOutput:
| Make | Time | Count |
| --- | --- | --- |
| Make2 |2023-01-01T00:00:10.0000000Z |3 |
Запрос с двумя операторами SELECT с выходными данными архива и выходными данными оповещений в качестве выходных данных:
SELECT
*
INTO
ArchiveOutput
FROM
Input TIMESTAMP BY Time
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO
AlertOutput
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING
[Count] >= 3
Предложение INTO сообщает службе Stream Analytics, в которой выходные данные записываются. Первая инструкция SELECT определяет запрос к серверу, который получает данные из входных данных и отправляет их в выход с именем ArchiveOutput. Второй запрос агрегирует и фильтрует данные перед отправкой результатов в нижестоящий вывод системы оповещений с именем AlertOutput.
Предложение WITH можно использовать для определения нескольких блоков вложенных запросов. При этом вы получаете преимущество, так как нужно открывать меньше читателей в источнике входных данных.
Запрос:
WITH ReaderQuery AS (
SELECT
*
FROM
Input TIMESTAMP BY Time
)
SELECT * INTO ArchiveOutput FROM ReaderQuery
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO AlertOutput
FROM ReaderQuery
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING [Count] >= 3
Дополнительные сведения см. в разделе Предложение WITH.
Простой запрос к серверу
Простой запрос к серверу можно использовать для копирования данных входного потока в выходные данные. Например, если поток данных, содержащих сведения об автомобиле в режиме реального времени, необходимо сохранить в базе данных SQL для последующего анализа, простой сквозной запрос выполняет задание.
Рассмотрим следующие входные данные:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
Вы хотите, чтобы выходные данные были одинаковыми для входных данных:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
Ниже приведен запрос:
SELECT
*
INTO Output
FROM Input
Этот запрос SELECT * проектирует все поля входящего события и отправляет их в выходные данные. Вместо этого можно проецируемые только необходимые поля в инструкции SELECT . В следующем примере инструкции SELECT проекты только полей Make и Time из входных данных.
Рассмотрим следующие входные данные:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
Выходные данные должны иметь только поля Make and Time:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:04.0000000Z |
Ниже приведен запрос , который проектит только необходимые поля:
SELECT
Make, Time
INTO Output
FROM Input
Совпадение строк с помощью инструкций LIKE и NOT LIKE
Для проверки соответствия поля определенному шаблону можно использовать инструкции LIKENOT LIKE. Например, можно использовать фильтр, чтобы вернуть только номерные знаки, начинающиеся с буквы A
и заканчивающиеся номером 9
.
Рассмотрим следующие входные данные:
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
Вы хотите, чтобы выходные данные имели номерные знаки, начинающиеся с буквы A
и заканчивающиеся номером 9
:
| Make | License_plate | Time |
| --- | --- | --- |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
Ниже приведен запрос , использующий оператор LIKE:
SELECT
*
FROM
Input TIMESTAMP BY Time
WHERE
License_plate LIKE 'A%9'
Используйте инструкцию LIKE для проверки значения поля License_plate. Он должен начинаться с буквы A
, а затем иметь любую строку от нуля или более символов, заканчивая числом 9.
Вычисление по прошлым событиям
Для просмотра прошлых событий в течение временного окна и сравнения их с текущим событием может использоваться функция LAG. Например, сделать текущий автомобиль можно выводить, если он отличается от того, что последний автомобиль прошел через платный стенд.
Пример входных данных:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
Пример выходных данных:
| Make | Time |
| --- | --- |
| Make2 |2023-01-01T00:00:02.0000000Z |
Пример запроса:
SELECT
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(minute, 1)) <> Make
Используйте LAG, чтобы вернуться во входном потоке на одно событие назад, получить значение Make, сравнить его со значением Make текущего события и вывести событие.
Дополнительные сведения см. в статье LAG.
Возврат последнего события в окне
Так как события используются системой в режиме реального времени, функция не может определить, является ли событие последним для этого периода времени. Для этого входной поток должен быть присоединен к другому, где время события — максимальное время для всех событий в этом окне.
Пример входных данных:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
Пример выходных данных с информацией о последних автомобилях в двух десятиминутных временных окнах:
| License_plate | Make | Time |
| --- | --- | --- |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
Пример запроса:
WITH LastInWindow AS
(
SELECT
MAX(Time) AS LastEventTime
FROM
Input TIMESTAMP BY Time
GROUP BY
TumblingWindow(minute, 10)
)
SELECT
Input.License_plate,
Input.Make,
Input.Time
FROM
Input TIMESTAMP BY Time
INNER JOIN LastInWindow
ON DATEDIFF(minute, Input, LastInWindow) BETWEEN 0 AND 10
AND Input.Time = LastInWindow.LastEventTime
Первый шаг запроса находит максимальную метку времени в 10-минутных окнах, то есть метку времени последнего события для этого окна. Второй шаг — соединить результаты первого запроса с исходным потоком, чтобы найти событие, соответствующее последней метке времени в каждом окне.
DATEDIFF — это функция для работы с датами, которая сравнивает метки времени и возвращает разницу во времени между двумя полями даты и времени. Дополнительные сведения см. в статье о функциях для работы с датами.
Дополнительные сведения о присоединении потоков см. в описании инструкции JOIN.
Агрегация данных за определенный период времени
Чтобы вычислить информацию за период времени, можно агрегировать данные. В этом примере инструкция вычисляет количество за последние 10 секунд времени для каждого конкретного вида автомобиля.
Пример входных данных:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
Пример выходных данных:
| Make | Count |
| --- | --- |
| Make1 | 2 |
| Make2 | 1 |
Запрос:
SELECT
Make,
COUNT(*) AS Count
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
При таком агрегировании автомобили группируются по полю Make, а их число подсчитывается каждые 10 секунд. Выходные данные имеют make and Count of автомобилей, которые прошли через платный стенд.
TumblingWindow — это функция управления окнами, используемая для группирования событий. Агрегирование можно применять ко всем сгруппированным событиям. Дополнительные сведения см. в описании функций управления окнами.
Дополнительные сведения об агрегировании см. в описании агрегатных функций.
Периодический вывод значений
Если события отсутствуют или нерегулярны, регулярные выходные данные интервала можно создать из более разреженных входных данных. Например, создавайте каждые 5 секунд событие, сообщающее последнюю видимую точку данных.
Пример входных данных:
| Time | Value |
| --- | --- |
| "2014-01-01T06:01:00" |1 |
| "2014-01-01T06:01:05" |2 |
| "2014-01-01T06:01:10" |3 |
| "2014-01-01T06:01:15" |4 |
| "2014-01-01T06:01:30" |5 |
| "2014-01-01T06:01:35" |6 |
Пример выходных данных (первые 10 строк):
| Window_end | Last_event.Time | Last_event.Value |
| --- | --- | --- |
| 2014-01-01T14:01:00.000Z |2014-01-01T14:01:00.000Z |1 |
| 2014-01-01T14:01:05.000Z |2014-01-01T14:01:05.000Z |2 |
| 2014-01-01T14:01:10.000Z |2014-01-01T14:01:10.000Z |3 |
| 2014-01-01T14:01:15.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:20.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:25.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:30.000Z |2014-01-01T14:01:30.000Z |5 |
| 2014-01-01T14:01:35.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:40.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:45.000Z |2014-01-01T14:01:35.000Z |6 |
Пример запроса:
SELECT
System.Timestamp() AS Window_end,
TopOne() OVER (ORDER BY Time DESC) AS Last_event
FROM
Input TIMESTAMP BY Time
GROUP BY
HOPPINGWINDOW(second, 300, 5)
Этот запрос создает события каждые 5 секунд и выводит последнее событие, полученное ранее. От длительности HOPPINGWINDOW зависит, насколько далеко будет возвращаться запрос при поиске последнего события.
Дополнительные сведения см. в разделе "Прыгающее" окно.
Выявление корреляции между событиями в потоке
Выявить корреляцию между событиями в одном потоке можно путем возврата к предыдущим событиям с помощью функции LAG. Например, выходные данные можно создавать каждый раз, когда два последовательных автомобиля из одного и того же Make проходят через платный стенд за последние 90 секунд.
Пример входных данных:
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make1 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make2 |DEF-987 |2023-01-01T00:00:03.0000000Z |
| Make1 |GHI-345 |2023-01-01T00:00:04.0000000Z |
Пример выходных данных:
| Make | Time | Current_car_license_plate | First_car_license_plate | First_car_time |
| --- | --- | --- | --- | --- |
| Make1 |2023-01-01T00:00:02.0000000Z |AAA-999 |ABC-123 |2023-01-01T00:00:01.0000000Z |
Пример запроса:
SELECT
Make,
Time,
License_plate AS Current_car_license_plate,
LAG(License_plate, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_license_plate,
LAG(Time, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(second, 90)) = Make
Функция LAG может вернуться во входном потоке на одно событие назад и получить значение Make, сравнив его со значением Make текущего события. При выполнении условия данные из предыдущего события можно спроектировать с помощью функции LAG в инструкцию SELECT.
Дополнительные сведения см. в статье LAG.
Определение промежутка между событиями
Длительность события можно вычислить на основе данных о последнем событии запуска после получения события завершения. Этот запрос может быть полезен для определения времени, затрачиваемого пользователем на страницу или функцию.
Пример входных данных:
| User | Feature | Event | Time |
| --- | --- | --- | --- |
| user@location.com |RightMenu |Start |2023-01-01T00:00:01.0000000Z |
| user@location.com |RightMenu |End |2023-01-01T00:00:08.0000000Z |
Пример выходных данных:
| User | Feature | Duration |
| --- | --- | --- |
| user@location.com |RightMenu |7 |
Пример запроса:
SELECT
[user],
feature,
DATEDIFF(
second,
LAST(Time) OVER (PARTITION BY [user], feature LIMIT DURATION(hour, 1) WHEN Event = 'start'),
Time) as duration
FROM input TIMESTAMP BY Time
WHERE
Event = 'end'
Функцию LAST можно использовать для получения последнего события при определенном условии. В этом примере условие — это событие типа Start, которое позволяет секционировать поиск по пользователю и функции с помощью PARTITION BY. Таким образом, каждый пользователь и функция обрабатываются независимо при поиске события Start. LIMIT DURATION ограничивает время поиска между событиями End и Start в прошлом промежутке в 1 час.
Число уникальных значений
Для подсчета количества уникальных значений поля, которые отображаются в потоке в течение определенного временного окна, можно использовать инструкции COUNT и DISTINCT. Вы можете создать запрос, чтобы вычислить количество уникальных автомобилей, прошедших через платный стенд в 2-секундном окне.
Пример входных данных:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
Пример выходных данных:
| Count_make | Time |
| --- | --- |
| 2 |2023-01-01T00:00:02.000Z |
| 1 |2023-01-01T00:00:04.000Z |
Пример запроса:
SELECT
COUNT(DISTINCT Make) AS Count_make,
System.TIMESTAMP() AS Time
FROM Input TIMESTAMP BY TIME
GROUP BY
TumblingWindow(second, 2)
COUNT(DISTINCT Make) возвращает количество уникальных значений в столбце Make в течение определенного временного окна. Дополнительные сведения см. в разделе Агрегатная функция COUNT.
Получение первого события в окне
Вы можете использовать IsFirst
для получения первого события в окне времени. Например, можно выводить сведения о первом автомобиле через каждые 10 минут.
Пример входных данных:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
Пример выходных данных:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
Пример запроса:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) = 1
Функция IsFirst также может секционировать данные и вычислять первое событие для каждой конкретной марки автомобиля (Make) в течение 10-минутного интервала.
Пример выходных данных:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
Пример запроса:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) OVER (PARTITION BY Make) = 1
Дополнительные сведения см. в разделе IsFirst.
Удаление повторяющихся событий за период
При выполнении операции, такой как вычисление средних значений по событиям в заданном окне времени, следует отфильтровать повторяющиеся события. В следующем примере второе событие является дубликатом первого.
Пример входных данных:
| DeviceId | Time | Attribute | Value |
| --- | --- | --- | --- |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 2 |2018-07-27T00:00:01.0000000Z |Temperature |40 |
| 1 |2018-07-27T00:00:05.0000000Z |Temperature |60 |
| 2 |2018-07-27T00:00:05.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:10.0000000Z |Temperature |100 |
Пример выходных данных:
| AverageValue | DeviceId |
| --- | --- |
| 70 | 1 |
|45 | 2 |
Пример запроса:
WITH Temp AS (
SELECT Value, DeviceId
FROM Input TIMESTAMP BY Time
GROUP BY Value, DeviceId, System.Timestamp()
)
SELECT
AVG(Value) AS AverageValue, DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
При выполнении первой инструкции повторяющиеся записи объединяются в одну, так как поля в группе по предложению совпадают. Таким образом, он удаляет дубликаты.
Указание логики для различных случаев и значений (операторы CASE)
Инструкции CASE могут предоставлять различные вычисления для разных полей на основе определенного критерия. Например, назначьте полосу A
автомобилям Make1
и полосе B
любому другому сделать.
Пример входных данных:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
Пример выходных данных:
| Make |Dispatch_to_lane | Time |
| --- | --- | --- |
| Make1 |"A" |2023-01-01T00:00:01.0000000Z |
| Make2 |"B" |2023-01-01T00:00:02.0000000Z |
Пример запроса:
SELECT
Make
CASE
WHEN Make = "Make1" THEN "A"
ELSE "B"
END AS Dispatch_to_lane,
System.TimeStamp() AS Time
FROM
Input TIMESTAMP BY Time
Выражение CASE сравнивает выражение с набором простых выражений, чтобы определить их результат. В этом примере транспортные средства отправляются в полосу, а транспортные средства Make1
любого другого элемента будут назначены по полосе A
B
.
Дополнительные сведения см. в описании выражения case.
Преобразование данных
Данные можно трансформировать в другие типы в режиме реального времени с помощью метода CAST. Например, вес автомобиля можно преобразовать из типа nvarchar(max) в тип bigint и использовать в числовых вычислениях.
Пример входных данных:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
Пример выходных данных:
| Make | Weight |
| --- | --- |
| Make1 |3000 |
Пример запроса:
SELECT
Make,
SUM(CAST(Weight AS BIGINT)) AS Weight
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
Используйте инструкцию CAST, чтобы указать тип данных. Ознакомьтесь со списком поддерживаемых типов данных Azure Stream Analytics.
Дополнительные сведения см. в статье о функции преобразования типов.
Определение продолжительности условия
Для условий, охватывающих несколько событий, продолжительность условия можно задать с помощью функции LAG. Предположим, произошла ошибка, которая привела к неправильному отображению массы всех автомобилей (больше 20 000 фунтов). Необходимо вычислить длительность ошибки.
Пример входных данных:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |2000 |
| Make2 |2023-01-01T00:00:02.0000000Z |25000 |
| Make1 |2023-01-01T00:00:03.0000000Z |26000 |
| Make2 |2023-01-01T00:00:04.0000000Z |25000 |
| Make1 |2023-01-01T00:00:05.0000000Z |26000 |
| Make2 |2023-01-01T00:00:06.0000000Z |25000 |
| Make1 |2023-01-01T00:00:07.0000000Z |26000 |
| Make2 |2023-01-01T00:00:08.0000000Z |2000 |
Пример выходных данных:
| Start_fault | End_fault |
| --- | --- |
| 2023-01-01T00:00:02.000Z |2023-01-01T00:00:07.000Z |
Пример запроса:
WITH SelectPreviousEvent AS
(
SELECT
*,
LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previous_time,
LAG([weight]) OVER (LIMIT DURATION(hour, 24)) as previous_weight
FROM input TIMESTAMP BY [time]
)
SELECT
LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN previous_weight < 20000 ) [Start_fault],
previous_time [End_fault]
FROM SelectPreviousEvent
WHERE
[weight] < 20000
AND previous_weight > 20000
Первая инструкция SELECT выявляет корреляцию текущего измерения веса с предыдущим и проектирует его вместе с текущим измерением. Вторая инструкция SELECT ищет последнее событие, для которого previous_weight меньше 20 000, если текущий вес меньше 20 000, а previous_weight текущего события больше 20 000.
End_fault является текущим событием, в котором предыдущее событие было неисправным, и Start_fault является последним событием, неуклюжным до этого.
Обработка событий с независимым временем (подпотоки)
События могут поступать с опозданием или не по порядку из-за рассинхронизации часов поставщиков событий, секций или сетевой задержки. Например, часы устройства для TollID 2 — пять секунд за 1, а часы устройства для TollID 3 — 10 секунд за tollID 1. Вычисление для каждого пункта взимания дорожного сбора может выполняться независимо. При этом в качестве метки времени используются только данные собственных часов пункта.
Пример входных данных:
| LicensePlate | Make | Time | TollID |
| --- | --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:01.0000000Z | 1 |
| YHN 6970 |Make2 |2023-07-27T00:00:05.0000000Z | 1 |
| QYF 9358 |Make1 |2023-07-27T00:00:01.0000000Z | 2 |
| GXF 9462 |Make3 |2023-07-27T00:00:04.0000000Z | 2 |
| VFE 1616 |Make2 |2023-07-27T00:00:10.0000000Z | 1 |
| RMV 8282 |Make1 |2023-07-27T00:00:03.0000000Z | 3 |
| MDR 6128 |Make3 |2023-07-27T00:00:11.0000000Z | 2 |
| YZK 5704 |Make4 |2023-07-27T00:00:07.0000000Z | 3 |
Пример выходных данных:
| TollID | Count |
| --- | --- |
| 1 | 2 |
| 2 | 2 |
| 1 | 1 |
| 3 | 1 |
| 2 | 1 |
| 3 | 1 |
Пример запроса:
SELECT
TollId,
COUNT(*) AS Count
FROM input
TIMESTAMP BY Time OVER TollId
GROUP BY TUMBLINGWINDOW(second, 5), TollId
Предложение TIMESTAMP BY OVER обеспечивает независимый просмотр временной шкалы каждого устройства отдельно с помощью подпотоков. Исходящие события для каждого TollID создаются по мере их вычисления. Это означает, что события отображаются по порядку по отношению к каждому TollID, а не переупорядочиваются, как если бы часы всех устройств были синхронизированы.
Дополнительные сведения см. в описании TIMESTAMP BY OVER.
Сеансовые окна
Окно сеанса — это окно, которое продолжает расширяться по мере возникновения событий и закрывается для вычислений, если событие не получено после определенного периода времени или если окно достигает максимальной длительности. Это окно особенно удобно для расчета данных взаимодействия с пользователем. Окно запускается, когда пользователь начинает взаимодействовать с системой, и закрывается, когда события больше не наблюдаются, что означает, что пользователь прекратил взаимодействие. Например, если пользователь взаимодействует с веб-страницей, для которой в журнал заносится число щелчков, сеансовое окно можно использовать для определения времени, в течение которого пользователь взаимодействует с сайтом.
Пример входных данных:
| User_id | Time | URL |
| --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | "www.example.com/a.html" |
| 0 | 2017-01-26T00:00:20.0000000Z | "www.example.com/b.html" |
| 1 | 2017-01-26T00:00:55.0000000Z | "www.example.com/c.html" |
| 0 | 2017-01-26T00:01:10.0000000Z | "www.example.com/d.html" |
| 1 | 2017-01-26T00:01:15.0000000Z | "www.example.com/e.html" |
Пример выходных данных:
| User_id | StartTime | EndTime | Duration_in_seconds |
| --- | --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | 2017-01-26T00:01:10.0000000Z | 70 |
| 1 | 2017-01-26T00:00:55.0000000Z | 2017-01-26T00:01:15.0000000Z | 20 |
Пример запроса:
SELECT
user_id,
MIN(time) as StartTime,
MAX(time) as EndTime,
DATEDIFF(second, MIN(time), MAX(time)) AS duration_in_seconds
FROM input TIMESTAMP BY time
GROUP BY
user_id,
SessionWindow(minute, 1, 60) OVER (PARTITION BY user_id)
Инструкция SELECT проецирует данные, относящиеся к взаимодействию с пользователем, а также длительность взаимодействия. Группирование данных выполняется по пользователям и окнам SessionWindow, которые закрываются при отсутствии взаимодействия в течение 1 минуты с максимальным размером окна 60 минут.
Дополнительные сведения о SessionWindow см. в статье о сеансовом окне.
Определяемые пользователем функции в JavaScript и C#
Язык запросов Azure Stream Analytics можно расширить с помощью пользовательских функций, написанных на языке JavaScript или C#. Определяемые пользователем функции — это пользовательские или сложные вычисления, которые нельзя легко выразить с помощью языка SQL. Эти пользовательские функции можно определить один раз, а затем многократно использовать в запросе. Например, определяемая пользователем функция может использоваться для преобразования шестнадцатеричного значения nvarchar(max) в значение bigint.
Пример входных данных:
| Device_id | HexValue |
| --- | --- |
| 1 | "B4" |
| 2 | "11B" |
| 3 | "121" |
Пример выходных данных:
| Device_id | Decimal |
| --- | --- |
| 1 | 180 |
| 2 | 283 |
| 3 | 289 |
function hex2Int(hexValue){
return parseInt(hexValue, 16);
}
public static class MyUdfClass {
public static long Hex2Int(string hexValue){
return int.Parse(hexValue, System.Globalization.NumberStyles.HexNumber);
}
}
SELECT
Device_id,
udf.Hex2Int(HexValue) AS Decimal
From
Input
Определяемая пользователем функция вычисляет большое значение из HexValue на каждом событии, используемом.
Дополнительные сведения см. в инструкциях по JavaScript и C#.
Расширенное сопоставление шаблонов с помощью MATCH_RECOGNIZE
MATCH_RECOGNIZE — это расширенный механизм сопоставления шаблонов, который можно использовать для сопоставления последовательности событий с точно определенным шаблоном регулярного выражения. Например, если для банкомата получено два последовательных предупреждения, требующих уведомления администратора, для него активируется мониторинг в режиме реального времени.
Входные данные:
| ATM_id | Operation_id | Return_Code | Time |
| --- | --- | --- | --- |
| 1 | "Entering Pin" | "Success" | 2017-01-26T00:10:00.0000000Z |
| 2 | "Opening Money Slot" | "Success" | 2017-01-26T00:10:07.0000000Z |
| 2 | "Closing Money Slot" | "Success" | 2017-01-26T00:10:11.0000000Z |
| 1 | "Entering Withdraw Quantity" | "Success" | 2017-01-26T00:10:08.0000000Z |
| 1 | "Opening Money Slot" | "Warning" | 2017-01-26T00:10:14.0000000Z |
| 1 | "Printing Bank Balance" | "Warning" | 2017-01-26T00:10:19.0000000Z |
Выходные данные:
| ATM_id | First_Warning_Operation_id | Warning_Time |
| --- | --- | --- |
| 1 | "Opening Money Slot" | 2017-01-26T00:10:14.0000000Z |
SELECT *
FROM input TIMESTAMP BY time OVER ATM_id
MATCH_RECOGNIZE (
LIMIT DURATION(minute, 1)
PARTITION BY ATM_id
MEASURES
First(Warning.ATM_id) AS ATM_id,
First(Warning.Operation_Id) AS First_Warning_Operation_id,
First(Warning.Time) AS Warning_Time
AFTER MATCH SKIP TO NEXT ROW
PATTERN (Success+ Warning{2,})
DEFINE
Success AS Success.Return_Code = 'Success',
Warning AS Warning.Return_Code <> 'Success'
) AS patternMatch
Этот запрос соответствует по крайней мере двум последовательным событиям сбоя и создает оповещение при выполнении условий. PATTERN определяет регулярное выражение, которое будет использоваться при сопоставлении. В данном случае это как минимум одна успешная операция, за которой следуют по крайней мере два предупреждения. Успешное выполнение и предупреждение определяются с помощью значения Return_Code и после выполнения условия MEASURES проецируются со значением ATM_id, первой операцией с предупреждением и временем первого предупреждения.
Дополнительные сведения см. в описании MATCH_RECOGNIZE.
Геозоны и геопространственные запросы
Azure Stream Analytics предоставляет встроенные геопространственные функции, которые можно использовать для реализации таких сценариев, как управление парком, обеспечение общего доступа к маршрутам, организация работы подключенных автомобилей и отслеживание ресурсов. Геопространственные данные могут быть приняты в формате GeoJSON или WKT в составе потока событий или ссылочных данных. Например, компания, которая специализируется на производстве устройств для печати паспортов, может сдавать их в аренду государственным организациям и консульствам. Расположение этих устройств жестко контролируется во избежание незаконного перемещения и потенциального использования для подделки паспортов. Каждое устройство оснащено GPS-трекером, который возвращает информацию в задание Azure Stream Analytics. Производитель хотел бы отслеживать расположение этих устройств и получать оповещения, если какое-либо из них покинет санкционированную область, чтобы иметь возможность удаленно его отключить, предупредить уполномоченные органы и вернуть себе оборудование.
Входные данные:
| Equipment_id | Equipment_current_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13288797982818 47.64082002051315)" | 2017-01-26T00:10:00.0000000Z |
| 1 | "POINT(-122.13307252987875 47.64081350934929)" | 2017-01-26T00:11:00.0000000Z |
| 1 | "POINT(-122.13308862313283 47.6406508603241)" | 2017-01-26T00:12:00.0000000Z |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
Входные ссылочные данные
| Equipment_id | Equipment_lease_location |
| --- | --- |
| 1 | "POLYGON((-122.13326028450979 47.6409833866794,-122.13261655434621 47.6409833866794,-122.13261655434621 47.64061471602751,-122.13326028450979 47.64061471602751,-122.13326028450979 47.6409833866794))" |
Выходные данные:
| Equipment_id | Equipment_alert_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
SELECT
input.Equipment_id AS Equipment_id,
input.Equipment_current_location AS Equipment_current_location,
input.Time AS Time
FROM input TIMESTAMP BY time
JOIN
referenceInput
ON input.Equipment_id = referenceInput.Equipment_id
WHERE
ST_WITHIN(input.Equipment_current_location, referenceInput.Equipment_lease_location) = 1
Запрос позволяет производителю автоматически отслеживать расположение устройств и получать оповещения при выходе устройства из разрешенной геозоны. Встроенная геопространственная функция позволяет пользователям использовать данные GPS в запросе без библиотек сторонних производителей.
Дополнительные сведения см. в статье Сценарии геозоны и геопространственного агрегирования с Azure Stream Analytics.
Получить помощь
За дополнительной информацией перейдите на страницу вопросов и ответов об Azure Stream Analytics.