Создание решения IoT с помощью Stream Analytics
Введение
В этой статье содержатся инструкции по анализу данных в режиме реального времени с использованием Azure Stream Analytics. Разработчики могут легко объединять потоки данных, такие как сведения о посещении, данные журналов и созданные устройствами события, с историческими записями или справочными данными для получения бизнес-информации. Являясь полностью управляемой службой потоковой обработки в режиме реального времени, которая размещена на платформе Microsoft Azure, Azure Stream Analytics обеспечивает высокую надежность, малую задержку и масштабируемую обработку, что помогает приступить к работе за считаные минуты.
После завершения работы с этим решением вы сможете:
- Изучить портал Azure Stream Analytics.
- Настроить и развернуть задание потоковой передачи.
- Сформулировать реальные проблемы и устранить их с помощью языка запросов Stream Analytics.
- Уверенно разрабатывать решения потоковой передачи для клиентов с помощью Stream Analytics.
- Использовать процедуры мониторинга и ведения журнала для устранения неполадок.
Необходимые компоненты
Для создания описанного здесь решения вам потребуется:
Общие сведения о сценарии "Hello, Toll!"
Станции сбора дорожной платы представляют собой распространенное явление. Они встречаются на многих скоростных дорогах, мостах и туннелях по всему миру. Каждая станция имеет несколько пунктов сбора платы. В пунктах, работающих в ручном режиме, водитель останавливается и передает деньги служащему. В пунктах, работающих в автоматическом режиме, размещенный на крыше пункта датчик сканирует RFID-карту, прикрепленную на ветровом стекле автомобиля, во время его проезда через пункт. Легко визуализировать прохождение транспортных средств через эти платные станции как поток событий, над которыми можно выполнять интересные операции.
Входящие данные
В этом решении используется два потока данных. Датчики, установленные на въездах и выездах станций сбора платы, создают первый поток. Второй поток — это статический набор данных, содержащий данные регистрации транспортного средства.
Входной поток данных
Входной поток данных содержит сведения об автомобилях, въезжающих в пункты сбора платы. События данных выхода передаются в динамический поток в концентратор событий из веб-приложения, включенного в пример приложения.
| TollID | EntryTime | LicensePlate | State | Make | Model | VehicleType | VehicleWeight | Toll | Tag |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 |2014-09-10 12:01:00.000 |JNB 7001 |NY |Honda |CRV |1 |0 |7 | |
| 1 |2014-09-10 12:02:00.000 |YXZ 1001 |NY |Toyota |Camry |1 |0 |4 |123456789 |
| 3 |2014-09-10 12:02:00.000 |ABC 1004 |CT |Ford |Taurus |1 |0 |5 |456789123 |
| 2 |2014-09-10 12:03:00.000 |XYZ 1003 |CT |Toyota |Corolla |1 |0 |4 | |
| 1 |2014-09-10 12:03:00.000 |BNJ 1007 |NY |Honda |CRV |1 |0 |5 |789123456 |
| 2 |2014-09-10 12:05:00.000 |CDE 1007 |NJ |Toyota |4x4 |1 |0 |6 |321987654 |
Ниже приведено краткое описание столбцов:
Столбец | Description |
---|---|
ИД пункта сбора | Уникальный идентификатор пункта сбора платы |
Время въезда | Дата и время въезда транспортного средства в пункт сбора платы в формате UTC |
LicensePlate | Номерной знак транспортного средства |
State | Штат в США |
Создание | Изготовитель транспортного средства |
Модель | Номер модели транспортного средства |
Тип транспортного средства | 1 — для пассажирского транспорта, 2 — для коммерческого транспорта |
Тип веса | Вес транспортного средства в тоннах. 0 — для пассажирского транспорта. |
Расскажите | Значение платы в долларах США |
Тег | Электронная метка на автомобиле для автоматической оплаты. Пустая метка, если оплата осуществляется вручную |
Выходной поток данных
Выходной поток данных содержит сведения об автомобилях, выезжающих из станций сбора платы. События данных выхода передаются в динамический поток в концентратор событий из веб-приложения, включенного в пример приложения.
ИД пункта сбора | Время выезда | LicensePlate |
---|---|---|
1 | 10.09.2014T12:03:00.0000000Z | JNB 7001 |
1 | 10.09.2014T12:03:00.0000000Z | YXZ 1001 |
3 | 10.09.2014T12:04:00.0000000Z | ABC 1004 |
2 | 10.09.2014T12:07:00.0000000Z | XYZ 1003 |
1 | 10.09.2014T12:08:00.0000000Z | BNJ 1007 |
2 | 10.09.2014T12:07:00.0000000Z | CDE 1007 |
Ниже приведено краткое описание столбцов:
Столбец | Description |
---|---|
ИД пункта сбора | Уникальный идентификатор пункта сбора платы |
Время выезда | Дата и время выезда транспортного средства с пункта сбора платы в формате UTC |
LicensePlate | Номерной знак транспортного средства |
Данные регистрации коммерческого транспортного средства
В решении используется статический снимок базы данных регистрации коммерческого транспортного средства. Эти данные сохраняются в виде JSON-файла в хранилище BLOB-объектов Azure, включенном в пример.
LicensePlate | RegistrationId | Срок действия истек |
---|---|---|
SVT 6023 | 285429838 | 1 |
XLZ 3463 | 362715656 | 0 |
BAC 1005 | 876133137 | 1 |
RIV 8632 | 992711956 | 0 |
SNY 7188 | 592133890 | 0 |
ELH 9896 | 678427724 | 1 |
Ниже приведено краткое описание столбцов:
Столбец | Description |
---|---|
LicensePlate | Номерной знак транспортного средства |
RegistrationId | Идентификатор регистрации транспортного средства |
Срок действия истек | Состояние регистрации транспортного средства: 0 — если регистрация транспортного средства действительна. 1 — если срок регистрации истек |
Настройка среды для Azure Stream Analytics
Для создания описанного здесь решения требуется подписка Microsoft Azure. Если у вас нет учетной записи Azure, вы можете запросить бесплатную пробную версию.
Обязательно выполните шаги в разделе "Очистка учетной записи Azure" в конце этой статьи, чтобы максимально эффективно использовать деньги на счете в Azure.
Развертывание примера
Существуют некоторые ресурсы, которые можно легко развернуть в группе ресурсов с помощью нескольких действий. Определение решения см. в этом репозитории GitHub: https://github.com/Azure/azure-stream-analytics/tree/master/Samples/TollApp.
Развертывание шаблона TollApp на портале Azure
Чтобы развернуть среду TollApp в Azure, перейдите по этой ссылке.
Если будет предложено, войдите на портал Azure.
Выберите подписку, в которой взимается плата за различные ресурсы.
Укажите новую группу ресурсов с уникальным именем, например
MyTollBooth
.Выберите расположение Azure.
Укажите для значения интервала количество секунд. Это значение используется в примере веб-приложения, чтобы часто отправлять данные в концентратор событий.
Установите флажок, чтобы принять условия.
Выберите Закрепить на панели мониторинга, чтобы позже можно было легко найти ресурсы.
Выберите Приобрести, чтобы развернуть этот шаблон.
Через некоторое время появится уведомление Развертывание прошло успешно.
Просмотр ресурсов TollApp в Azure Stream Analytics
Войдите на портал Azure.
Найдите группу ресурсов, которую вы называли в предыдущем разделе.
Убедитесь, что в группе ресурсов находятся следующие ресурсы:
- Одна учетная запись Azure Cosmos DB
- одно задание Azure Stream Analytics;
- одна учетная запись хранения Azure;
- Один концентратор событий Azure
- два веб-приложения.
Изучение примера задания TollApp
Начиная с группы ресурсов в предыдущем разделе выберите задание потоковой передачи Stream Analytics, начиная с имени
tollapp
(имя содержит случайные символы для уникальности).На странице обзора задания в поле Запрос просмотрите синтаксис запроса.
SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*) AS Count INTO CosmosDB FROM EntryStream TIMESTAMP BY EntryTime GROUP BY TUMBLINGWINDOW(minute, 3), TollId
Чтобы перефразировать цель запроса, допустим, что вам нужно подсчитать количество автомобилей, въезжающих в пункт сбора платы. Так как в пункт сбора платы поступает непрерывный поток транспортных средств, входные события аналогичны потоку, который никогда не останавливается. Чтобы количественно оценить поток, необходимо определить период времени для измерения. Давайте уточним вопрос: "Сколько автомобилей въезжает в пункт сбора платы каждые три минуты?" Этот период часто называют переворачивающимся.
Как видите, Azure Stream Analytics использует язык запросов, такой же как SQL, и добавляет несколько расширений для указания аспектов запроса, связанных со временем. См. дополнительные сведения о конструкциях управления временем и оконных расширениях, используемых в этом запросе.
Проверьте входные данные примера задания TollApp. В текущем запросе используются только входные данные EntryStream.
- Входные данные EntryStream — это подключение концентратора событий, которое очереди данных, представляющих каждый раз, когда автомобиль входит в платный канал на шоссе. Веб-приложение, которое является частью примера, создает события и данные помещается в очередь в этом концентраторе событий. Обратите внимание, что эти входные данные запрашиваются в предложении FROM запроса потоковой передачи.
- Входные данные exitStream — это подключение концентратора событий, которое очереди данных, представляющих каждый раз, когда автомобиль выходит из платных подключений на шоссе. Эти потоковые входные данные используются в вариантах синтаксиса запроса, которые мы рассмотрим позже.
- Входные данные регистрации — это подключение хранилища BLOB-объектов Azure, указывающее на статистический файл registration.json, при необходимости используемый для поиска. Входные эталонные данные используются в вариантах синтаксиса запроса, которые мы рассмотрим позже.
Просмотрите выходные данные примера задания TollApp.
- Выходные данные Azure Cosmos DB — это контейнер базы данных Azure Cosmos DB, который получает события приемника выходных данных. Обратите внимание, что эти выходные данные используются в предложении INTO запроса потоковой передачи.
Запуск задания потовой передачи TollApp
Чтобы запустить задание потоковой передачи, выполните следующие действия:
На странице обзора задания выберите Запустить.
На панели Запуск задания выберите Сейчас.
Через несколько минут, когда задание уже будет выполняться, на странице обзора задания потоковой передачи просмотрите график Мониторинг. На графике будет отображаться несколько тысяч входных событий и десятки выходных событий.
Просмотр выходных данных Azure Cosmos DB
Найдите группу ресурсов, в которой содержатся ресурсы TollApp.
Выберите учетную запись Azure Cosmos DB с таким шаблоном имени: tollapp<random>-cosmos.
Выберите заголовок Обозреватель данных, чтобы открыть страницу обозревателя данных.
Разверните tollAppDatabase>tollAppCollection>Документы.
В списке идентификаторов показано несколько документов после того, как выходные данные будут доступны.
Выберите каждый идентификатор, чтобы просмотреть документ JSON. Обратите внимание на каждое
tollid
windowend time
count of cars
окно и из этого окна.После дополнительных трех минут доступен еще один набор из четырех документов, один документ на
tollid
каждый.
Получение общего времени, затрачиваемого каждым автомобилем
Среднее время, затрачиваемое на проезд через пункт, помогает оценить эффективность процесса и условий работы для клиентов.
Чтобы определить общее время, объедините поток EntryTime и ExitTime. Присоедините два входных потока к равным столбцам TollId и LicensePlate. Для использования оператора JOIN требуется указать запас, описывающий допустимую разницу во времени между объединенными событиями. С помощью функции DATEDIFF укажите, что события должны происходить с интервалом не более 15 минут. Кроме того, примените функцию DATEDIFF к значениям времени въезда и выезда для вычисления фактического времени, проводимого автомобилем на станции сбора платы. Обратите внимание на различия использования DATEDIFF при применении в инструкции SELECT по сравнению с условием JOIN.
SELECT EntryStream.TollId, EntryStream.EntryTime, ExitStream.ExitTime, EntryStream.LicensePlate, DATEDIFF (minute, EntryStream.EntryTime, ExitStream.ExitTime) AS DurationInMinutes
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN ExitStream TIMESTAMP BY ExitTime
ON (EntryStream.TollId= ExitStream.TollId AND EntryStream.LicensePlate = ExitStream.LicensePlate)
AND DATEDIFF (minute, EntryStream, ExitStream ) BETWEEN 0 AND 15
Чтобы обновить синтаксис запроса задания потовой передачи TollApp, сделайте следующее:
На странице обзора задания выберите Остановить.
Через некоторое время появится уведомление о том, что задание остановлено.
В разделе заголовка "Топология задания" выберите <> Запрос.
Вставьте настроенный SQL-запрос потоковой передачи.
Выберите Сохранить, чтобы сохранить запрос. Щелкните Да, чтобы сохранить изменения.
На странице обзора задания выберите Запустить.
На панели Запуск задания выберите Сейчас.
Просмотр общего времени в выходных данных
Повторите действия, описанные в предыдущем разделе, чтобы просмотреть выходные данные Azure Cosmos DB из задания потоковой передачи. Просмотрите последние JSON-документы.
Например, в этом документе показан пример автомобиля с определенным номерным номером, entrytime
а exit time
также вычисляемое durationinminutes
поле DATEDIFF, показывающее продолжительность платных стендов в течение двух минут:
{
"tollid": 4,
"entrytime": "2018-04-05T06:51:39.0491173Z",
"exittime": "2018-04-05T06:53:09.0491173Z",
"licenseplate": "JVR 9425",
"durationinminutes": 2,
"id": "ff52eb25-d580-7566-2879-1f52bba6601e",
"_rid": "+8E4AI1DZgBjAAAAAAAAAA==",
"_self": "dbs/+8E4AA==/colls/+8E4AI1DZgA=/docs/+8E4AI1DZgBjAAAAAAAAAA==/",
"_etag": "\"ad02f6b8-0000-0000-0000-5ac5c8330000\"",
"_attachments": "attachments/",
"_ts": 1522911283
}
Получение сведений о транспортных средствах с истекшим сроком действия регистрации
Azure Stream Analytics может использовать статические снимки эталонных данных для объединения с потоками временных данных. Чтобы продемонстрировать эту возможность, можно использовать следующий пример вопроса. Входные данные регистрации представляют собой статический JSON-файл большого двоичного объекта, в котором перечислены сведения об истечении срока действия лицензии. Объединившись по номерному знаку, эталонные данные сравниваются с каждым транспортным средством, проходящим через пункт оплаты.
Если коммерческое транспортное средство зарегистрировано в компании автодорожных сборов, оно может проезжать через пункт оплаты без остановки для проверки. Для выявления всех коммерческих транспортных средств с истекшим сроком действия регистрации используйте таблицу подстановки регистрации.
SELECT EntryStream.EntryTime, EntryStream.LicensePlate, EntryStream.TollId, Registration.RegistrationId
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN Registration
ON EntryStream.LicensePlate = Registration.LicensePlate
WHERE Registration.Expired = '1'
Повторите шаги из предыдущего раздела, чтобы обновить синтаксис запроса задания потоковой передачи TollApp.
Повторите действия, описанные в предыдущем разделе, чтобы просмотреть выходные данные Azure Cosmos DB из задания потоковой передачи.
Пример результата:
{
"entrytime": "2018-04-05T08:01:28.0252168Z",
"licenseplate": "GMT 3221",
"tollid": 1,
"registrationid": "763220582",
"id": "47db0535-9716-4eb2-db58-de7886966cbf",
"_rid": "y+F8AJ9QWACSAQAAAAAAAA==",
"_self": "dbs/y+F8AA==/colls/y+F8AJ9QWAA=/docs/y+F8AJ9QWACSAQAAAAAAAA==/",
"_etag": "\"88007d8d-0000-0000-0000-5ac5d7e20000\"",
"_attachments": "attachments/",
"_ts": 1522915298
}
Горизонтальное увеличение масштаба задания
Azure Stream Analytics поддерживает масштабируемость, что позволяет обрабатывать большие объемы данных. Запрос Azure Stream Analytics может с помощью предложения PARTITION BY сообщить системе, что этот шаг горизонтально увеличивает масштаб. Система добавляет специальный столбец PartitionId для сопоставления с входными данными (из концентратора событий) по идентификатору секции.
Чтобы горизонтально увеличить масштаб запроса, разделив его на разделы, измените синтаксис запроса на следующий код:
SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*)AS Count
INTO CosmosDB
FROM EntryStream
TIMESTAMP BY EntryTime
PARTITION BY PartitionId
GROUP BY TUMBLINGWINDOW(minute,3), TollId, PartitionId
Чтобы увеличить количество единиц потоковой передачи задания, сделайте следующее:
Остановите текущее задание.
Обновите синтаксис запроса на странице <> Запрос и сохраните изменения.
Под заголовком CONFIGURE задания потоковой передачи выберите Масштабировать.
Перетяните ползунок Единицы потоковой передачи с 1 до 6. Единицы потоковой передачи определяют объем вычислительной мощности, которую может получать задание. Выберите Сохранить.
Запустите задание потоковой передачи, в котором применяется дополнительное масштабирование. Azure Stream Analytics распределяет работу между большими вычислительными ресурсами и обеспечивает лучшую пропускную способность, секционируя работу между ресурсами с помощью столбца, указанного в предложении PARTITION BY.
Отслеживание задания
Область Мониторинг содержит статистические данные по выполнению задания. Выполните первоначальную настройку, чтобы использовать учетную запись хранения в том же регионе (задайте имя toll, как в остальной части этой статьи).
Доступ к журналам действий можно также получить на панели мониторинга заданий в области Параметры.
Удаление ресурсов TollApp
Остановите выполнение задания Stream Analytics на портале Azure.
Найдите группу ресурсов, содержащую восемь ресурсов, связанных с шаблоном TollApp.
Выберите команду Удалить группу ресурсов. Введите имя группы ресурсов, чтобы подтвердить удаление.
Заключение
В этой статье приведены общие сведения о службе Azure Stream Analytics. Вы узнали, как настроить входные и выходные данные для задания Stream Analytics. С помощью сценария "Платные данные" решение объяснило распространенные типы проблем, возникающих в пространстве данных в движении, и как их можно решить с помощью простых запросов, таких как SQL, в Azure Stream Analytics. Здесь описаны конструкции расширений SQL для работы с временными данными. Здесь также рассмотрено объединение потоков данных, их дополнение статическими справочными данными, а также горизонтальное увеличение масштаба запроса для повышения пропускной способности.
Хотя это решение предоставляет хорошее введение, оно не завершено никакими средствами. Дополнительные шаблоны запросов на языке SQL можно найти в статье Примеры запросов для распространенных шаблонов использования Stream Analytics.