Упражнение. Обработка событий и хранение данных в Azure Cosmos DB

Завершено

Вторая функция может прослушивать события определенного пространства имен в концентраторе событий Azure и обрабатывать их и хранить в базе данных, созданной с помощью Azure Cosmos DB.

Создание базы данных с помощью Azure Cosmos DB

Чтобы создать базу данных, используйте команду az cosmosdb create. Команда использует учетную запись Azure Cosmos DB, базу данных и контейнер SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Для нашего сценария температура интересна. Поэтому мы определяем temperatureStatus в качестве ключа раздела.

Создание, настройка и развертывание другой функции Azure

С помощью центров событий можно начать с потоков данных в мегабайтах и увеличиться до гигабайтов или терабайтов. Функция автоинфляции — это один из многих вариантов, доступных для масштабирования количества единиц пропускной способности в соответствии с потребностями использования.

Приложения-потребители для каждой функции имеют отдельное представление о потоке событий. Они читают поток независимо, по своему темпу и со своими собственными смещениями.

В нашем сценарии вы создадите одну из используемых функций Azure в качестве примера. Чтобы создать функцию, следуя рекомендациям, она должна быть независимой, с собственной учетной записью хранения и привязками для свободного связывания и масштабируемости.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Получение строк подключения

Функция-получатель должна знать о своей учетной записи хранения и концентраторе событий. Кроме того, необходимо учитывать базу данных, в которую она записывает обработанные события.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Для проверки правильности установки переменной можно использовать команду echo $EVENT_HUB_CONNECTION_STRING. В противном случае выполните следующую команду:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Эти строки подключения должны храниться в параметрах приложения для учетной записи Функций Azure.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Заметка

В рабочих средах можно использовать экземпляр Azure Key Vault для хранения строк подключения и управления ими.

Создание приложения функций

Перед созданием следующей функции убедитесь, что вы находитесь в правильной папке.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

Команда создает приложение, как в последнем упражнении. Удалите тестовые файлы, обновите local.settings.file с помощью команды fetch-app-settings, а затем замените существующий файл Function.java.

cd telemetry-functions-consumer
rm -r src/test

Обновите локальные параметры для локального выполнения и отладки.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Затем откройте файл Function.java и замените содержимое следующим кодом:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Создайте другой новый файл с именем TelemetryItem.java в том же расположении, что и Function.java, и добавьте следующий код:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Когда концентратор событий получает сообщение, он создает событие. Функция processSensorData выполняется при получении события. Затем он обрабатывает данные события и использует выходную привязку Azure Cosmos DB для отправки результатов в базу данных. Мы снова используем класс TelemetryItem.java. Объекты TelemetryItem можно рассматривать как контракт, формируемый потребителями, между участниками этой системы, работающей на основе событий.

Запуск локально

С помощью Функций Azure вы можете получать события со всего мира. Да, вы даже можете получать события локально на вашей машине разработки!

mvn clean package
mvn azure-functions:run

После сообщений сборки и запуска вы увидите входящие события при выполнении функции:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

На портале Azure перейдите к учетной записи Azure Cosmos DB. Выберите Data Explorer, выберите TelemetryInfo, а затем выберите Элементы, чтобы просмотреть ваши данные по их поступлении.

снимок экрана, показывающий TelemetryInfo в обозревателе данных Azure Cosmos DB.

Развертывание в Azure

Теперь давайте переместим всю рабочую нагрузку в облаке. Чтобы развернуть функции в Azure Functions, используйте команду Maven mvn azure-functions:deploy. Убедитесь, что вы все еще находитесь в правильном репозитории, модуль телеметрических функций.

mvn azure-functions:deploy

Замечательный! Мы развернули весь сценарий телеметрии, отправив данные в концентратор событий и обработав их с помощью независимой функции. Функция обрабатывает данные, а затем сохраняет результат в базе данных, созданной с помощью Azure Cosmos DB. Как убедиться, что наше приложение соответствует нашим предопределенным требованиям? С помощью мониторинга.