Поделиться через


Прием данных с помощью Azure Stream Analytics в Azure Cosmos DB для PostgreSQL

Область применения: Azure Cosmos DB для PostgreSQL (на базе расширения базы данных Citus до PostgreSQL)

Azure Stream Analytics — это подсистема аналитики и обработки событий в режиме реального времени, предназначенная для обработки больших объемов данных быстрой потоковой передачи данных с устройств, датчиков и веб-сайтов. Эта служба также доступна в среде выполнения Azure IoT Edge, что позволяет обрабатывать данные на устройствах Интернета вещей.

Схема, демонстрирующая архитектуру Stream Analytics с Помощью Azure Cosmos DB для PostgreSQL.

Azure Cosmos DB для PostgreSQL освещается в рабочих нагрузках в режиме реального времени, таких как IoT. Для этих рабочих нагрузок Stream Analytics может выступать в качестве безкода, производительности и масштабируемой альтернативы предварительной обработки и потоковой передачи данных из Центры событий Azure, Центр Интернета вещей Azure и Хранилище BLOB-объектов Azure в Azure Cosmos DB для PostgreSQL.

Действия по настройке Stream Analytics

Примечание.

В этой статье используется Центр Интернета вещей Azure в качестве примера источника данных, но этот метод применим к любому другому источнику, поддерживаемому Stream Analytics. Кроме того, следующие демонстрационные данные поступают из симулятора телеметрии устройств Интернета вещей Azure. В этой статье не рассматривается настройка симулятора.

  1. В портал Azure разверните меню портала в левом верхнем углу и выберите "Создать ресурс".

  2. В списке результатов выберите Аналитика>Задание Stream Analytics.

  3. Заполните страницу задания Stream Analytics следующими сведениями:

    • Подписка — выберите подписку Azure, которую необходимо использовать для этого задания.
    • Группа ресурсов— выберите ту же группу ресурсов, что и центр Интернета вещей.
    • Имя — введите имя для идентификации задания Stream Analytics.
    • Регион — выберите регион Azure для размещения задания Stream Analytics. Используйте географическое расположение, ближайшее к пользователям, для повышения производительности и снижения затрат на передачу данных.
    • Среда размещения. Выберите облако для развертывания в облаке Azure или Edge для развертывания на устройстве IoT Edge.
    • Единицы потоковой передачи— выберите количество единиц потоковой передачи для вычислительных ресурсов, которые необходимо выполнить.
  4. Выберите Проверить и создать, а затем выберите Создать. Уведомление о ходе развертывания должно отображаться в правом верхнем углу.

    Снимок экрана: форма задания Stream Analytics.

  5. Настройте входные данные задания.

    Снимок экрана: настройка входных данных задания в Stream Analytics.

    1. После развертывания ресурсов перейдите к заданию Stream Analytics. Выберите "Добавить входные>данные потока>"Центр Интернета вещей.

    2. На странице Центр Интернета вещей укажите такие значения:

      • Псевдоним ввода— введите имя для идентификации входных данных задания.
      • Подписка— выберите подписку Azure, которая имеет учетную запись Центр Интернета вещей.
      • Центр Интернета вещей— выберите имя центра Интернета вещей.
    3. Выберите Сохранить.

    4. После добавления входного потока можно также проверить или скачать поток данных. В следующем коде показаны данные для примера события:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "aaaa0000-bb11-2222-33cc-444444dddddd",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Настройте выходные данные задания.

    1. На странице задания Stream Analytics выберите outputs>Add>PostgreSQL database (preview).

      Снимок экрана: выбор выходных данных базы данных PostgreSQL.

    2. На странице Azure PostgreSQL укажите следующие значения:

      • Псевдоним выходных данных— введите имя для идентификации выходных данных задания.
      • Выберите "Указать параметры базы данных PostgreSQL" вручную и введите полное доменное имя сервера, базу данных, таблицу, имя пользователя и пароль. В примере набора данных используйте таблицу device_data.
    3. Выберите Сохранить.

    Настройте выходные данные задания в Azure Stream Analytics.

  7. Определите запрос преобразования.

    Запрос преобразования в Azure Stream Analytics.

    1. На странице задания Stream Analytics выберите "Запрос " в меню слева.

    2. В этом руководстве вы используете только альтернативные события из Центр Интернета вещей в Azure Cosmos DB для PostgreSQL, чтобы уменьшить общий размер данных. Скопируйте и вставьте на панели запросов следующий запрос:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Выберите Сохранить запрос.

      Примечание.

      Запрос используется не только для выборки данных, но и для извлечения нужных атрибутов из потока данных. Настраиваемый параметр запроса с Stream Analytics полезен при предварительной обработке или преобразовании данных перед приемом в базу данных.

  8. Запустите задание Stream Analytics и проверьте выходные данные.

    1. Вернитесь на страницу обзора задания и нажмите кнопку Запуск.

    2. На странице начального задания выберите "Теперь" для времени начала выходных данных задания и нажмите кнопку "Пуск".

    3. Задание занимает некоторое время, чтобы начать первый раз, но после активации он продолжает выполняться по мере поступления данных. Через несколько минут можно запросить кластер, чтобы убедиться, что данные загружены.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Примечание.

Функция тестового подключения в настоящее время не поддерживается для Azure Cosmos DB для PostgreSQL и может вызвать ошибку, даже если подключение работает нормально.

Следующие шаги

Узнайте, как создать панель мониторинга в режиме реального времени с помощью Azure Cosmos DB для PostgreSQL.